diff options
author | Tor Brede Vekterli <vekterli@oath.com> | 2018-07-16 09:13:26 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@oath.com> | 2018-07-16 09:13:26 +0000 |
commit | 3834bd9c83fc2b2a81d0509158f0936e698689bc (patch) | |
tree | cbe5015a4316cc6d2c431c0546c33ebcb36a43aa /storage | |
parent | 6afadc2824b59fec9498672f475b8184b59a5000 (diff) |
Add tests of shared/exclusive persistence queue locking
Diffstat (limited to 'storage')
5 files changed, 156 insertions, 74 deletions
diff --git a/storage/src/tests/persistence/common/filestortestfixture.cpp b/storage/src/tests/persistence/common/filestortestfixture.cpp index c92687f798b..835b8ef1044 100644 --- a/storage/src/tests/persistence/common/filestortestfixture.cpp +++ b/storage/src/tests/persistence/common/filestortestfixture.cpp @@ -18,19 +18,17 @@ spi::LoadType FileStorTestFixture::defaultLoadType = spi::LoadType(0, "default") const uint32_t FileStorTestFixture::MSG_WAIT_TIME; void -FileStorTestFixture::setupDisks(uint32_t diskCount) +FileStorTestFixture::setupPersistenceThreads(uint32_t threads) { std::string rootOfRoot = "todo-make-unique-filestorefixture"; - _config.reset(new vdstestlib::DirConfig(getStandardConfig(true, rootOfRoot))); - - _config2.reset(new vdstestlib::DirConfig(*_config)); - _config2->getConfig("stor-server").set("root_folder", (rootOfRoot + "-vdsroot.2")); - _config2->getConfig("stor-devices").set("root_folder", (rootOfRoot + "-vdsroot.2")); - _config2->getConfig("stor-server").set("node_index", "1"); - - _smallConfig.reset(new vdstestlib::DirConfig(*_config)); - _node.reset(new TestServiceLayerApp(DiskCount(diskCount), NodeIndex(1), - _config->getConfigId())); + _config = std::make_unique<vdstestlib::DirConfig>(getStandardConfig(true, rootOfRoot)); + _config->getConfig("stor-server").set("root_folder", (rootOfRoot + "-vdsroot.2")); + _config->getConfig("stor-devices").set("root_folder", (rootOfRoot + "-vdsroot.2")); + _config->getConfig("stor-server").set("node_index", "1"); + _config->getConfig("stor-filestor").set("num_threads", std::to_string(threads)); + + _node = std::make_unique<TestServiceLayerApp>( + DiskCount(1), NodeIndex(1), _config->getConfigId()); _testdoctype1 = _node->getTypeRepo()->getDocumentType("testdoctype1"); } @@ -38,16 +36,15 @@ FileStorTestFixture::setupDisks(uint32_t diskCount) void FileStorTestFixture::setUp() { - setupDisks(1); + setupPersistenceThreads(1); _node->setPersistenceProvider( - spi::PersistenceProvider::UP( - new spi::dummy::DummyPersistence(_node->getTypeRepo(), 1))); + std::make_unique<spi::dummy::DummyPersistence>(_node->getTypeRepo(), 1)); } void FileStorTestFixture::tearDown() { - _node.reset(0); + _node.reset(); } void @@ -91,7 +88,7 @@ FileStorTestFixture::TestFileStorComponents::TestFileStorComponents( } api::StorageMessageAddress -FileStorTestFixture::TestFileStorComponents::makeSelfAddress() const { +FileStorTestFixture::makeSelfAddress() { return api::StorageMessageAddress("storage", lib::NodeType::STORAGE, 0); } diff --git a/storage/src/tests/persistence/common/filestortestfixture.h b/storage/src/tests/persistence/common/filestortestfixture.h index c8158d01224..c46f9de24fc 100644 --- a/storage/src/tests/persistence/common/filestortestfixture.h +++ b/storage/src/tests/persistence/common/filestortestfixture.h @@ -19,8 +19,6 @@ public: std::unique_ptr<TestServiceLayerApp> _node; std::unique_ptr<vdstestlib::DirConfig> _config; - std::unique_ptr<vdstestlib::DirConfig> _config2; - std::unique_ptr<vdstestlib::DirConfig> _smallConfig; const document::DocumentType* _testdoctype1; static const uint32_t MSG_WAIT_TIME = 60 * 1000; @@ -30,10 +28,12 @@ public: void setUp() override; void tearDown() override; - void setupDisks(uint32_t diskCount); + void setupPersistenceThreads(uint32_t diskCount); void createBucket(const document::BucketId& bid); bool bucketExistsInDb(const document::BucketId& bucket) const; + static api::StorageMessageAddress makeSelfAddress(); + api::ReturnCode::Result resultOf(const api::StorageReply& reply) const { return reply.getResult().getResult(); } @@ -99,8 +99,6 @@ public: const char* testName, const StorageLinkInjector& i = NoOpStorageLinkInjector()); - api::StorageMessageAddress makeSelfAddress() const; - void sendDummyGet(const document::BucketId& bid); void sendPut(const document::BucketId& bid, uint32_t docIdx, diff --git a/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp b/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp index 50999f5883e..d4cec415937 100644 --- a/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp +++ b/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp @@ -16,7 +16,7 @@ class MergeBlockingTest : public FileStorTestFixture { public: void setupDisks() { - FileStorTestFixture::setupDisks(1); + FileStorTestFixture::setupPersistenceThreads(1); _node->setPersistenceProvider( spi::PersistenceProvider::UP( new spi::dummy::DummyPersistence(_node->getTypeRepo(), 1))); diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp index 64ef48b5719..e12f48bcdea 100644 --- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp +++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp @@ -86,9 +86,9 @@ public: std::unique_ptr<vespalib::Barrier> _queueBarrier; std::unique_ptr<vespalib::Barrier> _completionBarrier; - void setupDisks(uint32_t diskCount, uint32_t queueBarrierThreads) { - FileStorTestFixture::setupDisks(diskCount); - _dummyProvider.reset(new spi::dummy::DummyPersistence(_node->getTypeRepo(), diskCount)); + void setupProviderAndBarriers(uint32_t queueBarrierThreads) { + FileStorTestFixture::setupPersistenceThreads(1); + _dummyProvider.reset(new spi::dummy::DummyPersistence(_node->getTypeRepo(), 1)); _queueBarrier.reset(new vespalib::Barrier(queueBarrierThreads)); _completionBarrier.reset(new vespalib::Barrier(2)); _blockingProvider = new BlockingMockProvider(*_dummyProvider, *_queueBarrier, *_completionBarrier); @@ -219,7 +219,7 @@ makeAbortCmd(const Container& buckets) void OperationAbortingTest::testAbortMessageClearsRelevantQueuedOperations() { - setupDisks(1, 2); + setupProviderAndBarriers(2); TestFileStorComponents c(*this, "testAbortMessageClearsRelevantQueuedOperations"); document::BucketId bucket(16, 1); createBucket(bucket); @@ -305,7 +305,7 @@ public: void OperationAbortingTest::testWaitForCurrentOperationCompletionForAbortedBucket() { - setupDisks(1, 3); + setupProviderAndBarriers(3); TestFileStorComponents c(*this, "testWaitForCurrentOperationCompletionForAbortedBucket"); document::BucketId bucket(16, 1); @@ -386,7 +386,7 @@ OperationAbortingTest::doTestSpecificOperationsNotAborted(const char* testName, const std::vector<api::StorageMessage::SP>& msgs, bool shouldCreateBucketInitially) { - setupDisks(1, 2); + setupProviderAndBarriers(2); TestFileStorComponents c(*this, testName); document::BucketId bucket(16, 1); document::BucketId blockerBucket(16, 2); diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp index f31623eed61..746ae770cb3 100644 --- a/storage/src/tests/persistence/persistencequeuetest.cpp +++ b/storage/src/tests/persistence/persistencequeuetest.cpp @@ -15,86 +15,173 @@ using document::test::makeDocumentBucket; namespace storage { -class PersistenceQueueTest : public FileStorTestFixture -{ +class PersistenceQueueTest : public FileStorTestFixture { public: void testFetchNextUnlockedMessageIfBucketLocked(); + void shared_locked_operations_allow_concurrent_bucket_access(); + void exclusive_locked_operation_not_started_if_shared_op_active(); + void shared_locked_operation_not_started_if_exclusive_op_active(); + void exclusive_locked_operation_not_started_if_exclusive_op_active(); - std::shared_ptr<api::StorageMessage> - createPut(uint64_t bucket, uint64_t docIdx); + std::shared_ptr<api::StorageMessage> createPut(uint64_t bucket, uint64_t docIdx); + std::shared_ptr<api::StorageMessage> createGet(uint64_t bucket) const; void setUp() override; - void tearDown() override; CPPUNIT_TEST_SUITE(PersistenceQueueTest); CPPUNIT_TEST(testFetchNextUnlockedMessageIfBucketLocked); + CPPUNIT_TEST(shared_locked_operations_allow_concurrent_bucket_access); + CPPUNIT_TEST(exclusive_locked_operation_not_started_if_shared_op_active); + CPPUNIT_TEST(shared_locked_operation_not_started_if_exclusive_op_active); + CPPUNIT_TEST(exclusive_locked_operation_not_started_if_exclusive_op_active); CPPUNIT_TEST_SUITE_END(); + + struct Fixture { + FileStorTestFixture& parent; + DummyStorageLink top; + std::unique_ptr<DummyStorageLink> dummyManager; + ForwardingMessageSender messageSender; + documentapi::LoadTypeSet loadTypes; + FileStorMetrics metrics; + std::unique_ptr<FileStorHandler> filestorHandler; + uint32_t stripeId; + + explicit Fixture(FileStorTestFixture& parent); + ~Fixture(); + }; + + static constexpr uint16_t _disk = 0; }; CPPUNIT_TEST_SUITE_REGISTRATION(PersistenceQueueTest); -void -PersistenceQueueTest::setUp() +PersistenceQueueTest::Fixture::Fixture(FileStorTestFixture& parent_) + : parent(parent_), + top(), + dummyManager(std::make_unique<DummyStorageLink>()), + messageSender(*dummyManager), + loadTypes("raw:"), + metrics(loadTypes.getMetricLoadTypes()) { - setupDisks(1); - _node->setPersistenceProvider( - spi::PersistenceProvider::UP( - new spi::dummy::DummyPersistence(_node->getTypeRepo(), 1))); + top.push_back(std::move(dummyManager)); + top.open(); + + metrics.initDiskMetrics(parent._node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1, 1); + + filestorHandler = std::make_unique<FileStorHandler>(messageSender, metrics, parent._node->getPartitions(), + parent._node->getComponentRegister()); + // getNextMessage will time out if no unlocked buckets are present. Choose a timeout + // that is large enough to fail tests with high probability if this is not the case, + // and small enough to not slow down testing too much. + filestorHandler->setGetNextMessageTimeout(20); + + stripeId = filestorHandler->getNextStripeId(0); } -void -PersistenceQueueTest::tearDown() -{ - _node.reset(0); +PersistenceQueueTest::Fixture::~Fixture() = default; + +void PersistenceQueueTest::setUp() { + setupPersistenceThreads(1); + _node->setPersistenceProvider(std::make_unique<spi::dummy::DummyPersistence>(_node->getTypeRepo(), 1)); } -std::shared_ptr<api::StorageMessage> -PersistenceQueueTest::createPut(uint64_t bucket, uint64_t docIdx) -{ - std::ostringstream id; - id << "id:foo:testdoctype1:n=" << bucket << ":" << docIdx; - document::Document::SP doc( - _node->getTestDocMan().createDocument("foobar", id.str())); - std::shared_ptr<api::PutCommand> cmd( - new api::PutCommand(makeDocumentBucket(document::BucketId(16, bucket)), doc, 1234)); - cmd->setAddress(api::StorageMessageAddress( - "storage", lib::NodeType::STORAGE, 0)); +std::shared_ptr<api::StorageMessage> PersistenceQueueTest::createPut(uint64_t bucket, uint64_t docIdx) { + std::shared_ptr<document::Document> doc = _node->getTestDocMan().createDocument( + "foobar", vespalib::make_string("id:foo:testdoctype1:n=%zu:%zu", bucket, docIdx)); + auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(document::BucketId(16, bucket)), doc, 1234); + cmd->setAddress(makeSelfAddress()); return cmd; } -void -PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked() -{ - DummyStorageLink top; - DummyStorageLink *dummyManager; - top.push_back(std::unique_ptr<StorageLink>(dummyManager = new DummyStorageLink)); - top.open(); - ForwardingMessageSender messageSender(*dummyManager); - - documentapi::LoadTypeSet loadTypes("raw:"); - FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); - metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1, 1); - - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); - uint32_t stripeId = filestorHandler.getNextStripeId(0); +std::shared_ptr<api::StorageMessage> PersistenceQueueTest::createGet(uint64_t bucket) const { + auto cmd = std::make_shared<api::GetCommand>( + makeDocumentBucket(document::BucketId(16, bucket)), + document::DocumentId(vespalib::make_string("id:foo:testdoctype1:n=%zu:0", bucket)), "[all]"); + cmd->setAddress(makeSelfAddress()); + return cmd; +} +void PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked() { + Fixture f(*this); // Send 2 puts, 2 to the first bucket, 1 to the second. Calling // getNextMessage 2 times should then return a lock on the first bucket, // then subsequently on the second, skipping the already locked bucket. // Puts all have same pri, so order is well defined. - filestorHandler.schedule(createPut(1234, 0), 0); - filestorHandler.schedule(createPut(1234, 1), 0); - filestorHandler.schedule(createPut(5432, 0), 0); + f.filestorHandler->schedule(createPut(1234, 0), _disk); + f.filestorHandler->schedule(createPut(1234, 1), _disk); + f.filestorHandler->schedule(createPut(5432, 0), _disk); - auto lock0 = filestorHandler.getNextMessage(0, stripeId); + auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId); CPPUNIT_ASSERT(lock0.first.get()); CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1234), dynamic_cast<api::PutCommand&>(*lock0.second).getBucketId()); - auto lock1 = filestorHandler.getNextMessage(0, stripeId); + auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId); CPPUNIT_ASSERT(lock1.first.get()); CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 5432), dynamic_cast<api::PutCommand&>(*lock1.second).getBucketId()); } +void PersistenceQueueTest::shared_locked_operations_allow_concurrent_bucket_access() { + Fixture f(*this); + + f.filestorHandler->schedule(createGet(1234), _disk); + f.filestorHandler->schedule(createGet(1234), _disk); + + auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + CPPUNIT_ASSERT(lock0.first.get()); + CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Shared, lock0.first->lockingRequirements()); + + // Even though we already have a lock on the bucket, Gets allow shared locking and we + // should therefore be able to get another lock. + auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + CPPUNIT_ASSERT(lock1.first.get()); + CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Shared, lock1.first->lockingRequirements()); +} + +void PersistenceQueueTest::exclusive_locked_operation_not_started_if_shared_op_active() { + Fixture f(*this); + + f.filestorHandler->schedule(createGet(1234), _disk); + f.filestorHandler->schedule(createPut(1234, 0), _disk); + + auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + CPPUNIT_ASSERT(lock0.first.get()); + CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Shared, lock0.first->lockingRequirements()); + + // Expected to time out + auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + CPPUNIT_ASSERT(!lock1.first.get()); +} + +void PersistenceQueueTest::shared_locked_operation_not_started_if_exclusive_op_active() { + Fixture f(*this); + + f.filestorHandler->schedule(createPut(1234, 0), _disk); + f.filestorHandler->schedule(createGet(1234), _disk); + + auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + CPPUNIT_ASSERT(lock0.first.get()); + CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements()); + + // Expected to time out + auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + CPPUNIT_ASSERT(!lock1.first.get()); +} + +void PersistenceQueueTest::exclusive_locked_operation_not_started_if_exclusive_op_active() { + Fixture f(*this); + + f.filestorHandler->schedule(createPut(1234, 0), _disk); + f.filestorHandler->schedule(createPut(1234, 0), _disk); + + auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + CPPUNIT_ASSERT(lock0.first.get()); + CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements()); + + // Expected to time out + auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + CPPUNIT_ASSERT(!lock1.first.get()); +} + } // namespace storage |