diff options
Diffstat (limited to 'storage/src')
14 files changed, 343 insertions, 160 deletions
diff --git a/storage/src/tests/persistence/common/filestortestfixture.cpp b/storage/src/tests/persistence/common/filestortestfixture.cpp index c92687f798b..835b8ef1044 100644 --- a/storage/src/tests/persistence/common/filestortestfixture.cpp +++ b/storage/src/tests/persistence/common/filestortestfixture.cpp @@ -18,19 +18,17 @@ spi::LoadType FileStorTestFixture::defaultLoadType = spi::LoadType(0, "default") const uint32_t FileStorTestFixture::MSG_WAIT_TIME; void -FileStorTestFixture::setupDisks(uint32_t diskCount) +FileStorTestFixture::setupPersistenceThreads(uint32_t threads) { std::string rootOfRoot = "todo-make-unique-filestorefixture"; - _config.reset(new vdstestlib::DirConfig(getStandardConfig(true, rootOfRoot))); - - _config2.reset(new vdstestlib::DirConfig(*_config)); - _config2->getConfig("stor-server").set("root_folder", (rootOfRoot + "-vdsroot.2")); - _config2->getConfig("stor-devices").set("root_folder", (rootOfRoot + "-vdsroot.2")); - _config2->getConfig("stor-server").set("node_index", "1"); - - _smallConfig.reset(new vdstestlib::DirConfig(*_config)); - _node.reset(new TestServiceLayerApp(DiskCount(diskCount), NodeIndex(1), - _config->getConfigId())); + _config = std::make_unique<vdstestlib::DirConfig>(getStandardConfig(true, rootOfRoot)); + _config->getConfig("stor-server").set("root_folder", (rootOfRoot + "-vdsroot.2")); + _config->getConfig("stor-devices").set("root_folder", (rootOfRoot + "-vdsroot.2")); + _config->getConfig("stor-server").set("node_index", "1"); + _config->getConfig("stor-filestor").set("num_threads", std::to_string(threads)); + + _node = std::make_unique<TestServiceLayerApp>( + DiskCount(1), NodeIndex(1), _config->getConfigId()); _testdoctype1 = _node->getTypeRepo()->getDocumentType("testdoctype1"); } @@ -38,16 +36,15 @@ FileStorTestFixture::setupDisks(uint32_t diskCount) void FileStorTestFixture::setUp() { - setupDisks(1); + setupPersistenceThreads(1); _node->setPersistenceProvider( - spi::PersistenceProvider::UP( - new 
spi::dummy::DummyPersistence(_node->getTypeRepo(), 1))); + std::make_unique<spi::dummy::DummyPersistence>(_node->getTypeRepo(), 1)); } void FileStorTestFixture::tearDown() { - _node.reset(0); + _node.reset(); } void @@ -91,7 +88,7 @@ FileStorTestFixture::TestFileStorComponents::TestFileStorComponents( } api::StorageMessageAddress -FileStorTestFixture::TestFileStorComponents::makeSelfAddress() const { +FileStorTestFixture::makeSelfAddress() { return api::StorageMessageAddress("storage", lib::NodeType::STORAGE, 0); } diff --git a/storage/src/tests/persistence/common/filestortestfixture.h b/storage/src/tests/persistence/common/filestortestfixture.h index c8158d01224..c46f9de24fc 100644 --- a/storage/src/tests/persistence/common/filestortestfixture.h +++ b/storage/src/tests/persistence/common/filestortestfixture.h @@ -19,8 +19,6 @@ public: std::unique_ptr<TestServiceLayerApp> _node; std::unique_ptr<vdstestlib::DirConfig> _config; - std::unique_ptr<vdstestlib::DirConfig> _config2; - std::unique_ptr<vdstestlib::DirConfig> _smallConfig; const document::DocumentType* _testdoctype1; static const uint32_t MSG_WAIT_TIME = 60 * 1000; @@ -30,10 +28,12 @@ public: void setUp() override; void tearDown() override; - void setupDisks(uint32_t diskCount); + void setupPersistenceThreads(uint32_t diskCount); void createBucket(const document::BucketId& bid); bool bucketExistsInDb(const document::BucketId& bucket) const; + static api::StorageMessageAddress makeSelfAddress(); + api::ReturnCode::Result resultOf(const api::StorageReply& reply) const { return reply.getResult().getResult(); } @@ -99,8 +99,6 @@ public: const char* testName, const StorageLinkInjector& i = NoOpStorageLinkInjector()); - api::StorageMessageAddress makeSelfAddress() const; - void sendDummyGet(const document::BucketId& bid); void sendPut(const document::BucketId& bid, uint32_t docIdx, diff --git a/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp 
b/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp index 50999f5883e..d4cec415937 100644 --- a/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp +++ b/storage/src/tests/persistence/filestorage/mergeblockingtest.cpp @@ -16,7 +16,7 @@ class MergeBlockingTest : public FileStorTestFixture { public: void setupDisks() { - FileStorTestFixture::setupDisks(1); + FileStorTestFixture::setupPersistenceThreads(1); _node->setPersistenceProvider( spi::PersistenceProvider::UP( new spi::dummy::DummyPersistence(_node->getTypeRepo(), 1))); diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp index 64ef48b5719..e12f48bcdea 100644 --- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp +++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp @@ -86,9 +86,9 @@ public: std::unique_ptr<vespalib::Barrier> _queueBarrier; std::unique_ptr<vespalib::Barrier> _completionBarrier; - void setupDisks(uint32_t diskCount, uint32_t queueBarrierThreads) { - FileStorTestFixture::setupDisks(diskCount); - _dummyProvider.reset(new spi::dummy::DummyPersistence(_node->getTypeRepo(), diskCount)); + void setupProviderAndBarriers(uint32_t queueBarrierThreads) { + FileStorTestFixture::setupPersistenceThreads(1); + _dummyProvider.reset(new spi::dummy::DummyPersistence(_node->getTypeRepo(), 1)); _queueBarrier.reset(new vespalib::Barrier(queueBarrierThreads)); _completionBarrier.reset(new vespalib::Barrier(2)); _blockingProvider = new BlockingMockProvider(*_dummyProvider, *_queueBarrier, *_completionBarrier); @@ -219,7 +219,7 @@ makeAbortCmd(const Container& buckets) void OperationAbortingTest::testAbortMessageClearsRelevantQueuedOperations() { - setupDisks(1, 2); + setupProviderAndBarriers(2); TestFileStorComponents c(*this, "testAbortMessageClearsRelevantQueuedOperations"); document::BucketId bucket(16, 1); createBucket(bucket); @@ -305,7 +305,7 @@ 
public: void OperationAbortingTest::testWaitForCurrentOperationCompletionForAbortedBucket() { - setupDisks(1, 3); + setupProviderAndBarriers(3); TestFileStorComponents c(*this, "testWaitForCurrentOperationCompletionForAbortedBucket"); document::BucketId bucket(16, 1); @@ -386,7 +386,7 @@ OperationAbortingTest::doTestSpecificOperationsNotAborted(const char* testName, const std::vector<api::StorageMessage::SP>& msgs, bool shouldCreateBucketInitially) { - setupDisks(1, 2); + setupProviderAndBarriers(2); TestFileStorComponents c(*this, testName); document::BucketId bucket(16, 1); document::BucketId blockerBucket(16, 2); diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp index f31623eed61..e96ad013923 100644 --- a/storage/src/tests/persistence/persistencequeuetest.cpp +++ b/storage/src/tests/persistence/persistencequeuetest.cpp @@ -15,86 +15,190 @@ using document::test::makeDocumentBucket; namespace storage { -class PersistenceQueueTest : public FileStorTestFixture -{ +class PersistenceQueueTest : public FileStorTestFixture { public: void testFetchNextUnlockedMessageIfBucketLocked(); + void shared_locked_operations_allow_concurrent_bucket_access(); + void exclusive_locked_operation_not_started_if_shared_op_active(); + void shared_locked_operation_not_started_if_exclusive_op_active(); + void exclusive_locked_operation_not_started_if_exclusive_op_active(); + void operation_batching_not_allowed_across_different_lock_modes(); - std::shared_ptr<api::StorageMessage> - createPut(uint64_t bucket, uint64_t docIdx); + std::shared_ptr<api::StorageMessage> createPut(uint64_t bucket, uint64_t docIdx); + std::shared_ptr<api::StorageMessage> createGet(uint64_t bucket) const; void setUp() override; - void tearDown() override; CPPUNIT_TEST_SUITE(PersistenceQueueTest); CPPUNIT_TEST(testFetchNextUnlockedMessageIfBucketLocked); + CPPUNIT_TEST(shared_locked_operations_allow_concurrent_bucket_access); + 
CPPUNIT_TEST(exclusive_locked_operation_not_started_if_shared_op_active); + CPPUNIT_TEST(shared_locked_operation_not_started_if_exclusive_op_active); + CPPUNIT_TEST(exclusive_locked_operation_not_started_if_exclusive_op_active); + CPPUNIT_TEST(operation_batching_not_allowed_across_different_lock_modes); CPPUNIT_TEST_SUITE_END(); + + struct Fixture { + FileStorTestFixture& parent; + DummyStorageLink top; + std::unique_ptr<DummyStorageLink> dummyManager; + ForwardingMessageSender messageSender; + documentapi::LoadTypeSet loadTypes; + FileStorMetrics metrics; + std::unique_ptr<FileStorHandler> filestorHandler; + uint32_t stripeId; + + explicit Fixture(FileStorTestFixture& parent); + ~Fixture(); + }; + + static constexpr uint16_t _disk = 0; }; CPPUNIT_TEST_SUITE_REGISTRATION(PersistenceQueueTest); -void -PersistenceQueueTest::setUp() +PersistenceQueueTest::Fixture::Fixture(FileStorTestFixture& parent_) + : parent(parent_), + top(), + dummyManager(std::make_unique<DummyStorageLink>()), + messageSender(*dummyManager), + loadTypes("raw:"), + metrics(loadTypes.getMetricLoadTypes()) { - setupDisks(1); - _node->setPersistenceProvider( - spi::PersistenceProvider::UP( - new spi::dummy::DummyPersistence(_node->getTypeRepo(), 1))); + top.push_back(std::move(dummyManager)); + top.open(); + + metrics.initDiskMetrics(parent._node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1, 1); + + filestorHandler = std::make_unique<FileStorHandler>(messageSender, metrics, parent._node->getPartitions(), + parent._node->getComponentRegister()); + // getNextMessage will time out if no unlocked buckets are present. Choose a timeout + // that is large enough to fail tests with high probability if this is not the case, + // and small enough to not slow down testing too much. 
+ filestorHandler->setGetNextMessageTimeout(20); + + stripeId = filestorHandler->getNextStripeId(0); } -void -PersistenceQueueTest::tearDown() -{ - _node.reset(0); +PersistenceQueueTest::Fixture::~Fixture() = default; + +void PersistenceQueueTest::setUp() { + setupPersistenceThreads(1); + _node->setPersistenceProvider(std::make_unique<spi::dummy::DummyPersistence>(_node->getTypeRepo(), 1)); } -std::shared_ptr<api::StorageMessage> -PersistenceQueueTest::createPut(uint64_t bucket, uint64_t docIdx) -{ - std::ostringstream id; - id << "id:foo:testdoctype1:n=" << bucket << ":" << docIdx; - document::Document::SP doc( - _node->getTestDocMan().createDocument("foobar", id.str())); - std::shared_ptr<api::PutCommand> cmd( - new api::PutCommand(makeDocumentBucket(document::BucketId(16, bucket)), doc, 1234)); - cmd->setAddress(api::StorageMessageAddress( - "storage", lib::NodeType::STORAGE, 0)); +std::shared_ptr<api::StorageMessage> PersistenceQueueTest::createPut(uint64_t bucket, uint64_t docIdx) { + std::shared_ptr<document::Document> doc = _node->getTestDocMan().createDocument( + "foobar", vespalib::make_string("id:foo:testdoctype1:n=%zu:%zu", bucket, docIdx)); + auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(document::BucketId(16, bucket)), doc, 1234); + cmd->setAddress(makeSelfAddress()); return cmd; } -void -PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked() -{ - DummyStorageLink top; - DummyStorageLink *dummyManager; - top.push_back(std::unique_ptr<StorageLink>(dummyManager = new DummyStorageLink)); - top.open(); - ForwardingMessageSender messageSender(*dummyManager); - - documentapi::LoadTypeSet loadTypes("raw:"); - FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); - metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1, 1); - - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); - uint32_t stripeId = filestorHandler.getNextStripeId(0); 
+std::shared_ptr<api::StorageMessage> PersistenceQueueTest::createGet(uint64_t bucket) const { + auto cmd = std::make_shared<api::GetCommand>( + makeDocumentBucket(document::BucketId(16, bucket)), + document::DocumentId(vespalib::make_string("id:foo:testdoctype1:n=%zu:0", bucket)), "[all]"); + cmd->setAddress(makeSelfAddress()); + return cmd; +} +void PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked() { + Fixture f(*this); // Send 2 puts, 2 to the first bucket, 1 to the second. Calling // getNextMessage 2 times should then return a lock on the first bucket, // then subsequently on the second, skipping the already locked bucket. // Puts all have same pri, so order is well defined. - filestorHandler.schedule(createPut(1234, 0), 0); - filestorHandler.schedule(createPut(1234, 1), 0); - filestorHandler.schedule(createPut(5432, 0), 0); + f.filestorHandler->schedule(createPut(1234, 0), _disk); + f.filestorHandler->schedule(createPut(1234, 1), _disk); + f.filestorHandler->schedule(createPut(5432, 0), _disk); - auto lock0 = filestorHandler.getNextMessage(0, stripeId); + auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId); CPPUNIT_ASSERT(lock0.first.get()); CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1234), dynamic_cast<api::PutCommand&>(*lock0.second).getBucketId()); - auto lock1 = filestorHandler.getNextMessage(0, stripeId); + auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId); CPPUNIT_ASSERT(lock1.first.get()); CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 5432), dynamic_cast<api::PutCommand&>(*lock1.second).getBucketId()); } +void PersistenceQueueTest::shared_locked_operations_allow_concurrent_bucket_access() { + Fixture f(*this); + + f.filestorHandler->schedule(createGet(1234), _disk); + f.filestorHandler->schedule(createGet(1234), _disk); + + auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + CPPUNIT_ASSERT(lock0.first.get()); + CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Shared, 
lock0.first->lockingRequirements()); + + // Even though we already have a lock on the bucket, Gets allow shared locking and we + // should therefore be able to get another lock. + auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + CPPUNIT_ASSERT(lock1.first.get()); + CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Shared, lock1.first->lockingRequirements()); +} + +void PersistenceQueueTest::exclusive_locked_operation_not_started_if_shared_op_active() { + Fixture f(*this); + + f.filestorHandler->schedule(createGet(1234), _disk); + f.filestorHandler->schedule(createPut(1234, 0), _disk); + + auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + CPPUNIT_ASSERT(lock0.first.get()); + CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Shared, lock0.first->lockingRequirements()); + + // Expected to time out + auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + CPPUNIT_ASSERT(!lock1.first.get()); +} + +void PersistenceQueueTest::shared_locked_operation_not_started_if_exclusive_op_active() { + Fixture f(*this); + + f.filestorHandler->schedule(createPut(1234, 0), _disk); + f.filestorHandler->schedule(createGet(1234), _disk); + + auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + CPPUNIT_ASSERT(lock0.first.get()); + CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements()); + + // Expected to time out + auto lock1 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + CPPUNIT_ASSERT(!lock1.first.get()); +} + +void PersistenceQueueTest::exclusive_locked_operation_not_started_if_exclusive_op_active() { + Fixture f(*this); + + f.filestorHandler->schedule(createPut(1234, 0), _disk); + f.filestorHandler->schedule(createPut(1234, 0), _disk); + + auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + CPPUNIT_ASSERT(lock0.first.get()); + CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements()); + + // Expected to time out + auto lock1 
= f.filestorHandler->getNextMessage(_disk, f.stripeId); + CPPUNIT_ASSERT(!lock1.first.get()); +} + +void PersistenceQueueTest::operation_batching_not_allowed_across_different_lock_modes() { + Fixture f(*this); + + f.filestorHandler->schedule(createPut(1234, 0), _disk); + f.filestorHandler->schedule(createGet(1234), _disk); + + auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId); + CPPUNIT_ASSERT(lock0.first); + CPPUNIT_ASSERT(lock0.second); + CPPUNIT_ASSERT_EQUAL(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements()); + + f.filestorHandler->getNextMessage(_disk, f.stripeId, lock0); + CPPUNIT_ASSERT(!lock0.second); +} + } // namespace storage diff --git a/storage/src/tests/visiting/visitortest.cpp b/storage/src/tests/visiting/visitortest.cpp index 27281d9b95f..4fc577226ca 100644 --- a/storage/src/tests/visiting/visitortest.cpp +++ b/storage/src/tests/visiting/visitortest.cpp @@ -62,7 +62,7 @@ private: CPPUNIT_TEST(testNormalUsage); CPPUNIT_TEST(testFailedCreateIterator); CPPUNIT_TEST(testFailedGetIter); - CPPUNIT_TEST(testMultipleFailedGetIter); + CPPUNIT_TEST(iterators_per_bucket_config_is_ignored_and_hardcoded_to_1); CPPUNIT_TEST(testDocumentAPIClientError); CPPUNIT_TEST(testNoDocumentAPIResendingForFailedVisitor); CPPUNIT_TEST(testIteratorCreatedForFailedVisitor); @@ -90,7 +90,7 @@ public: void testNormalUsage(); void testFailedCreateIterator(); void testFailedGetIter(); - void testMultipleFailedGetIter(); + void iterators_per_bucket_config_is_ignored_and_hardcoded_to_1(); void testDocumentAPIClientError(); void testNoDocumentAPIResendingForFailedVisitor(); void testIteratorCreatedForFailedVisitor(); @@ -592,36 +592,31 @@ VisitorTest::testFailedGetIter() CPPUNIT_ASSERT(waitUntilNoActiveVisitors()); } -void -VisitorTest::testMultipleFailedGetIter() -{ - initializeTest(TestParams().iteratorsPerBucket(2)); - std::shared_ptr<api::CreateVisitorCommand> cmd( - makeCreateVisitor()); +void 
VisitorTest::iterators_per_bucket_config_is_ignored_and_hardcoded_to_1() { + initializeTest(TestParams().iteratorsPerBucket(20)); + auto cmd = makeCreateVisitor(); _top->sendDown(cmd); sendCreateIteratorReply(); - std::vector<GetIterCommand::SP> getIterCmds( - fetchMultipleCommands<GetIterCommand>(*_bottom, 2)); - - sendGetIterReply(*getIterCmds[0], - api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND)); - - // Wait for an "appropriate" amount of time so that wrongful logic - // will send a DestroyIteratorCommand before all pending GetIters - // have been replied to. - std::this_thread::sleep_for(100ms); + auto getIterCmd = fetchSingleCommand<GetIterCommand>(*_bottom); + CPPUNIT_ASSERT_EQUAL(spi::IteratorId(1234), + getIterCmd->getIteratorId()); + sendGetIterReply(*getIterCmd); CPPUNIT_ASSERT_EQUAL(size_t(0), _bottom->getNumCommands()); - sendGetIterReply(*getIterCmds[1], - api::ReturnCode(api::ReturnCode::BUCKET_DELETED)); + std::vector<document::Document::SP> docs; + std::vector<document::DocumentId> docIds; + std::vector<std::string> infoMessages; + getMessagesAndReply(_documents.size(), getSession(0), docs, docIds, infoMessages); + CPPUNIT_ASSERT_EQUAL(size_t(0), infoMessages.size()); + CPPUNIT_ASSERT_EQUAL(size_t(0), docIds.size()); - DestroyIteratorCommand::SP destroyIterCmd( - fetchSingleCommand<DestroyIteratorCommand>(*_bottom)); + auto destroyIterCmd = fetchSingleCommand<DestroyIteratorCommand>(*_bottom); - verifyCreateVisitorReply(api::ReturnCode::BUCKET_DELETED, 0, 0); + verifyCreateVisitorReply(api::ReturnCode::OK); CPPUNIT_ASSERT(waitUntilNoActiveVisitors()); + CPPUNIT_ASSERT_EQUAL(0L, getFailedVisitorDestinationReplyCount()); } void diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp index 74baecbf026..0da0fd5ce66 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp +++ 
b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp @@ -71,9 +71,9 @@ FileStorHandler::getNextMessage(uint16_t disk, uint32_t stripeId, LockedMessage& } FileStorHandler::BucketLockInterface::SP -FileStorHandler::lock(const document::Bucket& bucket, uint16_t disk) +FileStorHandler::lock(const document::Bucket& bucket, uint16_t disk, api::LockingRequirements lockReq) { - return _impl->lock(bucket, disk); + return _impl->lock(bucket, disk, lockReq); } void diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index b74765b17d2..02c959df2f0 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -58,8 +58,9 @@ public: typedef std::shared_ptr<BucketLockInterface> SP; virtual const document::Bucket &getBucket() const = 0; + virtual api::LockingRequirements lockingRequirements() const noexcept = 0; - virtual ~BucketLockInterface() {}; + virtual ~BucketLockInterface() = default; }; typedef std::pair<BucketLockInterface::SP, api::StorageMessage::SP> LockedMessage; @@ -139,7 +140,7 @@ public: * * */ - BucketLockInterface::SP lock(const document::Bucket&, uint16_t disk); + BucketLockInterface::SP lock(const document::Bucket&, uint16_t disk, api::LockingRequirements lockReq); /** * Called by FileStorThread::onBucketDiskMove() after moving file, in case diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index a01881b6fbe..f9571228ef9 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -370,16 +370,16 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, uint32_t stripeId) } std::shared_ptr<FileStorHandler::BucketLockInterface> 
-FileStorHandlerImpl::Stripe::lock(const document::Bucket &bucket) -{ +FileStorHandlerImpl::Stripe::lock(const document::Bucket &bucket, api::LockingRequirements lockReq) { vespalib::MonitorGuard guard(_lock); - while (isLocked(guard, bucket)) { - LOG(spam, "Contending for filestor lock for %s", bucket.getBucketId().toString().c_str()); + while (isLocked(guard, bucket, lockReq)) { + LOG(spam, "Contending for filestor lock for %s with %s access", + bucket.getBucketId().toString().c_str(), api::to_string(lockReq)); guard.wait(100); } - auto locker = std::make_shared<BucketLock>(guard, *this, bucket, 255, api::MessageType::INTERNAL_ID, 0); + auto locker = std::make_shared<BucketLock>(guard, *this, bucket, 255, api::MessageType::INTERNAL_ID, 0, lockReq); guard.broadcast(); return locker; @@ -388,9 +388,9 @@ FileStorHandlerImpl::Stripe::lock(const document::Bucket &bucket) namespace { struct MultiLockGuard { std::map<uint16_t, vespalib::Monitor*> monitors; - std::vector<std::shared_ptr<vespalib::MonitorGuard> > guards; + std::vector<std::shared_ptr<vespalib::MonitorGuard>> guards; - MultiLockGuard() {} + MultiLockGuard() = default; void addLock(vespalib::Monitor& monitor, uint16_t index) { monitors[index] = &monitor; @@ -931,7 +931,7 @@ FileStorHandlerImpl::Stripe::getNextMessage(uint32_t timeout, Disk & disk) PriorityIdx& idx(bmi::get<1>(_queue)); PriorityIdx::iterator iter(idx.begin()), end(idx.end()); - while (iter != end && isLocked(guard, iter->_bucket)) { + while (iter != end && isLocked(guard, iter->_bucket, iter->_command->lockingRequirements())) { iter++; } if (iter != end) { @@ -959,6 +959,13 @@ FileStorHandlerImpl::Stripe::getNextMessage(FileStorHandler::LockedMessage& lck) } api::StorageMessage & m(*range.first->_command); + // For now, don't allow batching of operations across lock requirement modes. + // We might relax this requirement later once we're 100% sure it can't trigger + // any unfortunate edge cases. 
+ if (lck.first->lockingRequirements() != m.lockingRequirements()) { + lck.second.reset(); + return lck; + } uint64_t waitTime(range.first->_timer.stop(_metrics->averageQueueWaitingTime[m.getLoadType()])); @@ -992,7 +999,8 @@ FileStorHandlerImpl::Stripe::getMessage(vespalib::MonitorGuard & guard, Priority if (!messageTimedOutInQueue(*msg, waitTime)) { auto locker = std::make_unique<BucketLock>(guard, *this, bucket, msg->getPriority(), - msg->getType().getId(), msg->getMsgId()); + msg->getType().getId(), msg->getMsgId(), + msg->lockingRequirements()); guard.unlock(); return FileStorHandler::LockedMessage(std::move(locker), std::move(msg)); } else { @@ -1090,10 +1098,65 @@ FileStorHandlerImpl::Stripe::flush() lockGuard.wait(100); } } + +void FileStorHandlerImpl::Stripe::release(const document::Bucket & bucket, + api::LockingRequirements reqOfReleasedLock, + api::StorageMessage::Id lockMsgId) { + vespalib::MonitorGuard guard(_lock); + auto iter = _lockedBuckets.find(bucket); + assert(iter != _lockedBuckets.end()); + auto& entry = iter->second; + + if (reqOfReleasedLock == api::LockingRequirements::Exclusive) { + assert(entry._exclusiveLock); + assert(entry._exclusiveLock->msgId == lockMsgId); + entry._exclusiveLock.reset(); + } else { + assert(!entry._exclusiveLock); + auto shared_iter = entry._sharedLocks.find(lockMsgId); + assert(shared_iter != entry._sharedLocks.end()); + entry._sharedLocks.erase(shared_iter); + } + + if (!entry._exclusiveLock && entry._sharedLocks.empty()) { + _lockedBuckets.erase(iter); // No more locks held + } + guard.broadcast(); +} + +void FileStorHandlerImpl::Stripe::lock(const vespalib::MonitorGuard &, const document::Bucket & bucket, + api::LockingRequirements lockReq, const LockEntry & lockEntry) { + auto& entry = _lockedBuckets[bucket]; + assert(!entry._exclusiveLock); + if (lockReq == api::LockingRequirements::Exclusive) { + assert(entry._sharedLocks.empty()); + entry._exclusiveLock = lockEntry; + } else { + // TODO use a hash set with 
a custom comparator/hasher instead...? + auto inserted = entry._sharedLocks.insert(std::make_pair(lockEntry.msgId, lockEntry)); + (void) inserted; + assert(inserted.second); + } +} + bool -FileStorHandlerImpl::Stripe::isLocked(const vespalib::MonitorGuard &, const document::Bucket& bucket) const noexcept +FileStorHandlerImpl::Stripe::isLocked(const vespalib::MonitorGuard &, const document::Bucket& bucket, + api::LockingRequirements lockReq) const noexcept { - return (bucket.getBucketId().getRawId() != 0) && (_lockedBuckets.find(bucket) != _lockedBuckets.end()); + if (bucket.getBucketId().getRawId() == 0) { + return false; + } + auto iter = _lockedBuckets.find(bucket); + if (iter == _lockedBuckets.end()) { + return false; + } + if (iter->second._exclusiveLock) { + return true; + } + // Shared locks can be taken alongside other shared locks, but exclusive locks + // require that no shared locks are currently present. + return ((lockReq == api::LockingRequirements::Exclusive) + && !iter->second._sharedLocks.empty()); } uint32_t @@ -1114,33 +1177,26 @@ FileStorHandlerImpl::getQueueSize(uint16_t disk) const FileStorHandlerImpl::BucketLock::BucketLock(const vespalib::MonitorGuard & guard, Stripe& stripe, const document::Bucket &bucket, uint8_t priority, - api::MessageType::Id msgType, api::StorageMessage::Id msgId) + api::MessageType::Id msgType, api::StorageMessage::Id msgId, + api::LockingRequirements lockReq) : _stripe(stripe), - _bucket(bucket) + _bucket(bucket), + _uniqueMsgId(msgId), + _lockReq(lockReq) { - (void) guard; if (_bucket.getBucketId().getRawId() != 0) { - // Lock the bucket and wait until it is not the current operation for - // the disk itself. 
- _stripe.lock(guard, _bucket, Stripe::LockEntry(priority, msgType, msgId)); - LOG(debug, "Locked bucket %s with priority %u", - bucket.getBucketId().toString().c_str(), priority); - - LOG_BUCKET_OPERATION_SET_LOCK_STATE( - _bucket.getBucketId(), "acquired filestor lock", false, - debug::BucketOperationLogger::State::BUCKET_LOCKED); + _stripe.lock(guard, _bucket, lockReq, Stripe::LockEntry(priority, msgType, msgId)); + LOG(debug, "Locked bucket %s for message %zu with priority %u in mode %s", + bucket.getBucketId().toString().c_str(), msgId, priority, api::to_string(lockReq)); } } -FileStorHandlerImpl::BucketLock::~BucketLock() -{ +FileStorHandlerImpl::BucketLock::~BucketLock() { if (_bucket.getBucketId().getRawId() != 0) { - _stripe.release(_bucket); - LOG(debug, "Unlocked bucket %s", _bucket.getBucketId().toString().c_str()); - LOG_BUCKET_OPERATION_SET_LOCK_STATE( - _bucket.getBucketId(), "released filestor lock", true, - debug::BucketOperationLogger::State::BUCKET_UNLOCKED); + _stripe.release(_bucket, _lockReq, _uniqueMsgId); + LOG(debug, "Unlocked bucket %s for message %zu in mode %s", + _bucket.getBucketId().toString().c_str(), _uniqueMsgId, api::to_string(_lockReq)); } } @@ -1182,14 +1238,31 @@ FileStorHandlerImpl::Stripe::dumpQueueHtml(std::ostream & os) const } } +namespace { + +void dump_lock_entry(const document::BucketId& bucketId, const FileStorHandlerImpl::Stripe::LockEntry& entry, + api::LockingRequirements lock_mode, uint32_t now_ts, std::ostream& os) { + os << api::MessageType::get(entry.msgType).getName() << ":" << entry.msgId << " (" + << bucketId << ", " << api::to_string(lock_mode) + << " lock) Running for " << (now_ts - entry.timestamp) << " secs<br/>\n"; +} + +} + void FileStorHandlerImpl::Stripe::dumpActiveHtml(std::ostream & os) const { uint32_t now = time(nullptr); vespalib::MonitorGuard guard(_lock); for (const auto & e : _lockedBuckets) { - os << api::MessageType::get(e.second.msgType).getName() << ":" << e.second.msgId << " (" << 
e.first.getBucketId() - << ") Running for " << (now - e.second.timestamp) << " secs<br/>\n"; + if (e.second._exclusiveLock) { + dump_lock_entry(e.first.getBucketId(), *e.second._exclusiveLock, + api::LockingRequirements::Exclusive, now, os); + } + for (const auto& shared : e.second._sharedLocks) { + dump_lock_entry(e.first.getBucketId(), shared.second, + api::LockingRequirements::Shared, now, os); + } } } @@ -1238,7 +1311,6 @@ FileStorHandlerImpl::getStatus(std::ostream& out, const framework::HttpUrlPath& } for (auto & entry : _mergeStates) { out << "<b>" << entry.first.toString() << "</b><br>\n"; - // << "<p>" << it->second << "</p>\n"; // Gets very spammy with the complete state here.. } } } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index 45ac5ded47f..f9dcca4315b 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -30,6 +30,7 @@ #include <vespa/storage/common/messagesender.h> #include <vespa/vespalib/stllike/hash_map.h> #include <atomic> +#include <optional> namespace storage { @@ -82,13 +83,19 @@ public: api::MessageType::Id msgType; api::StorageMessage::Id msgId; - LockEntry() : timestamp(0), priority(0), msgType(), msgId(0) { } LockEntry(uint8_t priority_, api::MessageType::Id msgType_, api::StorageMessage::Id msgId_) : timestamp(time(nullptr)), priority(priority_), msgType(msgType_), msgId(msgId_) { } }; + + struct MultiLockEntry { + std::optional<LockEntry> _exclusiveLock; + using SharedLocks = vespalib::hash_map<api::StorageMessage::Id, LockEntry>; + SharedLocks _sharedLocks; + }; + Stripe(const FileStorHandlerImpl & owner, MessageSender & messageSender); ~Stripe(); void flush(); @@ -105,19 +112,16 @@ public: vespalib::MonitorGuard guard(_lock); return _queue.size(); } - void release(const document::Bucket & bucket){ - 
vespalib::MonitorGuard guard(_lock); - _lockedBuckets.erase(bucket); - guard.broadcast(); - } + void release(const document::Bucket & bucket, api::LockingRequirements reqOfReleasedLock, + api::StorageMessage::Id lockMsgId); - bool isLocked(const vespalib::MonitorGuard &, const document::Bucket&) const noexcept; + bool isLocked(const vespalib::MonitorGuard &, const document::Bucket&, + api::LockingRequirements lockReq) const noexcept; - void lock(const vespalib::MonitorGuard &, const document::Bucket & bucket, const LockEntry & lockEntry) { - _lockedBuckets.insert(std::make_pair(bucket, lockEntry)); - } + void lock(const vespalib::MonitorGuard &, const document::Bucket & bucket, + api::LockingRequirements lockReq, const LockEntry & lockEntry); - std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket); + std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket, api::LockingRequirements lockReq); void failOperations(const document::Bucket & bucket, const api::ReturnCode & code); FileStorHandler::LockedMessage getNextMessage(uint32_t timeout, Disk & disk); @@ -131,9 +135,11 @@ public: void setMetrics(FileStorStripeMetrics * metrics) { _metrics = metrics; } private: bool hasActive(vespalib::MonitorGuard & monitor, const AbortBucketOperationsCommand& cmd) const; + // Precondition: the bucket used by `iter`s operation is not locked in a way that conflicts + // with its locking requirements. 
FileStorHandler::LockedMessage getMessage(vespalib::MonitorGuard & guard, PriorityIdx & idx, PriorityIdx::iterator iter); - typedef vespalib::hash_map<document::Bucket, LockEntry, document::Bucket::hash> LockedBuckets; + using LockedBuckets = vespalib::hash_map<document::Bucket, MultiLockEntry, document::Bucket::hash>; const FileStorHandlerImpl &_owner; MessageSender &_messageSender; FileStorStripeMetrics *_metrics; @@ -178,8 +184,8 @@ public: return _stripes[stripeId].getNextMessage(lck); } std::shared_ptr<FileStorHandler::BucketLockInterface> - lock(const document::Bucket & bucket) { - return stripe(bucket).lock(bucket); + lock(const document::Bucket & bucket, api::LockingRequirements lockReq) { + return stripe(bucket).lock(bucket, lockReq); } void failOperations(const document::Bucket & bucket, const api::ReturnCode & code) { stripe(bucket).failOperations(bucket, code); @@ -194,7 +200,7 @@ public: // Disperse bucket bits by multiplying with the 64-bit FNV-1 prime. // This avoids an inherent affinity between the LSB of a bucket's bits // and the stripe an operation ends up on. 
- return bucket.getBucketId().getRawId() * 1099511628211ULL; + return bucket.getBucketId().getId() * 1099511628211ULL; } Stripe & stripe(const document::Bucket & bucket) { return _stripes[dispersed_bucket_bits(bucket) % _stripes.size()]; @@ -208,15 +214,20 @@ public: class BucketLock : public FileStorHandler::BucketLockInterface { public: + // TODO refactor, too many params BucketLock(const vespalib::MonitorGuard & guard, Stripe& disk, const document::Bucket &bucket, - uint8_t priority, api::MessageType::Id msgType, api::StorageMessage::Id); + uint8_t priority, api::MessageType::Id msgType, api::StorageMessage::Id, + api::LockingRequirements lockReq); ~BucketLock(); const document::Bucket &getBucket() const override { return _bucket; } + api::LockingRequirements lockingRequirements() const noexcept override { return _lockReq; } private: Stripe & _stripe; document::Bucket _bucket; + api::StorageMessage::Id _uniqueMsgId; + api::LockingRequirements _lockReq; }; FileStorHandlerImpl(uint32_t numStripes, MessageSender&, FileStorMetrics&, @@ -253,8 +264,8 @@ public: uint32_t getNextStripeId(uint32_t disk); std::shared_ptr<FileStorHandler::BucketLockInterface> - lock(const document::Bucket & bucket, uint16_t disk) { - return _diskInfo[disk].lock(bucket); + lock(const document::Bucket & bucket, uint16_t disk, api::LockingRequirements lockReq) { + return _diskInfo[disk].lock(bucket, lockReq); } void addMergeStatus(const document::Bucket&, MergeStatus::SP); diff --git a/storage/src/vespa/storage/persistence/messages.h b/storage/src/vespa/storage/persistence/messages.h index ba7f5979569..d0572e7dbf8 100644 --- a/storage/src/vespa/storage/persistence/messages.h +++ b/storage/src/vespa/storage/persistence/messages.h @@ -38,6 +38,9 @@ public: void setMaxByteSize(uint32_t maxByteSize) { _maxByteSize = maxByteSize; } uint32_t getMaxByteSize() const { return _maxByteSize; } + api::LockingRequirements lockingRequirements() const noexcept override { + return 
api::LockingRequirements::Shared; + } void print(std::ostream& out, bool verbose, const std::string& indent) const override; private: @@ -105,6 +108,9 @@ public: spi::ReadConsistency getReadConsistency() const noexcept { return _readConsistency; } + api::LockingRequirements lockingRequirements() const noexcept override { + return api::LockingRequirements::Shared; + } std::unique_ptr<api::StorageReply> makeReply() override; diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp index c2dcb8e2a29..888dc93dd82 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.cpp +++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp @@ -122,9 +122,14 @@ PersistenceUtil::lockAndGetDisk(const document::Bucket &bucket, result.disk = getPreferredAvailableDisk(bucket); while (true) { + // This function is only called in a context where we require exclusive + // locking (split/join). Refactor if this is no longer the case. std::shared_ptr<FileStorHandler::BucketLockInterface> lock( - _fileStorHandler.lock(bucket, result.disk)); + _fileStorHandler.lock(bucket, result.disk, api::LockingRequirements::Exclusive)); + // TODO disks are no longer used in practice, can we safely discard this? + // Might need it for synchronization purposes if something has taken the + // disk lock _and_ the bucket lock...? 
StorBucketDatabase::WrappedEntry entry(getBucketDatabase(bucket.getBucketSpace()).get( bucket.getBucketId(), "join-lockAndGetDisk-1", flags)); if (entry.exist() && entry->disk != result.disk) { diff --git a/storage/src/vespa/storage/visiting/stor-visitor.def b/storage/src/vespa/storage/visiting/stor-visitor.def index 1e80f2993a5..6f16bcb60a2 100644 --- a/storage/src/vespa/storage/visiting/stor-visitor.def +++ b/storage/src/vespa/storage/visiting/stor-visitor.def @@ -24,6 +24,7 @@ defaultparalleliterators int default=8 ## will be 16 requests to persistence layer, but only 8 will be able to execute ## at the same time, since only one operation can be executed at the same time ## for one bucket) +## DEPRECATED: ignored by backend, 1 is always used. iterators_per_bucket int default=1 ## Default number of maximum client replies pending. diff --git a/storage/src/vespa/storage/visiting/visitorthread.cpp b/storage/src/vespa/storage/visiting/visitorthread.cpp index a8f31514eb1..b12a1eb6e7f 100644 --- a/storage/src/vespa/storage/visiting/visitorthread.cpp +++ b/storage/src/vespa/storage/visiting/visitorthread.cpp @@ -637,7 +637,6 @@ VisitorThread::onInternal(const std::shared_ptr<api::InternalCommand>& cmd) _ignoreNonExistingVisitorTimeLimit = config.ignorenonexistingvisitortimelimit; _defaultParallelIterators = config.defaultparalleliterators; - _iteratorsPerBucket = config.iteratorsPerBucket; _defaultPendingMessages = config.defaultpendingmessages; _defaultDocBlockSize = config.defaultdocblocksize; _visitorMemoryUsageLimit = config.visitorMemoryUsageLimit; @@ -647,12 +646,6 @@ VisitorThread::onInternal(const std::shared_ptr<api::InternalCommand>& cmd) LOG(config, "Cannot use value of defaultParallelIterators < 1"); _defaultParallelIterators = 1; } - if (_iteratorsPerBucket < 1 && _iteratorsPerBucket > 10) { - if (_iteratorsPerBucket < 1) _iteratorsPerBucket = 1; - else _iteratorsPerBucket = 10; - LOG(config, "Invalid value of iterators per bucket %u using %u", - 
config.iteratorsPerBucket, _iteratorsPerBucket); - } if (_defaultPendingMessages < 1) { LOG(config, "Cannot use value of defaultPendingMessages < 1"); _defaultPendingMessages = 1; |