diff options
Diffstat (limited to 'storage')
17 files changed, 734 insertions, 123 deletions
diff --git a/storage/src/tests/persistence/filestorage/CMakeLists.txt b/storage/src/tests/persistence/filestorage/CMakeLists.txt index f1a8a286bbd..951a361474e 100644 --- a/storage/src/tests/persistence/filestorage/CMakeLists.txt +++ b/storage/src/tests/persistence/filestorage/CMakeLists.txt @@ -4,6 +4,7 @@ vespa_add_executable(storage_filestorage_gtest_runner_app TEST SOURCES deactivatebucketstest.cpp deletebuckettest.cpp + feed_operation_batching_test.cpp filestormanagertest.cpp filestormodifiedbucketstest.cpp mergeblockingtest.cpp diff --git a/storage/src/tests/persistence/filestorage/feed_operation_batching_test.cpp b/storage/src/tests/persistence/filestorage/feed_operation_batching_test.cpp new file mode 100644 index 00000000000..cf16123933b --- /dev/null +++ b/storage/src/tests/persistence/filestorage/feed_operation_batching_test.cpp @@ -0,0 +1,318 @@ +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <tests/common/dummystoragelink.h> +#include <tests/common/testhelper.h> +#include <tests/persistence/common/filestortestfixture.h> +#include <tests/persistence/filestorage/forwardingmessagesender.h> +#include <vespa/document/test/make_document_bucket.h> +#include <vespa/document/update/documentupdate.h> +#include <vespa/storage/persistence/filestorage/filestorhandlerimpl.h> +#include <vespa/storage/persistence/filestorage/filestormanager.h> +#include <vespa/storage/persistence/filestorage/filestormetrics.h> +#include <vespa/vespalib/util/stringfmt.h> +#include <gtest/gtest.h> + +using document::test::makeDocumentBucket; +using document::BucketId; +using document::DocumentId; +using namespace ::testing; + +namespace storage { + +struct FeedOperationBatchingTest : FileStorTestFixture { + DummyStorageLink _top; + std::unique_ptr<ForwardingMessageSender> _message_sender; + FileStorMetrics _metrics; + std::unique_ptr<FileStorHandler> _handler; + api::Timestamp _next_timestamp; + + 
FeedOperationBatchingTest(); + ~FeedOperationBatchingTest() override; + + void SetUp() override { + FileStorTestFixture::SetUp(); + // This silly little indirection is a work-around for the top-level link needing something + // below it to send _up_ into it, rather than directly receiving the messages itself. + auto message_receiver = std::make_unique<DummyStorageLink>(); + _message_sender = std::make_unique<ForwardingMessageSender>(*message_receiver); + _top.push_back(std::move(message_receiver)); + _top.open(); + _metrics.initDiskMetrics(1, 1); + // By default, sets up 1 thread with 1 stripe + _handler = std::make_unique<FileStorHandlerImpl>(*_message_sender, _metrics, _node->getComponentRegister()); + _handler->set_max_feed_op_batch_size(3); + } + + void TearDown() override { + _handler.reset(); + FileStorTestFixture::TearDown(); + } + + [[nodiscard]] static vespalib::string id_str_of(uint32_t bucket_idx, uint32_t doc_idx) { + return vespalib::make_string("id:foo:testdoctype1:n=%u:%u", bucket_idx, doc_idx); + } + + [[nodiscard]] static DocumentId id_of(uint32_t bucket_idx, uint32_t doc_idx) { + return DocumentId(id_str_of(bucket_idx, doc_idx)); + } + + void schedule_msg(const std::shared_ptr<api::StorageMessage>& msg) { + msg->setAddress(makeSelfAddress()); + _handler->schedule(msg); // takes shared_ptr by const ref, no point in moving + } + + void send_put(uint32_t bucket_idx, uint32_t doc_idx, uint32_t timestamp, vespalib::duration timeout) { + auto id = id_str_of(bucket_idx, doc_idx); + auto doc = _node->getTestDocMan().createDocument("foobar", id); + auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket({16, bucket_idx}), std::move(doc), timestamp); + cmd->setTimeout(timeout); + schedule_msg(cmd); + } + + void send_put(uint32_t bucket_idx, uint32_t doc_idx) { + send_put(bucket_idx, doc_idx, next_timestamp(), 60s); + } + + void send_puts(std::initializer_list<std::pair<uint32_t, uint32_t>> bucket_docs) { + for (const auto& bd : bucket_docs) { + 
send_put(bd.first, bd.second); + } + } + + void send_get(uint32_t bucket_idx, uint32_t doc_idx) { + auto id = id_of(bucket_idx, doc_idx); + auto cmd = std::make_shared<api::GetCommand>(makeDocumentBucket({16, bucket_idx}), id, document::AllFields::NAME); + schedule_msg(cmd); + } + + void send_remove(uint32_t bucket_idx, uint32_t doc_idx, uint32_t timestamp) { + auto id = id_of(bucket_idx, doc_idx); + auto cmd = std::make_shared<api::RemoveCommand>(makeDocumentBucket({16, bucket_idx}), id, timestamp); + schedule_msg(cmd); + } + + void send_remove(uint32_t bucket_idx, uint32_t doc_idx) { + send_remove(bucket_idx, doc_idx, next_timestamp()); + } + + void send_update(uint32_t bucket_idx, uint32_t doc_idx, uint32_t timestamp) { + auto id = id_of(bucket_idx, doc_idx); + auto update = std::make_shared<document::DocumentUpdate>( + _node->getTestDocMan().getTypeRepo(), + _node->getTestDocMan().createRandomDocument()->getType(), id); + auto cmd = std::make_shared<api::UpdateCommand>(makeDocumentBucket({16, bucket_idx}), std::move(update), timestamp); + schedule_msg(cmd); + } + + void send_update(uint32_t bucket_idx, uint32_t doc_idx) { + send_update(bucket_idx, doc_idx, next_timestamp()); + } + + [[nodiscard]] api::Timestamp next_timestamp() { + auto ret = _next_timestamp; + ++_next_timestamp; + return ret; + } + + [[nodiscard]] vespalib::steady_time fake_now() const { + return _node->getClock().getMonotonicTime(); + } + + [[nodiscard]] vespalib::steady_time fake_deadline() const { + return _node->getClock().getMonotonicTime() + 60s; + } + + [[nodiscard]] FileStorHandler::LockedMessageBatch next_batch() { + return _handler->next_message_batch(0, fake_now(), fake_deadline()); + } + + template <typename CmdType> + static void assert_batch_msg_is(const FileStorHandler::LockedMessageBatch& batch, uint32_t msg_idx, + uint32_t expected_bucket_idx, uint32_t expected_doc_idx) + { + ASSERT_LT(msg_idx, batch.size()); + auto msg = batch.messages[msg_idx].first; + auto* as_cmd = 
dynamic_cast<const CmdType*>(msg.get()); + ASSERT_TRUE(as_cmd) << msg->toString() << " does not have the expected type"; + EXPECT_EQ(as_cmd->getBucketId(), BucketId(16, expected_bucket_idx)); + + auto id = as_cmd->getDocumentId(); + ASSERT_TRUE(id.getScheme().hasNumber()); + EXPECT_EQ(id.getScheme().getNumber(), expected_bucket_idx) << id; + std::string actual_id_part = id.getScheme().getNamespaceSpecific(); + std::string expected_id_part = std::to_string(expected_doc_idx); + EXPECT_EQ(actual_id_part, expected_id_part) << id; + } + + static void assert_batch_msg_is_put(const FileStorHandler::LockedMessageBatch& batch, uint32_t msg_idx, + uint32_t expected_bucket_idx, uint32_t expected_doc_idx) + { + assert_batch_msg_is<api::PutCommand>(batch, msg_idx, expected_bucket_idx, expected_doc_idx); + } + + static void assert_batch_msg_is_remove(const FileStorHandler::LockedMessageBatch& batch, uint32_t msg_idx, + uint32_t expected_bucket_idx, uint32_t expected_doc_idx) + { + assert_batch_msg_is<api::RemoveCommand>(batch, msg_idx, expected_bucket_idx, expected_doc_idx); + } + + static void assert_batch_msg_is_update(const FileStorHandler::LockedMessageBatch& batch, uint32_t msg_idx, + uint32_t expected_bucket_idx, uint32_t expected_doc_idx) + { + assert_batch_msg_is<api::UpdateCommand>(batch, msg_idx, expected_bucket_idx, expected_doc_idx); + } + + static void assert_batch_msg_is_get(const FileStorHandler::LockedMessageBatch& batch, uint32_t msg_idx, + uint32_t expected_bucket_idx, uint32_t expected_doc_idx) + { + assert_batch_msg_is<api::GetCommand>(batch, msg_idx, expected_bucket_idx, expected_doc_idx); + } + + enum Type { + Put, + Update, + Remove, + Get + }; + + static void assert_empty_batch(const FileStorHandler::LockedMessageBatch& batch) { + ASSERT_TRUE(batch.empty()); + ASSERT_FALSE(batch.lock); + } + + static void assert_batch(const FileStorHandler::LockedMessageBatch& batch, + uint32_t expected_bucket_idx, + std::initializer_list<std::pair<Type, uint32_t>> 
expected_msgs) + { + ASSERT_TRUE(batch.lock); + ASSERT_EQ(batch.lock->getBucket().getBucketId(), BucketId(16, expected_bucket_idx)); + ASSERT_EQ(batch.size(), expected_msgs.size()); + + uint32_t idx = 0; + for (const auto& msg : expected_msgs) { + switch (msg.first) { + case Type::Put: assert_batch_msg_is_put(batch, idx, expected_bucket_idx, msg.second); break; + case Type::Update: assert_batch_msg_is_update(batch, idx, expected_bucket_idx, msg.second); break; + case Type::Remove: assert_batch_msg_is_remove(batch, idx, expected_bucket_idx, msg.second); break; + case Type::Get: assert_batch_msg_is_get(batch, idx, expected_bucket_idx, msg.second); break; + default: FAIL(); + } + ++idx; + } + } +}; + +FeedOperationBatchingTest::FeedOperationBatchingTest() + : FileStorTestFixture(), + _top(), + _message_sender(), + _metrics(), + _handler(), + _next_timestamp(1000) +{ +} + +FeedOperationBatchingTest::~FeedOperationBatchingTest() = default; + +// Note: unless explicitly set by the testcase, max batch size is 3 + +TEST_F(FeedOperationBatchingTest, batching_is_disabled_with_1_max_batch_size) { + _handler->set_max_feed_op_batch_size(1); + send_puts({{1, 1}, {1, 2}, {2, 3}, {2, 4}}); + // No batching; has the same behavior as current FIFO + assert_batch(next_batch(), 1, {{Put, 1}}); + assert_batch(next_batch(), 1, {{Put, 2}}); + assert_batch(next_batch(), 2, {{Put, 3}}); + assert_batch(next_batch(), 2, {{Put, 4}}); + assert_empty_batch(next_batch()); +} + +TEST_F(FeedOperationBatchingTest, batching_is_limited_to_configured_max_size) { + send_puts({{1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}); + assert_batch(next_batch(), 1, {{Put, 1}, {Put, 2}, {Put, 3}}); + assert_batch(next_batch(), 1, {{Put, 4}, {Put, 5}}); + assert_empty_batch(next_batch()); +} + +TEST_F(FeedOperationBatchingTest, batching_can_consume_entire_queue) { + send_puts({{1, 1}, {1, 2}, {1, 3}}); + assert_batch(next_batch(), 1, {{Put, 1}, {Put, 2}, {Put, 3}}); + assert_empty_batch(next_batch()); +} + 
+TEST_F(FeedOperationBatchingTest, batching_is_only_done_for_single_bucket) { + send_puts({{1, 1}, {2, 2}, {2, 3}, {2, 4}, {3, 5}}); + assert_batch(next_batch(), 1, {{Put, 1}}); + assert_batch(next_batch(), 2, {{Put, 2}, {Put, 3}, {Put, 4}}); + assert_batch(next_batch(), 3, {{Put, 5}}); +} + +TEST_F(FeedOperationBatchingTest, batch_can_include_all_supported_feed_op_types) { + send_put(1, 1); + send_remove(1, 2); + send_update(1, 3); + assert_batch(next_batch(), 1, {{Put, 1}, {Remove, 2}, {Update, 3}}); +} + +TEST_F(FeedOperationBatchingTest, timed_out_reqeusts_are_ignored_by_batch) { + send_puts({{1, 1}}); + send_put(1, 2, next_timestamp(), 1s); + send_puts({{1, 3}}); + _node->getClock().addSecondsToTime(2); + // Put #2 with 1s timeout has expired in the queue and should not be returned as part of the batch + assert_batch(next_batch(), 1, {{Put, 1}, {Put, 3}}); + ASSERT_EQ(_top.getNumReplies(), 0); + // The actual timeout is handled by the next message fetch invocation + assert_empty_batch(next_batch()); + ASSERT_EQ(_top.getNumReplies(), 1); + EXPECT_EQ(dynamic_cast<api::StorageReply&>(*_top.getReply(0)).getResult().getResult(), api::ReturnCode::TIMEOUT); +} + +TEST_F(FeedOperationBatchingTest, non_feed_ops_are_not_batched) { + send_get(1, 2); + send_get(1, 3); + assert_batch(next_batch(), 1, {{Get, 2}}); + assert_batch(next_batch(), 1, {{Get, 3}}); +} + +TEST_F(FeedOperationBatchingTest, pipeline_stalled_by_non_feed_op) { + // It can reasonably be argued that we could batch _around_ a Get operation and still + // have correct behavior, but the Get here is just a stand-in for an arbitrary operation such + // as a Split (which changes the bucket set), which is rather more tricky to reason about. + // For simplicity and understandability, just stall the batch pipeline (at least for now). 
+ send_get(1, 2); + send_puts({{1, 3}, {1, 4}}); + send_get(1, 5); + send_puts({{1, 6}, {1, 7}}); + + assert_batch(next_batch(), 1, {{Get, 2}}); // If first op is non-feed, only it should be returned + assert_batch(next_batch(), 1, {{Put, 3}, {Put, 4}}); + assert_batch(next_batch(), 1, {{Get, 5}}); + assert_batch(next_batch(), 1, {{Put, 6}, {Put, 7}}); +} + +TEST_F(FeedOperationBatchingTest, pipeline_stalled_by_concurrent_ops_to_same_document) { + // 2 ops to doc #2. Since this is expected to be a very rare edge case, just + // stop batching at that point and defer the concurrent op to the next batch. + send_puts({{1, 1}, {1, 2}, {1, 3}, {1, 2}, {1, 4}}); + assert_batch(next_batch(), 1, {{Put, 1}, {Put, 2}, {Put, 3}}); + assert_batch(next_batch(), 1, {{Put, 2}, {Put, 4}}); +} + +TEST_F(FeedOperationBatchingTest, batch_respects_persistence_throttling) { + vespalib::SharedOperationThrottler::DynamicThrottleParams params; + params.min_window_size = 3; + params.max_window_size = 3; + params.window_size_increment = 1; + _handler->use_dynamic_operation_throttling(true); + _handler->reconfigure_dynamic_throttler(params); + _handler->set_max_feed_op_batch_size(10); // > win size to make sure we test the right thing + + send_puts({{1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}); + auto batch = next_batch(); // holds 3 throttle tokens + assert_batch(batch, 1, {{Put, 1}, {Put, 2}, {Put, 3}}); + // No more throttle tokens available + assert_empty_batch(next_batch()); +} + +} // storage diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 1ccd51d3f06..56149bbc14d 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -43,6 +43,7 @@ LOG_SETUP(".filestormanagertest"); using std::unique_ptr; using document::Document; +using document::BucketId; using namespace storage::api; using 
storage::spi::test::makeSpiBucket; using document::test::makeDocumentBucket; @@ -405,6 +406,38 @@ TEST_F(FileStorManagerTest, put) { } } +TEST_F(FileStorManagerTest, feed_op_batch_updates_bucket_db_and_reply_bucket_info) { + TestFileStorComponents c(*this); + c.manager->getFileStorHandler().set_max_feed_op_batch_size(10); + BucketId bucket_id(16, 1); + createBucket(bucket_id); + constexpr uint32_t n = 10; + { + // Barrier to prevent any messages from being processed until we've enqueued all puts + auto guard = c.manager->getFileStorHandler().lock(makeDocumentBucket(bucket_id), LockingRequirements::Exclusive); + for (uint32_t i = 0; i < n; ++i) { + auto put = make_put_command(120, vespalib::make_string("id:foo:testdoctype1:n=1:%u", i), Timestamp(1000) + i); + put->setAddress(_storage3); + c.top.sendDown(put); + } + } + // All 10 puts shall now be visible and waiting for the persistence thread to fetch as a single batch. + c.top.waitForMessages(n, _waitTime); + api::BucketInfo expected_bucket_info; + { + StorBucketDatabase::WrappedEntry entry(_node->getStorageBucketDatabase().get(bucket_id, "foo")); + ASSERT_TRUE(entry.exists()); + EXPECT_EQ(entry->getBucketInfo().getDocumentCount(), n); + expected_bucket_info = entry->getBucketInfo(); + } + // All replies should have the _same_ bucket info due to being processed in the same batch. 
+ auto replies = c.top.getRepliesOnce(); + for (auto& reply : replies) { + auto actual_bucket_info = dynamic_cast<api::PutReply&>(*reply).getBucketInfo(); + EXPECT_EQ(actual_bucket_info, expected_bucket_info); + } +} + TEST_F(FileStorManagerTest, running_task_against_unknown_bucket_fails) { TestFileStorComponents c(*this); @@ -726,7 +759,7 @@ TEST_F(FileStorManagerTest, handler_timeout) { filestorHandler.schedule(cmd); } - std::this_thread::sleep_for(51ms); + _node->getClock().addMilliSecondsToTime(51); for (;;) { auto lock = filestorHandler.getNextMessage(stripeId); if (lock.lock.get()) { diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index 59e0853cc21..ab176ebb9cb 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -143,7 +143,7 @@ AsyncHandler::AsyncHandler(const PersistenceUtil & env, spi::PersistenceProvider MessageTracker::UP AsyncHandler::handleRunTask(RunTaskCommand& cmd, MessageTracker::UP tracker) const { auto task = makeResultTask([tracker = std::move(tracker)](spi::Result::UP response) { - tracker->checkForError(*response); + (void)tracker->checkForError(*response); tracker->sendReply(); }); spi::Bucket bucket(cmd.getBucket()); @@ -169,7 +169,7 @@ AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) cons spi::Bucket bucket = _env.getBucket(cmd.getDocumentId(), cmd.getBucket()); auto task = makeResultTask([tracker = std::move(trackerUP)](spi::Result::UP response) { - tracker->checkForError(*response); + (void)tracker->checkForError(*response); tracker->sendReply(); }); _spi.putAsync(bucket, spi::Timestamp(cmd.getTimestamp()), cmd.getDocument(), @@ -517,7 +517,7 @@ AsyncHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTrack } auto task = makeResultTask([&cmd, tracker = std::move(tracker), removed = to_remove.size()](spi::Result::UP response) { - 
tracker->checkForError(*response); + (void)tracker->checkForError(*response); tracker->setReply(std::make_shared<api::RemoveLocationReply>(cmd, removed)); tracker->sendReply(); }); diff --git a/storage/src/vespa/storage/persistence/batched_message.h b/storage/src/vespa/storage/persistence/batched_message.h new file mode 100644 index 00000000000..c23383edfde --- /dev/null +++ b/storage/src/vespa/storage/persistence/batched_message.h @@ -0,0 +1,14 @@ +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "shared_operation_throttler.h" +#include <memory> +#include <utility> + +namespace storage { + +namespace api { class StorageMessage; } + +using BatchedMessage = std::pair<std::shared_ptr<api::StorageMessage>, ThrottleToken>; + +} diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp index a89b705de1b..274d59899d9 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp @@ -5,4 +5,15 @@ namespace storage { FileStorHandler::LockedMessage::~LockedMessage() = default; +FileStorHandler::LockedMessageBatch::LockedMessageBatch(LockedMessage&& initial_msg) + : lock(std::move(initial_msg.lock)), + messages() +{ + if (lock) { + messages.emplace_back(std::move(initial_msg.msg), std::move(initial_msg.throttle_token)); + } +} + +FileStorHandler::LockedMessageBatch::~LockedMessageBatch() = default; + } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index 68b8411c762..6a8b74baf1d 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -1,23 +1,13 @@ // Copyright Vespa.ai.
Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -/** - * \class storage::FileStorHandler - * \ingroup storage - * - * \brief Common resource for filestor threads - * - * Takes care of the interface between file stor threads and the file stor - * manager to avoid circular dependencies, and confine the implementation that - * needs to worry about locking between these components. - */ - #pragma once #include <vespa/document/bucket/bucket.h> #include <vespa/storage/storageutil/resumeguard.h> #include <vespa/storage/common/messagesender.h> +#include <vespa/storage/persistence/batched_message.h> #include <vespa/storage/persistence/shared_operation_throttler.h> #include <vespa/storageapi/messageapi/storagemessage.h> +#include <vespa/vespalib/util/small_vector.h> namespace storage { namespace api { @@ -80,13 +70,7 @@ public: std::shared_ptr<api::StorageMessage> msg; ThrottleToken throttle_token; - LockedMessage() noexcept = default; - LockedMessage(std::shared_ptr<BucketLockInterface> lock_, - std::shared_ptr<api::StorageMessage> msg_) noexcept - : lock(std::move(lock_)), - msg(std::move(msg_)), - throttle_token() - {} + constexpr LockedMessage() noexcept = default; LockedMessage(std::shared_ptr<BucketLockInterface> lock_, std::shared_ptr<api::StorageMessage> msg_, ThrottleToken token) noexcept @@ -98,27 +82,40 @@ public: ~LockedMessage(); }; + struct LockedMessageBatch { + std::shared_ptr<BucketLockInterface> lock; + vespalib::SmallVector<BatchedMessage, 1> messages; + + LockedMessageBatch() = default; + explicit LockedMessageBatch(LockedMessage&& initial_msg); + LockedMessageBatch(LockedMessageBatch&&) noexcept = default; + ~LockedMessageBatch(); + + [[nodiscard]] bool empty() const noexcept { return messages.empty(); } + [[nodiscard]] size_t size() const noexcept { return messages.size(); } + // Precondition: messages.size() == 1 + [[nodiscard]] LockedMessage release_as_single_msg() noexcept; + }; + class ScheduleAsyncResult { - 
private: - bool _was_scheduled; + bool _was_scheduled; LockedMessage _async_message; - public: - ScheduleAsyncResult() : _was_scheduled(false), _async_message() {} - explicit ScheduleAsyncResult(LockedMessage&& async_message_in) + constexpr ScheduleAsyncResult() noexcept : _was_scheduled(false), _async_message() {} + explicit ScheduleAsyncResult(LockedMessage&& async_message_in) noexcept : _was_scheduled(true), _async_message(std::move(async_message_in)) {} - bool was_scheduled() const { + [[nodiscard]] bool was_scheduled() const noexcept { return _was_scheduled; } - bool has_async_message() const { - return _async_message.lock.get() != nullptr; + [[nodiscard]] bool has_async_message() const noexcept { + return static_cast<bool>(_async_message.lock); } - const LockedMessage& async_message() const { + [[nodiscard]] const LockedMessage& async_message() const noexcept { return _async_message; } - LockedMessage&& release_async_message() { + [[nodiscard]] LockedMessage&& release_async_message() noexcept { return std::move(_async_message); } }; @@ -129,7 +126,7 @@ public: }; FileStorHandler() : _getNextMessageTimout(100ms) { } - virtual ~FileStorHandler() = default; + ~FileStorHandler() override = default; /** @@ -171,7 +168,9 @@ public: * * @param stripe The stripe to get messages for */ - virtual LockedMessage getNextMessage(uint32_t stripeId, vespalib::steady_time deadline) = 0; + [[nodiscard]] virtual LockedMessage getNextMessage(uint32_t stripeId, vespalib::steady_time deadline) = 0; + + [[nodiscard]] virtual LockedMessageBatch next_message_batch(uint32_t stripe, vespalib::steady_time now, vespalib::steady_time deadline) = 0; /** Only used for testing, should be removed */ LockedMessage getNextMessage(uint32_t stripeId) { @@ -189,8 +188,6 @@ public: * NB: As current operation can be a split or join operation, make sure that * you always wait for current to finish, if is a super or sub bucket of * the bucket we're locking. 
- * - * */ virtual BucketLockInterface::SP lock(const document::Bucket&, api::LockingRequirements lockReq) = 0; @@ -287,6 +284,8 @@ public: virtual void use_dynamic_operation_throttling(bool use_dynamic) noexcept = 0; virtual void set_throttle_apply_bucket_diff_ops(bool throttle_apply_bucket_diff) noexcept = 0; + + virtual void set_max_feed_op_batch_size(uint32_t max_batch) noexcept = 0; private: vespalib::duration _getNextMessageTimout; }; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index 1984d44652a..2e2634025a7 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -12,6 +12,7 @@ #include <vespa/storage/persistence/messages.h> #include <vespa/storageapi/message/stat.h> #include <vespa/vespalib/stllike/hash_map.hpp> +#include <vespa/vespalib/stllike/hash_set.hpp> #include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/util/string_escape.h> @@ -60,7 +61,8 @@ FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripe _max_active_merges_per_stripe(per_stripe_merge_limit(numThreads, numStripes)), _paused(false), _throttle_apply_bucket_diff_ops(false), - _last_active_operations_stats() + _last_active_operations_stats(), + _max_feed_op_batch_size(1) { assert(numStripes > 0); _stripes.reserve(numStripes); @@ -241,8 +243,7 @@ FileStorHandlerImpl::schedule(const std::shared_ptr<api::StorageMessage>& msg) { if (getState() == FileStorHandler::AVAILABLE) { document::Bucket bucket = getStorageMessageBucket(*msg); - stripe(bucket).schedule(MessageEntry(msg, bucket)); - return true; + return stripe(bucket).schedule(MessageEntry(msg, bucket, _component.getClock().getMonotonicTime())); } return false; } @@ -252,7 +253,7 @@ FileStorHandlerImpl::schedule_and_get_next_async_message(const std::shared_ptr<a { if 
(getState() == FileStorHandler::AVAILABLE) { document::Bucket bucket = getStorageMessageBucket(*msg); - return ScheduleAsyncResult(stripe(bucket).schedule_and_get_next_async_message(MessageEntry(msg, bucket))); + return ScheduleAsyncResult(stripe(bucket).schedule_and_get_next_async_message(MessageEntry(msg, bucket, _component.getClock().getMonotonicTime()))); } return {}; } @@ -403,10 +404,25 @@ FileStorHandlerImpl::getNextMessage(uint32_t stripeId, vespalib::steady_time dea if (!tryHandlePause()) { return {}; // Still paused, return to allow tick. } - return _stripes[stripeId].getNextMessage(deadline); } +FileStorHandler::LockedMessageBatch +FileStorHandlerImpl::next_message_batch(uint32_t stripe_id, vespalib::steady_time now, vespalib::steady_time deadline) +{ + if (!tryHandlePause()) { + return {}; + } + return _stripes[stripe_id].next_message_batch(now, deadline); +} + +FileStorHandler::LockedMessage +FileStorHandlerImpl::LockedMessageBatch::release_as_single_msg() noexcept +{ + assert(lock && messages.size() == 1); + return {std::move(lock), std::move(messages[0].first), std::move(messages[0].second)}; +} + std::shared_ptr<FileStorHandler::BucketLockInterface> FileStorHandlerImpl::Stripe::lock(const document::Bucket &bucket, api::LockingRequirements lockReq) { std::unique_lock guard(*_lock); @@ -858,9 +874,10 @@ FileStorHandlerImpl::sendReplyDirectly(const std::shared_ptr<api::StorageReply>& } FileStorHandlerImpl::MessageEntry::MessageEntry(const std::shared_ptr<api::StorageMessage>& cmd, - const document::Bucket &bucket) + const document::Bucket& bucket, + vespalib::steady_time scheduled_at_time) : _command(cmd), - _timer(), + _timer(scheduled_at_time), _bucket(bucket), _priority(cmd->getPriority()) { } @@ -939,9 +956,8 @@ FileStorHandlerImpl::Stripe::operation_type_should_be_throttled(api::MessageType } FileStorHandler::LockedMessage -FileStorHandlerImpl::Stripe::getNextMessage(vespalib::steady_time deadline) 
+FileStorHandlerImpl::Stripe::next_message_impl(monitor_guard& guard, vespalib::steady_time deadline) { - std::unique_lock guard(*_lock); ThrottleToken throttle_token; // Try to grab a message+lock, immediately retrying once after a wait // if none can be found and then exiting if the same is the case on the @@ -993,6 +1009,92 @@ FileStorHandlerImpl::Stripe::getNextMessage(vespalib::steady_time deadline) } FileStorHandler::LockedMessage +FileStorHandlerImpl::Stripe::getNextMessage(vespalib::steady_time deadline) +{ + std::unique_lock guard(*_lock); + return next_message_impl(guard, deadline); +} + +namespace { + +constexpr bool is_batchable_feed_op(api::MessageType::Id id) noexcept { + return (id == api::MessageType::PUT_ID || + id == api::MessageType::REMOVE_ID || + id == api::MessageType::UPDATE_ID); +} + +// Precondition: msg must be a feed operation request (put, remove, update) +document::GlobalId gid_from_feed_op(const api::StorageMessage& msg) { + switch (msg.getType().getId()) { + case api::MessageType::PUT_ID: + return dynamic_cast<const api::PutCommand&>(msg).getDocumentId().getGlobalId(); + case api::MessageType::REMOVE_ID: + return dynamic_cast<const api::RemoveCommand&>(msg).getDocumentId().getGlobalId(); + case api::MessageType::UPDATE_ID: + return dynamic_cast<const api::UpdateCommand&>(msg).getDocumentId().getGlobalId(); + default: abort(); + } +} + +} // anon ns + +FileStorHandler::LockedMessageBatch +FileStorHandlerImpl::Stripe::next_message_batch(vespalib::steady_time now, vespalib::steady_time deadline) +{ + const auto max_batch_size = _owner.max_feed_op_batch_size(); + + std::unique_lock guard(*_lock); + auto initial_locked = next_message_impl(guard, deadline); + if (!initial_locked.lock || !is_batchable_feed_op(initial_locked.msg->getType().getId()) || (max_batch_size == 1)) { + return LockedMessageBatch(std::move(initial_locked)); + } + LockedMessageBatch batch(std::move(initial_locked)); + fill_feed_op_batch(guard, batch, max_batch_size, 
now); + return batch; +} + +void +FileStorHandlerImpl::Stripe::fill_feed_op_batch(monitor_guard& guard, LockedMessageBatch& batch, + uint32_t max_batch_size, vespalib::steady_time now) +{ + assert(batch.size() == 1); + BucketIdx& idx = bmi::get<2>(*_queue); + auto bucket_msgs = idx.equal_range(batch.lock->getBucket()); + // Process in FIFO order (_not_ priority order) until we hit the end, a non-batchable operation + // (implicit pipeline stall since bucket set might change) or can't get another throttle token. + // We also stall the pipeline if we get a concurrent modification to the same document (not expected, + // as the distributors should prevent this, but _technically_ it is possible). + const auto expected_max_size = std::min(ssize_t(max_batch_size), std::distance(bucket_msgs.first, bucket_msgs.second) + 1); + vespalib::hash_set<document::GlobalId, document::GlobalId::hash> gids_in_batch(expected_max_size); + gids_in_batch.insert(gid_from_feed_op(*batch.messages[0].first)); + for (auto it = bucket_msgs.first; (it != bucket_msgs.second) && (batch.messages.size() < max_batch_size);) { + if (!is_batchable_feed_op(it->_command->getType().getId())) { + break; + } + auto [existing_iter, inserted] = gids_in_batch.insert(gid_from_feed_op(*it->_command)); + if (!inserted) { + break; // Already present in batch + } + if (messageTimedOutInQueue(*it->_command, now - it->_timer.start_time())) { + // We just ignore timed out ops here; actually generating a timeout reply will be done by + // next_message_impl() during a subsequent invocation. This avoids having to deal with any + // potential issues caused by sending a reply up while holding the queue lock, since we + // can't release it here. 
+ ++it; + continue; + } + auto throttle_token = _owner.operation_throttler().try_acquire_one(); + if (!throttle_token.valid()) { + break; + } + // Note: iterator is const; can't std::move(it->_command) + batch.messages.emplace_back(it->_command, std::move(throttle_token)); + it = idx.erase(it); + } + update_cached_queue_size(guard); +} + +FileStorHandler::LockedMessage FileStorHandlerImpl::Stripe::get_next_async_message(monitor_guard& guard) { if (_owner.isPaused()) { @@ -1021,9 +1123,11 @@ FileStorHandler::LockedMessage FileStorHandlerImpl::Stripe::getMessage(monitor_guard & guard, PriorityIdx & idx, PriorityIdx::iterator iter, ThrottleToken throttle_token) { - std::chrono::milliseconds waitTime(uint64_t(iter->_timer.stop(_metrics->averageQueueWaitingTime))); + std::chrono::milliseconds waitTime(uint64_t(iter->_timer.stop( + _owner._component.getClock().getMonotonicTime(), + _metrics->averageQueueWaitingTime))); - std::shared_ptr<api::StorageMessage> msg = std::move(iter->_command); + std::shared_ptr<api::StorageMessage> msg = iter->_command; // iter is const; can't std::move() document::Bucket bucket(iter->_bucket); idx.erase(iter); // iter not used after this point. 
update_cached_queue_size(guard); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index f7c9b218779..ac8e5c52cbf 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -55,7 +55,9 @@ public: document::Bucket _bucket; uint8_t _priority; - MessageEntry(const std::shared_ptr<api::StorageMessage>& cmd, const document::Bucket &bId); + MessageEntry(const std::shared_ptr<api::StorageMessage>& cmd, + const document::Bucket& bucket, + vespalib::steady_time scheduled_at_time); MessageEntry(MessageEntry &&) noexcept ; MessageEntry(const MessageEntry &) noexcept; MessageEntry & operator = (const MessageEntry &) = delete; @@ -66,13 +68,13 @@ public: } }; - using PriorityOrder = bmi::ordered_non_unique<bmi::identity<MessageEntry> >; - using BucketOrder = bmi::ordered_non_unique<bmi::member<MessageEntry, document::Bucket, &MessageEntry::_bucket>>; - + // ordered_non_unique shall preserve insertion order as iteration order of equal keys, but this is rather magical... 
+ using PriorityOrder = bmi::ordered_non_unique<bmi::identity<MessageEntry>>; + using BucketOrder = bmi::ordered_non_unique<bmi::member<MessageEntry, document::Bucket, &MessageEntry::_bucket>>; using PriorityQueue = bmi::multi_index_container<MessageEntry, bmi::indexed_by<bmi::sequenced<>, PriorityOrder, BucketOrder>>; + using PriorityIdx = bmi::nth_index<PriorityQueue, 1>::type; + using BucketIdx = bmi::nth_index<PriorityQueue, 2>::type; - using PriorityIdx = bmi::nth_index<PriorityQueue, 1>::type; - using BucketIdx = bmi::nth_index<PriorityQueue, 2>::type; using Clock = std::chrono::steady_clock; using monitor_guard = std::unique_lock<std::mutex>; using atomic_size_t = vespalib::datastore::AtomicValueWrapper<size_t>; @@ -114,7 +116,7 @@ public: ActiveOperationsStats &stats() { return _stats; } }; SafeActiveOperationsStats() : _lock(std::make_unique<std::mutex>()), _stats() {} - Guard guard() { return Guard(*_lock, _stats, ctor_tag()); } + [[nodiscard]] Guard guard() { return Guard(*_lock, _stats, ctor_tag()); } }; Stripe(const FileStorHandlerImpl & owner, MessageSender & messageSender); @@ -123,8 +125,8 @@ public: Stripe & operator =(const Stripe &) = delete; ~Stripe(); void flush(); - bool schedule(MessageEntry messageEntry); - FileStorHandler::LockedMessage schedule_and_get_next_async_message(MessageEntry entry); + [[nodiscard]] bool schedule(MessageEntry messageEntry); + [[nodiscard]] FileStorHandler::LockedMessage schedule_and_get_next_async_message(MessageEntry entry); void waitUntilNoLocks() const; void abort(std::vector<std::shared_ptr<api::StorageReply>> & aborted, const AbortBucketOperationsCommand& cmd); void waitInactive(const AbortBucketOperationsCommand& cmd) const; @@ -151,18 +153,19 @@ public: api::LockingRequirements lockReq, bool count_as_active_merge, const LockEntry & lockEntry); - std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket, api::LockingRequirements lockReq); + [[nodiscard]] 
std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket, api::LockingRequirements lockReq); void failOperations(const document::Bucket & bucket, const api::ReturnCode & code); - FileStorHandler::LockedMessage getNextMessage(vespalib::steady_time deadline); + [[nodiscard]] FileStorHandler::LockedMessage getNextMessage(vespalib::steady_time deadline); + [[nodiscard]] FileStorHandler::LockedMessageBatch next_message_batch(vespalib::steady_time now, vespalib::steady_time deadline); void dumpQueue(std::ostream & os) const; void dumpActiveHtml(std::ostream & os) const; void dumpQueueHtml(std::ostream & os) const; - std::mutex & exposeLock() { return *_lock; } - PriorityQueue & exposeQueue() { return *_queue; } - BucketIdx & exposeBucketIdx() { return bmi::get<2>(*_queue); } + [[nodiscard]] std::mutex & exposeLock() { return *_lock; } + [[nodiscard]] PriorityQueue & exposeQueue() { return *_queue; } + [[nodiscard]] BucketIdx & exposeBucketIdx() { return bmi::get<2>(*_queue); } void setMetrics(FileStorStripeMetrics * metrics) { _metrics = metrics; } - ActiveOperationsStats get_active_operations_stats(bool reset_min_max) const; + [[nodiscard]] ActiveOperationsStats get_active_operations_stats(bool reset_min_max) const; private: void update_cached_queue_size(const std::lock_guard<std::mutex> &) { _cached_queue_size.store_relaxed(_queue->size()); @@ -170,15 +173,20 @@ public: void update_cached_queue_size(const std::unique_lock<std::mutex> &) { _cached_queue_size.store_relaxed(_queue->size()); } - bool hasActive(monitor_guard & monitor, const AbortBucketOperationsCommand& cmd) const; - FileStorHandler::LockedMessage get_next_async_message(monitor_guard& guard); + [[nodiscard]] bool hasActive(monitor_guard & monitor, const AbortBucketOperationsCommand& cmd) const; + [[nodiscard]] FileStorHandler::LockedMessage get_next_async_message(monitor_guard& guard); [[nodiscard]] bool operation_type_should_be_throttled(api::MessageType::Id type_id) 
const noexcept; + [[nodiscard]] FileStorHandler::LockedMessage next_message_impl(monitor_guard& held_lock, + vespalib::steady_time deadline); + void fill_feed_op_batch(monitor_guard& held_lock, LockedMessageBatch& batch, + uint32_t max_batch_size, vespalib::steady_time now); + // Precondition: the bucket used by `iter`s operation is not locked in a way that conflicts // with its locking requirements. - FileStorHandler::LockedMessage getMessage(monitor_guard & guard, PriorityIdx & idx, - PriorityIdx::iterator iter, - ThrottleToken throttle_token); + [[nodiscard]] FileStorHandler::LockedMessage getMessage(monitor_guard & guard, PriorityIdx & idx, + PriorityIdx::iterator iter, + ThrottleToken throttle_token); using LockedBuckets = vespalib::hash_map<document::Bucket, MultiLockEntry, document::Bucket::hash>; const FileStorHandlerImpl &_owner; MessageSender &_messageSender; @@ -233,7 +241,8 @@ public: bool schedule(const std::shared_ptr<api::StorageMessage>&) override; ScheduleAsyncResult schedule_and_get_next_async_message(const std::shared_ptr<api::StorageMessage>& msg) override; - FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId, vespalib::steady_time deadline) override; + LockedMessage getNextMessage(uint32_t stripeId, vespalib::steady_time deadline) override; + LockedMessageBatch next_message_batch(uint32_t stripe, vespalib::steady_time now, vespalib::steady_time deadline) override; void remapQueueAfterJoin(const RemapInfo& source, RemapInfo& target) override; void remapQueueAfterSplit(const RemapInfo& source, RemapInfo& target1, RemapInfo& target2) override; @@ -292,6 +301,13 @@ public: _throttle_apply_bucket_diff_ops.store(throttle_apply_bucket_diff, std::memory_order_relaxed); } + void set_max_feed_op_batch_size(uint32_t max_batch) noexcept override { + _max_feed_op_batch_size.store(max_batch, std::memory_order_relaxed); + } + [[nodiscard]] uint32_t max_feed_op_batch_size() const noexcept { + return 
_max_feed_op_batch_size.load(std::memory_order_relaxed); + } + // Implements ResumeGuard::Callback void resume() override; @@ -316,6 +332,7 @@ private: std::atomic<bool> _paused; std::atomic<bool> _throttle_apply_bucket_diff_ops; std::optional<ActiveOperationsStats> _last_active_operations_stats; + std::atomic<uint32_t> _max_feed_op_batch_size; // Returns the index in the targets array we are sending to, or -1 if none of them match. int calculateTargetBasedOnDocId(const api::StorageMessage& msg, std::vector<RemapInfo*>& targets); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 0046bd96b65..23de39f7130 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -239,6 +239,7 @@ FileStorManager::on_configure(const StorFilestorConfig& config) auto updated_dyn_throttle_params = dynamic_throttle_params_from_config(config, _threads.size()); _filestorHandler->reconfigure_dynamic_throttler(updated_dyn_throttle_params); } + _filestorHandler->set_max_feed_op_batch_size(std::max(1, config.maxFeedOpBatchSize)); // TODO remove once desired throttling behavior is set in stone { _filestorHandler->use_dynamic_operation_throttling(use_dynamic_throttling); diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index 0612798b43a..4761937075d 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -134,7 +134,7 @@ PersistenceHandler::processMessage(api::StorageMessage& msg, MessageTracker::UP _env._metrics.operations.inc(); if (msg.getType().isReply()) { - try{ + try { LOG(debug, "Handling reply: %s", msg.toString().c_str()); LOG(spam, "Message content: %s", msg.toString(true).c_str()); return 
handleReply(static_cast<api::StorageReply&>(msg), std::move(tracker)); @@ -161,9 +161,7 @@ PersistenceHandler::processMessage(api::StorageMessage& msg, MessageTracker::UP void PersistenceHandler::processLockedMessage(FileStorHandler::LockedMessage lock) const { - LOG(debug, "NodeIndex %d, ptr=%p", _env._nodeIndex, lock.msg.get()); api::StorageMessage & msg(*lock.msg); - // Important: we _copy_ the message shared_ptr instead of moving to ensure that `msg` remains // valid even if the tracker is destroyed by an exception in processMessage(). auto tracker = std::make_unique<MessageTracker>(framework::MilliSecTimer(_clock), _env, _env._fileStorHandler, @@ -174,4 +172,24 @@ PersistenceHandler::processLockedMessage(FileStorHandler::LockedMessage lock) co } } +void +PersistenceHandler::process_locked_message_batch(std::shared_ptr<FileStorHandler::BucketLockInterface> lock, + std::span<BatchedMessage> bucket_messages) +{ + const auto bucket = lock->getBucket(); + auto batch = std::make_shared<AsyncMessageBatch>(std::move(lock), _env, _env._fileStorHandler); + for (auto& bm : bucket_messages) { + assert(bm.first->getBucket() == bucket); + // Important: we _copy_ the message shared_ptr instead of moving to ensure that `*bm.first` remains + // valid even if the tracker is destroyed by an exception in processMessage(). All std::exceptions + // are caught there, so we do not expect our loop to be interrupted. 
+ auto tracker = std::make_unique<MessageTracker>(framework::MilliSecTimer(_clock), _env, + batch, bm.first, std::move(bm.second)); + tracker = processMessage(*bm.first, std::move(tracker)); + if (tracker) { + tracker->sendReply(); // Actually defers to batch reply queue + } + } +} + } diff --git a/storage/src/vespa/storage/persistence/persistencehandler.h b/storage/src/vespa/storage/persistence/persistencehandler.h index 0da518a1cfa..224bb70a16b 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.h +++ b/storage/src/vespa/storage/persistence/persistencehandler.h @@ -2,14 +2,16 @@ #pragma once -#include "processallhandler.h" -#include "mergehandler.h" #include "asynchandler.h" +#include "batched_message.h" +#include "mergehandler.h" #include "persistenceutil.h" -#include "splitjoinhandler.h" +#include "processallhandler.h" #include "simplemessagehandler.h" +#include "splitjoinhandler.h" #include <vespa/storage/common/storagecomponent.h> #include <vespa/vespalib/util/isequencedtaskexecutor.h> +#include <span> namespace storage { @@ -29,6 +31,8 @@ public: ~PersistenceHandler(); void processLockedMessage(FileStorHandler::LockedMessage lock) const; + void process_locked_message_batch(std::shared_ptr<FileStorHandler::BucketLockInterface> lock, + std::span<BatchedMessage> bucket_messages); //TODO Rewrite tests to avoid this api leak const AsyncHandler & asyncHandler() const { return _asyncHandler; } diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index a98418281d2..5710c3fe26f 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -14,6 +14,7 @@ PersistenceThread::PersistenceThread(PersistenceHandler & persistenceHandler, Fi uint32_t stripeId, framework::Component & component) : _persistenceHandler(persistenceHandler), _fileStorHandler(fileStorHandler), + 
_clock(component.getClock()), _stripeId(stripeId), _thread() { @@ -36,14 +37,19 @@ PersistenceThread::run(framework::ThreadHandle& thread) vespalib::duration max_wait_time = vespalib::adjustTimeoutByDetectedHz(100ms); while (!thread.interrupted()) { - vespalib::steady_time now = vespalib::steady_clock::now(); + vespalib::steady_time now = _clock.getMonotonicTime(); thread.registerTick(framework::UNKNOWN_CYCLE, now); vespalib::steady_time deadline = now + max_wait_time; - FileStorHandler::LockedMessage lock(_fileStorHandler.getNextMessage(_stripeId, deadline)); - - if (lock.lock) { - _persistenceHandler.processLockedMessage(std::move(lock)); + auto batch = _fileStorHandler.next_message_batch(_stripeId, now, deadline); + if (!batch.empty()) { + // Special-case single message batches, as actually scheduling a full batch has more + // overhead due to extra bookkeeping state and deferred reply-sending. + if (batch.size() == 1) { + _persistenceHandler.processLockedMessage(batch.release_as_single_msg()); + } else { + _persistenceHandler.process_locked_message_batch(std::move(batch.lock), batch.messages); + } } } LOG(debug, "Closing down persistence thread"); diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h index aacd1dd4830..2e9852ada73 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.h +++ b/storage/src/vespa/storage/persistence/persistencethread.h @@ -8,6 +8,7 @@ namespace storage { namespace framework { + struct Clock; class Component; class Thread; } @@ -27,10 +28,11 @@ public: framework::Thread& getThread() override { return *_thread; } private: - PersistenceHandler & _persistenceHandler; - FileStorHandler & _fileStorHandler; - uint32_t _stripeId; - std::unique_ptr<framework::Thread> _thread; + PersistenceHandler& _persistenceHandler; + FileStorHandler& _fileStorHandler; + const framework::Clock& _clock; + uint32_t _stripeId; + std::unique_ptr<framework::Thread> 
_thread; void run(framework::ThreadHandle&) override; }; diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp index c975721c855..54bce72d7ff 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.cpp +++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp @@ -11,41 +11,82 @@ LOG_SETUP(".persistence.util"); namespace storage { namespace { - bool isBatchable(api::MessageType::Id id) - { - return (id == api::MessageType::PUT_ID || - id == api::MessageType::REMOVE_ID || - id == api::MessageType::UPDATE_ID); - } - bool hasBucketInfo(api::MessageType::Id id) - { - return (isBatchable(id) || - (id == api::MessageType::REMOVELOCATION_ID || - id == api::MessageType::JOINBUCKETS_ID)); +constexpr bool is_batchable(api::MessageType::Id id) noexcept { + return (id == api::MessageType::PUT_ID || + id == api::MessageType::REMOVE_ID || + id == api::MessageType::UPDATE_ID); +} + +constexpr bool has_bucket_info(api::MessageType::Id id) noexcept { + return (is_batchable(id) || + (id == api::MessageType::REMOVELOCATION_ID || + id == api::MessageType::JOINBUCKETS_ID)); +} + +constexpr vespalib::duration WARN_ON_SLOW_OPERATIONS = 5s; + +} // anon ns + +DeferredReplySenderStub::DeferredReplySenderStub() = default; +DeferredReplySenderStub::~DeferredReplySenderStub() = default; + +AsyncMessageBatch::AsyncMessageBatch(std::shared_ptr<FileStorHandler::BucketLockInterface> bucket_lock, + const PersistenceUtil& env, + MessageSender& reply_sender) noexcept + : _bucket_lock(std::move(bucket_lock)), + _env(env), + _reply_sender(reply_sender), + _deferred_sender_stub() +{ + assert(_bucket_lock); +} + +AsyncMessageBatch::~AsyncMessageBatch() { + const auto bucket_info = _env.getBucketInfo(_bucket_lock->getBucket()); + _env.updateBucketDatabase(_bucket_lock->getBucket(), bucket_info); + + std::lock_guard lock(_deferred_sender_stub._mutex); // Ensure visibility of posted replies + for (auto& reply : 
_deferred_sender_stub._deferred_replies) { + if (reply->getResult().success()) { + dynamic_cast<api::BucketInfoReply&>(*reply).setBucketInfo(bucket_info); + } + _reply_sender.sendReplyDirectly(reply); } - const vespalib::duration WARN_ON_SLOW_OPERATIONS = 5s; + LOG(debug, "Processed async feed message batch of %zu ops for %s. New bucket info is %s", + _deferred_sender_stub._deferred_replies.size(), + _bucket_lock->getBucket().toString().c_str(), bucket_info.toString().c_str()); } MessageTracker::MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, FileStorHandler::BucketLockInterface::SP bucketLock, - api::StorageMessage::SP msg, + std::shared_ptr<api::StorageMessage> msg, ThrottleToken throttle_token) - : MessageTracker(timer, env, replySender, true, std::move(bucketLock), std::move(msg), std::move(throttle_token)) + : MessageTracker(timer, env, replySender, true, std::move(bucketLock), {}, std::move(msg), std::move(throttle_token)) +{} + +MessageTracker::MessageTracker(const framework::MilliSecTimer& timer, + const PersistenceUtil& env, + std::shared_ptr<AsyncMessageBatch> batch, + std::shared_ptr<api::StorageMessage> msg, + ThrottleToken throttle_token) + : MessageTracker(timer, env, batch->deferred_sender_stub(), false, {}, std::move(batch), std::move(msg), std::move(throttle_token)) {} MessageTracker::MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, - bool updateBucketInfo, - FileStorHandler::BucketLockInterface::SP bucketLock, - api::StorageMessage::SP msg, + bool update_bucket_info, + std::shared_ptr<FileStorHandler::BucketLockInterface> bucket_lock, + std::shared_ptr<AsyncMessageBatch> part_of_batch, + std::shared_ptr<api::StorageMessage> msg, ThrottleToken throttle_token) : _sendReply(true), - _updateBucketInfo(updateBucketInfo && hasBucketInfo(msg->getType().getId())), - _bucketLock(std::move(bucketLock)), + 
_updateBucketInfo(update_bucket_info && has_bucket_info(msg->getType().getId())), + _bucketLock(std::move(bucket_lock)), + _part_of_batch(std::move(part_of_batch)), _msg(std::move(msg)), _throttle_token(std::move(throttle_token)), _context(_msg->getPriority(), _msg->getTrace().getLevel()), @@ -61,7 +102,7 @@ MessageTracker::createForTesting(const framework::MilliSecTimer & timer, Persist FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg) { return MessageTracker::UP(new MessageTracker(timer, env, replySender, false, std::move(bucketLock), - std::move(msg), ThrottleToken())); + {}, std::move(msg), ThrottleToken())); } void @@ -102,6 +143,7 @@ MessageTracker::sendReply() { if (hasReply()) { getReply().getTrace().addChild(_context.steal_trace()); if (_updateBucketInfo) { + assert(_bucketLock); if (getReply().getResult().success()) { _env.setBucketInfo(*this, _bucketLock->getBucket()); } @@ -163,7 +205,7 @@ MessageTracker::generateReply(api::StorageCommand& cmd) std::shared_ptr<FileStorHandler::OperationSyncPhaseDoneNotifier> MessageTracker::sync_phase_done_notifier_or_nullptr() const { - if (_bucketLock->wants_sync_phase_done_notification()) { + if (_bucketLock && _bucketLock->wants_sync_phase_done_notification()) { return _bucketLock; } return {}; @@ -236,7 +278,7 @@ PersistenceUtil::setBucketInfo(MessageTracker& tracker, const document::Bucket & { api::BucketInfo info = getBucketInfo(bucket); - static_cast<api::BucketInfoReply&>(tracker.getReply()).setBucketInfo(info); + dynamic_cast<api::BucketInfoReply&>(tracker.getReply()).setBucketInfo(info); updateBucketDatabase(bucket, info); } diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h index 900f301252e..71e5c2754e9 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.h +++ b/storage/src/vespa/storage/persistence/persistenceutil.h @@ -25,12 +25,50 @@ namespace storage { class PersistenceUtil; +struct 
DeferredReplySenderStub : MessageSender { + std::mutex _mutex; + std::vector<std::shared_ptr<api::StorageReply>> _deferred_replies; + + DeferredReplySenderStub(); + ~DeferredReplySenderStub() override; + + void sendCommand(const std::shared_ptr<api::StorageCommand>&) override { + abort(); // Not supported + } + void sendReply(const std::shared_ptr<api::StorageReply>& reply) override { + std::lock_guard lock(_mutex); + _deferred_replies.emplace_back(reply); + } +}; + +class AsyncMessageBatch { + std::shared_ptr<FileStorHandler::BucketLockInterface> _bucket_lock; + const PersistenceUtil& _env; + MessageSender& _reply_sender; + DeferredReplySenderStub _deferred_sender_stub; +public: + AsyncMessageBatch(std::shared_ptr<FileStorHandler::BucketLockInterface> bucket_lock, + const PersistenceUtil& env, + MessageSender& reply_sender) noexcept; + // Triggered by last referencing batched MessageTracker being destroyed. + // Fetches bucket info, updates DB and sends all deferred replies with the new bucket info. + ~AsyncMessageBatch(); + + [[nodiscard]] MessageSender& deferred_sender_stub() noexcept { return _deferred_sender_stub; } +}; + class MessageTracker { public: using UP = std::unique_ptr<MessageTracker>; - MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, - FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg, + MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & reply_sender, + FileStorHandler::BucketLockInterface::SP bucket_lock, std::shared_ptr<api::StorageMessage> msg, + ThrottleToken throttle_token); + + // For use with batching where bucket lock is held separately and bucket info + // is _not_ fetched or updated per message. 
+ MessageTracker(const framework::MilliSecTimer& timer, const PersistenceUtil& env, + std::shared_ptr<AsyncMessageBatch> batch, std::shared_ptr<api::StorageMessage> msg, ThrottleToken throttle_token); ~MessageTracker(); @@ -58,48 +96,53 @@ public: * commands like merge. */ void dontReply() { _sendReply = false; } - bool hasReply() const { return bool(_reply); } - const api::StorageReply & getReply() const { + [[nodiscard]] bool hasReply() const { return bool(_reply); } + [[nodiscard]] const api::StorageReply & getReply() const { return *_reply; } - api::StorageReply & getReply() { + [[nodiscard]] api::StorageReply & getReply() { return *_reply; } - std::shared_ptr<api::StorageReply> && stealReplySP() && { + [[nodiscard]] std::shared_ptr<api::StorageReply> && stealReplySP() && { return std::move(_reply); } void generateReply(api::StorageCommand& cmd); - api::ReturnCode getResult() const { return _result; } + [[nodiscard]] api::ReturnCode getResult() const { return _result; } - spi::Context & context() { return _context; } - document::BucketId getBucketId() const { + [[nodiscard]] spi::Context & context() { return _context; } + [[nodiscard]] document::BucketId getBucketId() const { return _bucketLock->getBucket().getBucketId(); } void sendReply(); - bool checkForError(const spi::Result& response); + [[nodiscard]] bool checkForError(const spi::Result& response); // Returns a non-nullptr notifier instance iff the underlying operation wants to be notified - // when the sync phase is complete. Otherwise returns a nullptr shared_ptr. - std::shared_ptr<FileStorHandler::OperationSyncPhaseDoneNotifier> sync_phase_done_notifier_or_nullptr() const; + // when the sync phase is complete. Otherwise, returns a nullptr shared_ptr. 
+ [[nodiscard]] std::shared_ptr<FileStorHandler::OperationSyncPhaseDoneNotifier> sync_phase_done_notifier_or_nullptr() const; static MessageTracker::UP createForTesting(const framework::MilliSecTimer & timer, PersistenceUtil & env, MessageSender & replySender, FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg); private: - MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, bool updateBucketInfo, - FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg, + MessageTracker(const framework::MilliSecTimer& timer, const PersistenceUtil& env, + MessageSender& reply_sender, bool update_bucket_info, + std::shared_ptr<FileStorHandler::BucketLockInterface> bucket_lock, + std::shared_ptr<AsyncMessageBatch> part_of_batch, + std::shared_ptr<api::StorageMessage> msg, ThrottleToken throttle_token); [[nodiscard]] bool count_result_as_failure() const noexcept; bool _sendReply; bool _updateBucketInfo; + // Either _bucketLock or _part_of_batch must be set, never both at the same time FileStorHandler::BucketLockInterface::SP _bucketLock; + std::shared_ptr<AsyncMessageBatch> _part_of_batch; // nullptr if not batched std::shared_ptr<api::StorageMessage> _msg; ThrottleToken _throttle_token; spi::Context _context; @@ -117,8 +160,6 @@ public: struct LockResult { std::shared_ptr<FileStorHandler::BucketLockInterface> lock; LockResult() : lock() {} - - bool bucketExisted() const { return bool(lock); } }; PersistenceUtil(const ServiceLayerComponent&, FileStorHandler& fileStorHandler, diff --git a/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.h b/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.h index 277e7b4fdfd..93b305370e0 100644 --- a/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.h +++ b/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.h @@ -13,7 
+13,7 @@ namespace storage::framework::defaultimplementation { -struct RealClock : public Clock { +struct RealClock final : public Clock { vespalib::steady_time getMonotonicTime() const override; vespalib::system_time getSystemTime() const override; }; |