diff options
author | Geir Storli <geirst@verizonmedia.com> | 2020-10-26 12:11:56 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-26 12:11:56 +0100 |
commit | 33d1633018a8ea5bb2998281c8490f099de3ac19 (patch) | |
tree | bfeb88248e491b13878cf1e8e8467f63cd6781c9 | |
parent | 88330f8df080a9204aa13b5668e2e9754f6e279d (diff) | |
parent | 36c660bc1faff21f1566552092668b3cff9ce58d (diff) |
Merge pull request #15033 from vespa-engine/geirst/async-message-handling-on-schedule
Add support for async message handling when scheduling storage messag…
9 files changed, 253 insertions, 23 deletions
diff --git a/configdefinitions/src/vespa/stor-filestor.def b/configdefinitions/src/vespa/stor-filestor.def index bf1b4294b5b..1cec77832a7 100644 --- a/configdefinitions/src/vespa/stor-filestor.def +++ b/configdefinitions/src/vespa/stor-filestor.def @@ -63,3 +63,11 @@ enable_merge_local_node_choose_docs_optimalization bool default=true restart ## if splitting is expensive, but listing document identifiers is fairly cheap. ## This is true for memfile persistence layer, but not for vespa search. enable_multibit_split_optimalization bool default=true restart + +## Whether or not to use async message handling when scheduling storage messages from FileStorManager. +## +## When turned on, the calling thread (e.g. FNET network thread when using Storage API RPC) +## gets the next async message to handle (if any) as part of scheduling a storage message. +## This async message is then handled by the calling thread immediately, +## instead of going via a persistence thread. +use_async_message_handling_on_schedule bool default=false restart diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 9142a03ab85..9de47b4a8a9 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -60,9 +60,17 @@ spi::LoadType defaultLoadType(0, "default"); struct TestFileStorComponents; +document::Bucket +make_bucket_for_doc(const document::DocumentId& docid) +{ + document::BucketIdFactory factory; + document::BucketId bucket_id(16, factory.getBucketId(docid).getRawId()); + return makeDocumentBucket(bucket_id); +} + } -struct FileStorManagerTest : Test{ +struct FileStorTestBase : Test { enum {LONG_WAITTIME=60}; unique_ptr<TestServiceLayerApp> _node; std::unique_ptr<vdstestlib::DirConfig> config; @@ -71,13 +79,13 @@ struct FileStorManagerTest : Test{ const uint32_t _waitTime; const document::DocumentType* _testdoctype1; - FileStorManagerTest() : _node(), _waitTime(LONG_WAITTIME) {} + FileStorTestBase() : _node(), _waitTime(LONG_WAITTIME) {} + ~FileStorTestBase(); void SetUp() override; void TearDown() override; - void createBucket(document::BucketId bid, uint16_t disk) - { + void createBucket(document::BucketId bid, uint16_t disk) { spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0)); assert(disk == 0u); _node->getPersistenceProvider().createBucket(makeSpiBucket(bid), context); @@ -88,11 +96,29 @@ struct FileStorManagerTest : Test{ entry.write(); } - document::Document::UP createDocument(const std::string& content, const std::string& id) - { + document::Document::UP createDocument(const std::string& content, const std::string& id) { return _node->getTestDocMan().createDocument(content, id); } + std::shared_ptr<api::PutCommand> make_put_command(StorageMessage::Priority pri = 20, + const std::string& docid = "id:foo:testdoctype1::bar", + Timestamp timestamp = 100) { + Document::SP doc(createDocument("my content", docid).release()); + auto bucket = make_bucket_for_doc(doc->getId()); + auto cmd = std::make_shared<api::PutCommand>(bucket, std::move(doc), timestamp); + cmd->setPriority(pri); + return cmd; + } + + std::shared_ptr<api::GetCommand> make_get_command(StorageMessage::Priority pri, + const std::string& docid = "id:foo:testdoctype1::bar") { + document::DocumentId did(docid); + auto bucket = make_bucket_for_doc(did); + auto cmd = std::make_shared<api::GetCommand>(bucket, did, document::AllFields::NAME); + cmd->setPriority(pri); + return cmd; + } + bool ownsBucket(uint16_t distributorIndex, const document::BucketId& bucket) const { @@ -163,10 +189,12 @@ struct FileStorManagerTest : Test{ const Metric& metric); auto& thread_metrics_of(FileStorManager& manager) { - return manager._metrics->disk->threads[0]; + return manager.get_metrics().disk->threads[0]; } }; +FileStorTestBase::~FileStorTestBase() = default; + std::string findFile(const std::string& path, const std::string& file) { FastOS_DirectoryScan dirScan(path.c_str()); while (dirScan.ReadNext()) { @@ -207,7 +235,7 @@ struct TestFileStorComponents { DummyStorageLink top; FileStorManager* manager; - explicit TestFileStorComponents(FileStorManagerTest& test, + explicit TestFileStorComponents(FileStorTestBase& test, bool use_small_config = false) : manager(new FileStorManager((use_small_config ? test.smallConfig : test.config)->getConfigId(), test._node->getPersistenceProvider(), @@ -227,7 +255,7 @@ struct FileStorHandlerComponents { FileStorMetrics metrics; std::unique_ptr<FileStorHandler> filestorHandler; - FileStorHandlerComponents(FileStorManagerTest& test, uint32_t threadsPerDisk = 1) + FileStorHandlerComponents(FileStorTestBase& test, uint32_t threadsPerDisk = 1) : top(), dummyManager(new DummyStorageLink), messageSender(*dummyManager), @@ -253,7 +281,7 @@ struct PersistenceHandlerComponents : public FileStorHandlerComponents { BucketOwnershipNotifier bucketOwnershipNotifier; std::unique_ptr<PersistenceHandler> persistenceHandler; - PersistenceHandlerComponents(FileStorManagerTest& test) + PersistenceHandlerComponents(FileStorTestBase& test) : FileStorHandlerComponents(test), component(test._node->getComponentRegister(), "test"), bucketOwnershipNotifier(component, messageSender), @@ -277,17 +305,21 @@ PersistenceHandlerComponents::~PersistenceHandlerComponents() = default; } void -FileStorManagerTest::SetUp() +FileStorTestBase::SetUp() { setupDisks(); } void -FileStorManagerTest::TearDown() +FileStorTestBase::TearDown() { _node.reset(0); } +struct FileStorManagerTest : public FileStorTestBase { + +}; + TEST_F(FileStorManagerTest, header_only_put) { TestFileStorComponents c(*this); auto& top = c.top; @@ -947,10 +979,10 @@ TEST_F(FileStorManagerTest, split_single_group) { } void -FileStorManagerTest::putDoc(DummyStorageLink& top, - FileStorHandler& filestorHandler, - const document::BucketId& target, - uint32_t docNum) +FileStorTestBase::putDoc(DummyStorageLink& top, + FileStorHandler& filestorHandler, + const document::BucketId& target, + uint32_t docNum) { api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 3); spi::Context context(defaultLoadType, spi::Priority(0), @@ -1838,7 +1870,7 @@ TEST_F(FileStorManagerTest, create_bucket_sets_active_flag_in_database_and_reply } template <typename Metric> -void FileStorManagerTest::assert_request_size_set(TestFileStorComponents& c, std::shared_ptr<api::StorageMessage> cmd, const Metric& metric) { +void FileStorTestBase::assert_request_size_set(TestFileStorComponents& c, std::shared_ptr<api::StorageMessage> cmd, const Metric& metric) { api::StorageMessageAddress address("storage", lib::NodeType::STORAGE, 3); cmd->setApproxByteSize(54321); cmd->setAddress(address); @@ -1965,4 +1997,97 @@ TEST_F(FileStorManagerTest, bucket_db_is_populated_from_provider_when_initialize EXPECT_EQ(reported_state->getState(), lib::State::UP); } +struct FileStorHandlerTest : public FileStorTestBase { + std::unique_ptr<FileStorHandlerComponents> c; + FileStorHandler* handler; + FileStorHandlerTest() + : FileStorTestBase(), + c(), + handler() + {} + void SetUp() override { + FileStorTestBase::SetUp(); + c = std::make_unique<FileStorHandlerComponents>(*this); + handler = c->filestorHandler.get(); + } + FileStorHandler::LockedMessage get_next_message() { + return handler->getNextMessage(0); + } +}; + +void +expect_async_message(StorageMessage::Priority exp_pri, + const FileStorHandler::ScheduleAsyncResult& result) +{ + EXPECT_TRUE(result.was_scheduled()); + ASSERT_TRUE(result.has_async_message()); + EXPECT_EQ(exp_pri, result.async_message().second->getPriority()); +} + +void +expect_empty_async_message(const FileStorHandler::ScheduleAsyncResult& result) +{ + EXPECT_TRUE(result.was_scheduled()); + EXPECT_FALSE(result.has_async_message()); +} + +TEST_F(FileStorHandlerTest, message_not_scheduled_if_handler_is_closed) +{ + handler->setDiskState(FileStorHandler::DiskState::CLOSED); + auto result = handler->schedule_and_get_next_async_message(make_put_command()); + EXPECT_FALSE(result.was_scheduled()); +} + +TEST_F(FileStorHandlerTest, no_async_message_returned_if_handler_is_paused) +{ + auto guard = handler->pause(); + auto result = handler->schedule_and_get_next_async_message(make_put_command()); + expect_empty_async_message(result); +} + +TEST_F(FileStorHandlerTest, async_message_with_lowest_pri_returned_on_schedule) +{ + handler->schedule(make_put_command(20)); + handler->schedule(make_put_command(40)); + { + auto result = handler->schedule_and_get_next_async_message(make_put_command(30)); + expect_async_message(20, result); + } + EXPECT_EQ(30, get_next_message().second->getPriority()); + EXPECT_EQ(40, get_next_message().second->getPriority()); +} + +TEST_F(FileStorHandlerTest, no_async_message_returned_if_lowest_pri_message_is_not_async) +{ + // GET is not an async message. + handler->schedule(make_get_command(20)); + + auto result = handler->schedule_and_get_next_async_message(make_put_command(30)); + expect_empty_async_message(result); + + EXPECT_EQ(20, get_next_message().second->getPriority()); + EXPECT_EQ(30, get_next_message().second->getPriority()); +} + +TEST_F(FileStorHandlerTest, inhibited_operations_are_skipped) +{ + std::string docid_a = "id:foo:testdoctype1::a"; + std::string docid_b = "id:foo:testdoctype1::b"; + handler->schedule(make_put_command(20, docid_a)); + { + auto locked_msg = get_next_message(); + { + // Bucket for docid_a is locked and put command for same bucket is inhibited. + auto result = handler->schedule_and_get_next_async_message(make_put_command(30, docid_a)); + expect_empty_async_message(result); + } + { + // Put command for another bucket is ok. + auto result = handler->schedule_and_get_next_async_message(make_put_command(40, docid_b)); + expect_async_message(40, result); + } + } + EXPECT_EQ(30, get_next_message().second->getPriority()); +} + } // storage diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index 1d1f5caf673..5344553dd45 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -182,6 +182,19 @@ AsyncHandler::handleRemove(api::RemoveCommand& cmd, MessageTracker::UP trackerUP } bool +AsyncHandler::is_async_message(api::MessageType::Id type_id) noexcept +{ + switch (type_id) { + case api::MessageType::PUT_ID: + case api::MessageType::UPDATE_ID: + case api::MessageType::REMOVE_ID: + return true; + default: + return false; + } +} + +bool AsyncHandler::tasConditionExists(const api::TestAndSetCommand & cmd) { return cmd.getCondition().isPresent(); } diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h index c25f2ea0be6..92bf72e7c51 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.h +++ b/storage/src/vespa/storage/persistence/asynchandler.h @@ -25,6 +25,7 @@ public: MessageTrackerUP handlePut(api::PutCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleRemove(api::RemoveCommand& cmd, MessageTrackerUP tracker) const; MessageTrackerUP handleUpdate(api::UpdateCommand& cmd, MessageTrackerUP tracker) const; + static bool is_async_message(api::MessageType::Id type_id) noexcept; private: static bool tasConditionExists(const api::TestAndSetCommand & cmd); bool tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker, diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index 44e768c9db7..aafc87aa84f 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -58,6 +58,30 @@ public: }; using LockedMessage = std::pair<BucketLockInterface::SP, api::StorageMessage::SP>; + class ScheduleAsyncResult { + private: + bool _was_scheduled; + LockedMessage _async_message; + + public: + ScheduleAsyncResult() : _was_scheduled(false), _async_message() {} + explicit ScheduleAsyncResult(LockedMessage&& async_message_in) + : _was_scheduled(true), + _async_message(std::move(async_message_in)) + {} + bool was_scheduled() const { + return _was_scheduled; + } + bool has_async_message() const { + return _async_message.first.get() != nullptr; + } + const LockedMessage& async_message() const { + return _async_message; + } + LockedMessage&& release_async_message() { + return std::move(_async_message); + } + }; enum DiskState { AVAILABLE, @@ -104,6 +128,11 @@ public: virtual bool schedule(const std::shared_ptr<api::StorageMessage>&) = 0; /** + * Schedule the given message to be processed and return the next async message to process (if any). + */ + virtual ScheduleAsyncResult schedule_and_get_next_async_message(const std::shared_ptr<api::StorageMessage>& msg) = 0; + + /** * Used by file stor threads to get their next message to process. * * @param stripe The stripe to get messages for diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index 0c34a421c06..14074b65c5c 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -10,6 +10,7 @@ #include <vespa/storage/common/statusmessages.h> #include <vespa/storage/common/bucketoperationlogger.h> #include <vespa/storage/common/messagebucket.h> +#include <vespa/storage/persistence/asynchandler.h> #include <vespa/storage/persistence/messages.h> #include <vespa/storageapi/message/stat.h> #include <vespa/vespalib/stllike/hash_map.hpp> @@ -258,6 +259,16 @@ FileStorHandlerImpl::schedule(const std::shared_ptr<api::StorageMessage>& msg) return false; } +FileStorHandler::ScheduleAsyncResult +FileStorHandlerImpl::schedule_and_get_next_async_message(const std::shared_ptr<api::StorageMessage>& msg) +{ + if (getState() == FileStorHandler::AVAILABLE) { + document::Bucket bucket = getStorageMessageBucket(*msg); + return ScheduleAsyncResult(stripe(bucket).schedule_and_get_next_async_message(MessageEntry(msg, bucket))); + } + return {}; +} + bool FileStorHandlerImpl::messageMayBeAborted(const api::StorageMessage& msg) { @@ -911,6 +922,24 @@ FileStorHandlerImpl::Stripe::getNextMessage(vespalib::duration timeout) } FileStorHandler::LockedMessage +FileStorHandlerImpl::Stripe::get_next_async_message(monitor_guard& guard) +{ + if (_owner.isClosed() || _owner.isPaused()) { + return {}; + } + PriorityIdx& idx(bmi::get<1>(*_queue)); + PriorityIdx::iterator iter(idx.begin()), end(idx.end()); + + while ((iter != end) && operationIsInhibited(guard, iter->_bucket, *iter->_command)) { + ++iter; + } + if ((iter != end) && AsyncHandler::is_async_message(iter->_command->getType().getId())) { + return getMessage(guard, idx, iter); + } + return {}; +} + +FileStorHandler::LockedMessage FileStorHandlerImpl::Stripe::getMessage(monitor_guard & guard, PriorityIdx & idx, PriorityIdx::iterator iter) { api::StorageMessage & m(*iter->_command); @@ -989,6 +1018,14 @@ bool FileStorHandlerImpl::Stripe::schedule(MessageEntry messageEntry) return true; } +FileStorHandler::LockedMessage +FileStorHandlerImpl::Stripe::schedule_and_get_next_async_message(MessageEntry entry) +{ + std::unique_lock guard(*_lock); + _queue->emplace_back(std::move(entry)); + return get_next_async_message(guard); +} + void FileStorHandlerImpl::Stripe::flush() { diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index 6aac8b0474b..549de164229 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -101,6 +101,7 @@ public: ~Stripe(); void flush(); bool schedule(MessageEntry messageEntry); + FileStorHandler::LockedMessage schedule_and_get_next_async_message(MessageEntry entry); void waitUntilNoLocks() const; void abort(std::vector<std::shared_ptr<api::StorageReply>> & aborted, const AbortBucketOperationsCommand& cmd); void waitInactive(const AbortBucketOperationsCommand& cmd) const; @@ -137,6 +138,8 @@ public: void setMetrics(FileStorStripeMetrics * metrics) { _metrics = metrics; } private: bool hasActive(monitor_guard & monitor, const AbortBucketOperationsCommand& cmd) const; + FileStorHandler::LockedMessage get_next_async_message(monitor_guard& guard); + // Precondition: the bucket used by `iter`s operation is not locked in a way that conflicts // with its locking requirements. FileStorHandler::LockedMessage getMessage(monitor_guard & guard, PriorityIdx & idx, @@ -184,6 +187,7 @@ public: DiskState getDiskState() const override; void close() override; bool schedule(const std::shared_ptr<api::StorageMessage>&) override; + ScheduleAsyncResult schedule_and_get_next_async_message(const std::shared_ptr<api::StorageMessage>& msg) override; FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId) override; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 188523af38d..2653391ecfa 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -48,6 +48,7 @@ FileStorManager(const config::ConfigUri & configUri, spi::PersistenceProvider& p _configFetcher(configUri.getContext()), _threadLockCheckInterval(60), _failDiskOnError(false), + _use_async_message_handling_on_schedule(false), _metrics(std::make_unique<FileStorMetrics>(_component.getLoadTypes()->getMetricLoadTypes())), _closed(false), _lock() @@ -151,6 +152,7 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC _threadLockCheckInterval = config->diskOperationTimeout; _failDiskOnError = (config->failDiskAfterErrorCount > 0); + _use_async_message_handling_on_schedule = config->useAsyncMessageHandlingOnSchedule; if (!liveUpdate) { _config = std::move(config); @@ -258,10 +260,20 @@ FileStorManager::handlePersistenceMessage(const shared_ptr<api::StorageMessage>& api::ReturnCode errorCode(api::ReturnCode::OK); LOG(spam, "Received %s. Attempting to queue it.", msg->getType().getName().c_str()); - if (_filestorHandler->schedule(msg)) { - LOG(spam, "Received persistence message %s. Queued it to disk", - msg->getType().getName().c_str()); - return true; + if (_use_async_message_handling_on_schedule) { + auto result = _filestorHandler->schedule_and_get_next_async_message(msg); + if (result.was_scheduled()) { + if (result.has_async_message()) { + getThreadLocalHandler().processLockedMessage(result.release_async_message()); + } + return true; + } + } else { + if (_filestorHandler->schedule(msg)) { + LOG(spam, "Received persistence message %s. Queued it to disk", + msg->getType().getName().c_str()); + return true; + } } switch (_filestorHandler->getDiskState()) { case FileStorHandler::DISABLED: diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h index ee66bc7d77c..2953462dd1e 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h @@ -65,14 +65,13 @@ class FileStorManager : public StorageLinkQueued, config::ConfigFetcher _configFetcher; uint32_t _threadLockCheckInterval; // In seconds bool _failDiskOnError; + bool _use_async_message_handling_on_schedule; std::shared_ptr<FileStorMetrics> _metrics; std::unique_ptr<FileStorHandler> _filestorHandler; std::unique_ptr<vespalib::ISequencedTaskExecutor> _sequencedExecutor; bool _closed; std::mutex _lock; - friend struct FileStorManagerTest; - public: FileStorManager(const config::ConfigUri &, spi::PersistenceProvider&, ServiceLayerComponentRegister&, DoneInitializeHandler&); @@ -105,6 +104,8 @@ public: // yet at that point in time. void initialize_bucket_databases_from_provider(); + const FileStorMetrics& get_metrics() const { return *_metrics; } + private: void configure(std::unique_ptr<vespa::config::content::StorFilestorConfig> config) override; PersistenceHandler & createRegisteredHandler(const ServiceLayerComponent & component); |