summaryrefslogtreecommitdiffstats
path: root/storage/src/tests/persistence/persistencequeuetest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/tests/persistence/persistencequeuetest.cpp')
-rw-r--r--storage/src/tests/persistence/persistencequeuetest.cpp196
1 files changed, 150 insertions, 46 deletions
diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp
index f31623eed61..e96ad013923 100644
--- a/storage/src/tests/persistence/persistencequeuetest.cpp
+++ b/storage/src/tests/persistence/persistencequeuetest.cpp
@@ -15,86 +15,190 @@ using document::test::makeDocumentBucket;
namespace storage {
-class PersistenceQueueTest : public FileStorTestFixture
-{
+class PersistenceQueueTest : public FileStorTestFixture {
public:
void testFetchNextUnlockedMessageIfBucketLocked();
+ void shared_locked_operations_allow_concurrent_bucket_access();
+ void exclusive_locked_operation_not_started_if_shared_op_active();
+ void shared_locked_operation_not_started_if_exclusive_op_active();
+ void exclusive_locked_operation_not_started_if_exclusive_op_active();
+ void operation_batching_not_allowed_across_different_lock_modes();
- std::shared_ptr<api::StorageMessage>
- createPut(uint64_t bucket, uint64_t docIdx);
+ std::shared_ptr<api::StorageMessage> createPut(uint64_t bucket, uint64_t docIdx);
+ std::shared_ptr<api::StorageMessage> createGet(uint64_t bucket) const;
void setUp() override;
- void tearDown() override;
CPPUNIT_TEST_SUITE(PersistenceQueueTest);
CPPUNIT_TEST(testFetchNextUnlockedMessageIfBucketLocked);
+ CPPUNIT_TEST(shared_locked_operations_allow_concurrent_bucket_access);
+ CPPUNIT_TEST(exclusive_locked_operation_not_started_if_shared_op_active);
+ CPPUNIT_TEST(shared_locked_operation_not_started_if_exclusive_op_active);
+ CPPUNIT_TEST(exclusive_locked_operation_not_started_if_exclusive_op_active);
+ CPPUNIT_TEST(operation_batching_not_allowed_across_different_lock_modes);
CPPUNIT_TEST_SUITE_END();
+
+ struct Fixture {
+ FileStorTestFixture& parent;
+ DummyStorageLink top;
+ std::unique_ptr<DummyStorageLink> dummyManager;
+ ForwardingMessageSender messageSender;
+ documentapi::LoadTypeSet loadTypes;
+ FileStorMetrics metrics;
+ std::unique_ptr<FileStorHandler> filestorHandler;
+ uint32_t stripeId;
+
+ explicit Fixture(FileStorTestFixture& parent);
+ ~Fixture();
+ };
+
+ static constexpr uint16_t _disk = 0;
};
CPPUNIT_TEST_SUITE_REGISTRATION(PersistenceQueueTest);
-void
-PersistenceQueueTest::setUp()
+PersistenceQueueTest::Fixture::Fixture(FileStorTestFixture& parent_)
+ : parent(parent_),
+ top(),
+ dummyManager(std::make_unique<DummyStorageLink>()),
+ messageSender(*dummyManager),
+ loadTypes("raw:"),
+ metrics(loadTypes.getMetricLoadTypes())
{
- setupDisks(1);
- _node->setPersistenceProvider(
- spi::PersistenceProvider::UP(
- new spi::dummy::DummyPersistence(_node->getTypeRepo(), 1)));
+ top.push_back(std::move(dummyManager));
+ top.open();
+
+ metrics.initDiskMetrics(parent._node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1, 1);
+
+ filestorHandler = std::make_unique<FileStorHandler>(messageSender, metrics, parent._node->getPartitions(),
+ parent._node->getComponentRegister());
+ // getNextMessage will time out if no unlocked buckets are present. Choose a timeout
+ // that is large enough to fail tests with high probability if this is not the case,
+ // and small enough to not slow down testing too much.
+ filestorHandler->setGetNextMessageTimeout(20);
+
+ stripeId = filestorHandler->getNextStripeId(0);
}
-void
-PersistenceQueueTest::tearDown()
-{
- _node.reset(0);
+PersistenceQueueTest::Fixture::~Fixture() = default;
+
+void PersistenceQueueTest::setUp() {
+ setupPersistenceThreads(1);
+ _node->setPersistenceProvider(std::make_unique<spi::dummy::DummyPersistence>(_node->getTypeRepo(), 1));
}
-std::shared_ptr<api::StorageMessage>
-PersistenceQueueTest::createPut(uint64_t bucket, uint64_t docIdx)
-{
- std::ostringstream id;
- id << "id:foo:testdoctype1:n=" << bucket << ":" << docIdx;
- document::Document::SP doc(
- _node->getTestDocMan().createDocument("foobar", id.str()));
- std::shared_ptr<api::PutCommand> cmd(
- new api::PutCommand(makeDocumentBucket(document::BucketId(16, bucket)), doc, 1234));
- cmd->setAddress(api::StorageMessageAddress(
- "storage", lib::NodeType::STORAGE, 0));
+std::shared_ptr<api::StorageMessage> PersistenceQueueTest::createPut(uint64_t bucket, uint64_t docIdx) {
+ std::shared_ptr<document::Document> doc = _node->getTestDocMan().createDocument(
+ "foobar", vespalib::make_string("id:foo:testdoctype1:n=%zu:%zu", bucket, docIdx));
+ auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(document::BucketId(16, bucket)), doc, 1234);
+ cmd->setAddress(makeSelfAddress());
return cmd;
}
-void
-PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked()
-{
- DummyStorageLink top;
- DummyStorageLink *dummyManager;
- top.push_back(std::unique_ptr<StorageLink>(dummyManager = new DummyStorageLink));
- top.open();
- ForwardingMessageSender messageSender(*dummyManager);
-
- documentapi::LoadTypeSet loadTypes("raw:");
- FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
- metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1, 1);
-
- FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister());
- uint32_t stripeId = filestorHandler.getNextStripeId(0);
+std::shared_ptr<api::StorageMessage> PersistenceQueueTest::createGet(uint64_t bucket) const {
+ auto cmd = std::make_shared<api::GetCommand>(
+ makeDocumentBucket(document::BucketId(16, bucket)),
+ document::DocumentId(vespalib::make_string("id:foo:testdoctype1:n=%zu:0", bucket)), "[all]");
+ cmd->setAddress(makeSelfAddress());
+ return cmd;
+}
+void PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked() {
+ Fixture f(*this);
// Send 2 puts, 2 to the first bucket, 1 to the second. Calling
// getNextMessage 2 times should then return a lock on the first bucket,
// then subsequently on the second, skipping the already locked bucket.
// Puts all have same pri, so order is well defined.
- filestorHandler.schedule(createPut(1234, 0), 0);
- filestorHandler.schedule(createPut(1234, 1), 0);
- filestorHandler.schedule(createPut(5432, 0), 0);
+ f.filestorHandler->schedule(createPut(1234, 0), _disk);
+ f.filestorHandler->schedule(createPut(1234, 1), _disk);
+ f.filestorHandler->schedule(createPut(5432, 0), _disk);
- auto lock0 = filestorHandler.getNextMessage(0, stripeId);
+ auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
CPPUNIT_ASSERT(lock0.first.get());
CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1234),
dynamic_cast<api::PutCommand&>(*lock0.second).getBucketId());
- auto lock1 = filestorHandler.getNextMessage(0, stripeId);
+ auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
CPPUNIT_ASSERT(lock1.first.get());
CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 5432),
dynamic_cast<api::PutCommand&>(*lock1.second).getBucketId());
}
+void PersistenceQueueTest::shared_locked_operations_allow_concurrent_bucket_access() {
+ Fixture f(*this);
+
+ f.filestorHandler->schedule(createGet(1234), _disk);
+ f.filestorHandler->schedule(createGet(1234), _disk);
+
+ auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(lock0.first.get());
+ CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Shared, lock0.first->lockingRequirements());
+
+ // Even though we already have a lock on the bucket, Gets allow shared locking and we
+ // should therefore be able to get another lock.
+ auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(lock1.first.get());
+ CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Shared, lock1.first->lockingRequirements());
+}
+
+void PersistenceQueueTest::exclusive_locked_operation_not_started_if_shared_op_active() {
+ Fixture f(*this);
+
+ f.filestorHandler->schedule(createGet(1234), _disk);
+ f.filestorHandler->schedule(createPut(1234, 0), _disk);
+
+ auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(lock0.first.get());
+ CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Shared, lock0.first->lockingRequirements());
+
+ // Expected to time out
+ auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(!lock1.first.get());
+}
+
+void PersistenceQueueTest::shared_locked_operation_not_started_if_exclusive_op_active() {
+ Fixture f(*this);
+
+ f.filestorHandler->schedule(createPut(1234, 0), _disk);
+ f.filestorHandler->schedule(createGet(1234), _disk);
+
+ auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(lock0.first.get());
+ CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements());
+
+ // Expected to time out
+ auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(!lock1.first.get());
+}
+
+void PersistenceQueueTest::exclusive_locked_operation_not_started_if_exclusive_op_active() {
+ Fixture f(*this);
+
+ f.filestorHandler->schedule(createPut(1234, 0), _disk);
+ f.filestorHandler->schedule(createPut(1234, 0), _disk);
+
+ auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(lock0.first.get());
+ CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements());
+
+ // Expected to time out
+ auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(!lock1.first.get());
+}
+
+void PersistenceQueueTest::operation_batching_not_allowed_across_different_lock_modes() {
+ Fixture f(*this);
+
+ f.filestorHandler->schedule(createPut(1234, 0), _disk);
+ f.filestorHandler->schedule(createGet(1234), _disk);
+
+ auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId);
+ CPPUNIT_ASSERT(lock0.first);
+ CPPUNIT_ASSERT(lock0.second);
+ CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements());
+
+ f.filestorHandler->getNextMessage(_disk, f.stripeId, lock0);
+ CPPUNIT_ASSERT(!lock0.second);
+}
+
} // namespace storage