From a8dd709dceca4c53096be285f35686439a7902eb Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Fri, 5 Apr 2024 14:59:51 +0000 Subject: Support pipelining (batching) of mutating ops to same bucket Bucket operations require either exclusive (single writer) or shared (multiple readers) access. Prior to this commit, this means that many enqueued feed operations to the same bucket introduce pipeline stalls due to each operation having to wait for all prior operations to the bucket to complete entirely (including fsync of WAL append). This is a likely scenario when feeding a document set that was previously acquired through visiting, as such documents will inherently be output in bucket-order. With this commit, a configurable number of feed operations (put, remove and update) bound for the exact same bucket may be sent asynchronously to the persistence provider in the context of the _same_ write lock. This mirrors how merge operations work for puts and removes. Batching is fairly conservative, and will _not_ batch across further messages when any of the following holds: * A non-feed operation is encountered * More than one mutating operation is encountered for the same document ID * No more persistence throttler tokens can be acquired * Max batch size has been reached Updating the bucket DB, assigning bucket info and sending replies is deferred until _all_ batched operations complete. Max batch size is (re-)configurable live and defaults to a batch size of 1, which shall have the exact same semantics as the legacy behavior. Additionally, clock sampling for persistence threads have been abstracted away to allow for mocking in tests (no need for sleep!). --- configdefinitions/src/vespa/stor-filestor.def | 9 +- metrics/src/vespa/metrics/metrictimer.cpp | 9 +- metrics/src/vespa/metrics/metrictimer.h | 24 +- .../tests/persistence/filestorage/CMakeLists.txt | 1 + .../filestorage/feed_operation_batching_test.cpp | 318 +++++++++++++++++++++ .../filestorage/filestormanagertest.cpp | 35 ++- .../src/vespa/storage/persistence/asynchandler.cpp | 6 +- .../vespa/storage/persistence/batched_message.h | 14 + .../persistence/filestorage/filestorhandler.cpp | 11 + .../persistence/filestorage/filestorhandler.h | 65 +++-- .../filestorage/filestorhandlerimpl.cpp | 126 +++++++- .../persistence/filestorage/filestorhandlerimpl.h | 59 ++-- .../persistence/filestorage/filestormanager.cpp | 1 + .../storage/persistence/persistencehandler.cpp | 24 +- .../vespa/storage/persistence/persistencehandler.h | 10 +- .../storage/persistence/persistencethread.cpp | 16 +- .../vespa/storage/persistence/persistencethread.h | 10 +- .../vespa/storage/persistence/persistenceutil.cpp | 86 ++++-- .../vespa/storage/persistence/persistenceutil.h | 73 +++-- .../defaultimplementation/clock/realclock.h | 2 +- 20 files changed, 767 insertions(+), 132 deletions(-) create mode 100644 storage/src/tests/persistence/filestorage/feed_operation_batching_test.cpp create mode 100644 storage/src/vespa/storage/persistence/batched_message.h diff --git a/configdefinitions/src/vespa/stor-filestor.def b/configdefinitions/src/vespa/stor-filestor.def index de67d4336e9..a5d86cc91ba 100644 --- a/configdefinitions/src/vespa/stor-filestor.def +++ b/configdefinitions/src/vespa/stor-filestor.def @@ -29,7 +29,7 @@ response_sequencer_type enum {LATENCY, THROUGHPUT, ADAPTIVE} default=ADAPTIVE re ## Should follow stor-distributormanager:splitsize (16MB). bucket_merge_chunk_size int default=16772216 restart -## Whether or not to use async message handling when scheduling storage messages from FileStorManager. +## Whether to use async message handling when scheduling storage messages from FileStorManager. ## ## When turned on, the calling thread (e.g. FNET network thread when using Storage API RPC) ## gets the next async message to handle (if any) as part of scheduling a storage message. @@ -61,3 +61,10 @@ async_operation_throttler.window_size_backoff double default=0.95 async_operation_throttler.min_window_size int default=20 async_operation_throttler.max_window_size int default=-1 # < 0 implies INT_MAX async_operation_throttler.resize_rate double default=3.0 + +## Maximum number of enqueued put/remove/update operations towards a given bucket +## that can be dispatched asynchronously as a batch under the same write lock. +## This prevents pipeline stalls when many write operations are in-flight to the +## same bucket, as each operation would otherwise have to wait for the completion +## of all prior writes to the bucket. +max_feed_op_batch_size int default=1 diff --git a/metrics/src/vespa/metrics/metrictimer.cpp b/metrics/src/vespa/metrics/metrictimer.cpp index 84d4844104d..a3b0f215d58 100644 --- a/metrics/src/vespa/metrics/metrictimer.cpp +++ b/metrics/src/vespa/metrics/metrictimer.cpp @@ -3,13 +3,18 @@ namespace metrics { -MetricTimer::MetricTimer() +MetricTimer::MetricTimer() noexcept + : _startTime(std::chrono::steady_clock::now()) { // Amusingly enough, steady_clock was not actually steady by default on // GCC < 4.8.1, so add a bit of compile-time paranoia just to make sure. static_assert(std::chrono::steady_clock::is_steady, "Old/broken STL implementation; steady_clock not steady"); - _startTime = std::chrono::steady_clock::now(); +} + +MetricTimer::MetricTimer(std::chrono::steady_clock::time_point start_time) noexcept + : _startTime(start_time) +{ } } // metrics diff --git a/metrics/src/vespa/metrics/metrictimer.h b/metrics/src/vespa/metrics/metrictimer.h index 8a338432362..133cd819489 100644 --- a/metrics/src/vespa/metrics/metrictimer.h +++ b/metrics/src/vespa/metrics/metrictimer.h @@ -15,7 +15,19 @@ namespace metrics { class MetricTimer { public: - MetricTimer(); + // Start time point set by system steady clock + MetricTimer() noexcept; + // Start time point explicitly given + explicit MetricTimer(std::chrono::steady_clock::time_point start_time) noexcept; + + template + AvgVal stop(std::chrono::steady_clock::time_point now, ValueMetric& metric) const { + const auto delta = now - _startTime; + using ToDuration = std::chrono::duration; + const auto deltaMs(std::chrono::duration_cast(delta).count()); + metric.addValue(deltaMs); + return deltaMs; + } /** * Adds ms passed since this timer was constructed to given value metric. @@ -26,11 +38,11 @@ public: */ template AvgVal stop(ValueMetric& metric) const { - const auto delta = std::chrono::steady_clock::now() - _startTime; - using ToDuration = std::chrono::duration; - const auto deltaMs(std::chrono::duration_cast(delta).count()); - metric.addValue(deltaMs); - return deltaMs; + return stop(std::chrono::steady_clock::now(), metric); + } + + [[nodiscard]] std::chrono::steady_clock::time_point start_time() const noexcept { + return _startTime; } private: diff --git a/storage/src/tests/persistence/filestorage/CMakeLists.txt b/storage/src/tests/persistence/filestorage/CMakeLists.txt index f1a8a286bbd..951a361474e 100644 --- a/storage/src/tests/persistence/filestorage/CMakeLists.txt +++ b/storage/src/tests/persistence/filestorage/CMakeLists.txt @@ -4,6 +4,7 @@ vespa_add_executable(storage_filestorage_gtest_runner_app TEST SOURCES deactivatebucketstest.cpp deletebuckettest.cpp + feed_operation_batching_test.cpp filestormanagertest.cpp filestormodifiedbucketstest.cpp mergeblockingtest.cpp diff --git a/storage/src/tests/persistence/filestorage/feed_operation_batching_test.cpp b/storage/src/tests/persistence/filestorage/feed_operation_batching_test.cpp new file mode 100644 index 00000000000..cf16123933b --- /dev/null +++ b/storage/src/tests/persistence/filestorage/feed_operation_batching_test.cpp @@ -0,0 +1,318 @@ +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using document::test::makeDocumentBucket; +using document::BucketId; +using document::DocumentId; +using namespace ::testing; + +namespace storage { + +struct FeedOperationBatchingTest : FileStorTestFixture { + DummyStorageLink _top; + std::unique_ptr _message_sender; + FileStorMetrics _metrics; + std::unique_ptr _handler; + api::Timestamp _next_timestamp; + + FeedOperationBatchingTest(); + ~FeedOperationBatchingTest() override; + + void SetUp() override { + FileStorTestFixture::SetUp(); + // This silly little indirection is a work-around for the top-level link needing something + // below it to send _up_ into it, rather than directly receiving the messages itself. + auto message_receiver = std::make_unique(); + _message_sender = std::make_unique(*message_receiver); + _top.push_back(std::move(message_receiver)); + _top.open(); + _metrics.initDiskMetrics(1, 1); + // By default, sets up 1 thread with 1 stripe + _handler = std::make_unique(*_message_sender, _metrics, _node->getComponentRegister()); + _handler->set_max_feed_op_batch_size(3); + } + + void TearDown() override { + _handler.reset(); + FileStorTestFixture::TearDown(); + } + + [[nodiscard]] static vespalib::string id_str_of(uint32_t bucket_idx, uint32_t doc_idx) { + return vespalib::make_string("id:foo:testdoctype1:n=%u:%u", bucket_idx, doc_idx); + } + + [[nodiscard]] static DocumentId id_of(uint32_t bucket_idx, uint32_t doc_idx) { + return DocumentId(id_str_of(bucket_idx, doc_idx)); + } + + void schedule_msg(const std::shared_ptr& msg) { + msg->setAddress(makeSelfAddress()); + _handler->schedule(msg); // takes shared_ptr by const ref, no point in moving + } + + void send_put(uint32_t bucket_idx, uint32_t doc_idx, uint32_t timestamp, vespalib::duration timeout) { + auto id = id_str_of(bucket_idx, doc_idx); + auto doc = _node->getTestDocMan().createDocument("foobar", id); + auto cmd = std::make_shared(makeDocumentBucket({16, bucket_idx}), std::move(doc), timestamp); + cmd->setTimeout(timeout); + schedule_msg(cmd); + } + + void send_put(uint32_t bucket_idx, uint32_t doc_idx) { + send_put(bucket_idx, doc_idx, next_timestamp(), 60s); + } + + void send_puts(std::initializer_list> bucket_docs) { + for (const auto& bd : bucket_docs) { + send_put(bd.first, bd.second); + } + } + + void send_get(uint32_t bucket_idx, uint32_t doc_idx) { + auto id = id_of(bucket_idx, doc_idx); + auto cmd = std::make_shared(makeDocumentBucket({16, bucket_idx}), id, document::AllFields::NAME); + schedule_msg(cmd); + } + + void send_remove(uint32_t bucket_idx, uint32_t doc_idx, uint32_t timestamp) { + auto id = id_of(bucket_idx, doc_idx); + auto cmd = std::make_shared(makeDocumentBucket({16, bucket_idx}), id, timestamp); + schedule_msg(cmd); + } + + void send_remove(uint32_t bucket_idx, uint32_t doc_idx) { + send_remove(bucket_idx, doc_idx, next_timestamp()); + } + + void send_update(uint32_t bucket_idx, uint32_t doc_idx, uint32_t timestamp) { + auto id = id_of(bucket_idx, doc_idx); + auto update = std::make_shared( + _node->getTestDocMan().getTypeRepo(), + _node->getTestDocMan().createRandomDocument()->getType(), id); + auto cmd = std::make_shared(makeDocumentBucket({16, bucket_idx}), std::move(update), timestamp); + schedule_msg(cmd); + } + + void send_update(uint32_t bucket_idx, uint32_t doc_idx) { + send_update(bucket_idx, doc_idx, next_timestamp()); + } + + [[nodiscard]] api::Timestamp next_timestamp() { + auto ret = _next_timestamp; + ++_next_timestamp; + return ret; + } + + [[nodiscard]] vespalib::steady_time fake_now() const { + return _node->getClock().getMonotonicTime(); + } + + [[nodiscard]] vespalib::steady_time fake_deadline() const { + return _node->getClock().getMonotonicTime() + 60s; + } + + [[nodiscard]] FileStorHandler::LockedMessageBatch next_batch() { + return _handler->next_message_batch(0, fake_now(), fake_deadline()); + } + + template + static void assert_batch_msg_is(const FileStorHandler::LockedMessageBatch& batch, uint32_t msg_idx, + uint32_t expected_bucket_idx, uint32_t expected_doc_idx) + { + ASSERT_LT(msg_idx, batch.size()); + auto msg = batch.messages[msg_idx].first; + auto* as_cmd = dynamic_cast(msg.get()); + ASSERT_TRUE(as_cmd) << msg->toString() << " does not have the expected type"; + EXPECT_EQ(as_cmd->getBucketId(), BucketId(16, expected_bucket_idx)); + + auto id = as_cmd->getDocumentId(); + ASSERT_TRUE(id.getScheme().hasNumber()); + EXPECT_EQ(id.getScheme().getNumber(), expected_bucket_idx) << id; + std::string actual_id_part = id.getScheme().getNamespaceSpecific(); + std::string expected_id_part = std::to_string(expected_doc_idx); + EXPECT_EQ(actual_id_part, expected_id_part) << id; + } + + static void assert_batch_msg_is_put(const FileStorHandler::LockedMessageBatch& batch, uint32_t msg_idx, + uint32_t expected_bucket_idx, uint32_t expected_doc_idx) + { + assert_batch_msg_is(batch, msg_idx, expected_bucket_idx, expected_doc_idx); + } + + static void assert_batch_msg_is_remove(const FileStorHandler::LockedMessageBatch& batch, uint32_t msg_idx, + uint32_t expected_bucket_idx, uint32_t expected_doc_idx) + { + assert_batch_msg_is(batch, msg_idx, expected_bucket_idx, expected_doc_idx); + } + + static void assert_batch_msg_is_update(const FileStorHandler::LockedMessageBatch& batch, uint32_t msg_idx, + uint32_t expected_bucket_idx, uint32_t expected_doc_idx) + { + assert_batch_msg_is(batch, msg_idx, expected_bucket_idx, expected_doc_idx); + } + + static void assert_batch_msg_is_get(const FileStorHandler::LockedMessageBatch& batch, uint32_t msg_idx, + uint32_t expected_bucket_idx, uint32_t expected_doc_idx) + { + assert_batch_msg_is(batch, msg_idx, expected_bucket_idx, expected_doc_idx); + } + + enum Type { + Put, + Update, + Remove, + Get + }; + + static void assert_empty_batch(const FileStorHandler::LockedMessageBatch& batch) { + ASSERT_TRUE(batch.empty()); + ASSERT_FALSE(batch.lock); + } + + static void assert_batch(const FileStorHandler::LockedMessageBatch& batch, + uint32_t expected_bucket_idx, + std::initializer_list> expected_msgs) + { + ASSERT_TRUE(batch.lock); + ASSERT_EQ(batch.lock->getBucket().getBucketId(), BucketId(16, expected_bucket_idx)); + ASSERT_EQ(batch.size(), expected_msgs.size()); + + uint32_t idx = 0; + for (const auto& msg : expected_msgs) { + switch (msg.first) { + case Type::Put: assert_batch_msg_is_put(batch, idx, expected_bucket_idx, msg.second); break; + case Type::Update: assert_batch_msg_is_update(batch, idx, expected_bucket_idx, msg.second); break; + case Type::Remove: assert_batch_msg_is_remove(batch, idx, expected_bucket_idx, msg.second); break; + case Type::Get: assert_batch_msg_is_get(batch, idx, expected_bucket_idx, msg.second); break; + default: FAIL(); + } + ++idx; + } + } +}; + +FeedOperationBatchingTest::FeedOperationBatchingTest() + : FileStorTestFixture(), + _top(), + _message_sender(), + _metrics(), + _handler(), + _next_timestamp(1000) +{ +} + +FeedOperationBatchingTest::~FeedOperationBatchingTest() = default; + +// Note: unless explicitly set by the testcase, max batch size is 3 + +TEST_F(FeedOperationBatchingTest, batching_is_disabled_with_1_max_batch_size) { + _handler->set_max_feed_op_batch_size(1); + send_puts({{1, 1}, {1, 2}, {2, 3}, {2, 4}}); + // No batching; has the same behavior as current FIFO + assert_batch(next_batch(), 1, {{Put, 1}}); + assert_batch(next_batch(), 1, {{Put, 2}}); + assert_batch(next_batch(), 2, {{Put, 3}}); + assert_batch(next_batch(), 2, {{Put, 4}}); + assert_empty_batch(next_batch()); +} + +TEST_F(FeedOperationBatchingTest, batching_is_limited_to_configured_max_size) { + send_puts({{1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}); + assert_batch(next_batch(), 1, {{Put, 1}, {Put, 2}, {Put, 3}}); + assert_batch(next_batch(), 1, {{Put, 4}, {Put, 5}}); + assert_empty_batch(next_batch()); +} + +TEST_F(FeedOperationBatchingTest, batching_can_consume_entire_queue) { + send_puts({{1, 1}, {1, 2}, {1, 3}}); + assert_batch(next_batch(), 1, {{Put, 1}, {Put, 2}, {Put, 3}}); + assert_empty_batch(next_batch()); +} + +TEST_F(FeedOperationBatchingTest, batching_is_only_done_for_single_bucket) { + send_puts({{1, 1}, {2, 2}, {2, 3}, {2, 4}, {3, 5}}); + assert_batch(next_batch(), 1, {{Put, 1}}); + assert_batch(next_batch(), 2, {{Put, 2}, {Put, 3}, {Put, 4}}); + assert_batch(next_batch(), 3, {{Put, 5}}); +} + +TEST_F(FeedOperationBatchingTest, batch_can_include_all_supported_feed_op_types) { + send_put(1, 1); + send_remove(1, 2); + send_update(1, 3); + assert_batch(next_batch(), 1, {{Put, 1}, {Remove, 2}, {Update, 3}}); +} + +TEST_F(FeedOperationBatchingTest, timed_out_reqeusts_are_ignored_by_batch) { + send_puts({{1, 1}}); + send_put(1, 2, next_timestamp(), 1s); + send_puts({{1, 3}}); + _node->getClock().addSecondsToTime(2); + // Put #2 with 1s timeout has expired in the queue and should not be returned as part of the batch + assert_batch(next_batch(), 1, {{Put, 1}, {Put, 3}}); + ASSERT_EQ(_top.getNumReplies(), 0); + // The actual timeout is handled by the next message fetch invocation + assert_empty_batch(next_batch()); + ASSERT_EQ(_top.getNumReplies(), 1); + EXPECT_EQ(dynamic_cast(*_top.getReply(0)).getResult().getResult(), api::ReturnCode::TIMEOUT); +} + +TEST_F(FeedOperationBatchingTest, non_feed_ops_are_not_batched) { + send_get(1, 2); + send_get(1, 3); + assert_batch(next_batch(), 1, {{Get, 2}}); + assert_batch(next_batch(), 1, {{Get, 3}}); +} + +TEST_F(FeedOperationBatchingTest, pipeline_stalled_by_non_feed_op) { + // It can reasonably be argued that we could batch _around_ a Get operation and still + // have correct behavior, but the Get here is just a stand-in for an arbitrary operation such + // as a Split (which changes the bucket set), which is rather more tricky to reason about. + // For simplicity and understandability, just stall the batch pipeline (at least for now). + send_get(1, 2); + send_puts({{1, 3}, {1, 4}}); + send_get(1, 5); + send_puts({{1, 6}, {1, 7}}); + + assert_batch(next_batch(), 1, {{Get, 2}}); // If first op is non-feed, only it should be returned + assert_batch(next_batch(), 1, {{Put, 3}, {Put, 4}}); + assert_batch(next_batch(), 1, {{Get, 5}}); + assert_batch(next_batch(), 1, {{Put, 6}, {Put, 7}}); +} + +TEST_F(FeedOperationBatchingTest, pipeline_stalled_by_concurrent_ops_to_same_document) { + // 2 ops to doc #2. Since this is expected to be a very rare edge case, just + // stop batching at that point and defer the concurrent op to the next batch. + send_puts({{1, 1}, {1, 2}, {1, 3}, {1, 2}, {1, 4}}); + assert_batch(next_batch(), 1, {{Put, 1}, {Put, 2}, {Put, 3}}); + assert_batch(next_batch(), 1, {{Put, 2}, {Put, 4}}); +} + +TEST_F(FeedOperationBatchingTest, batch_respects_persistence_throttling) { + vespalib::SharedOperationThrottler::DynamicThrottleParams params; + params.min_window_size = 3; + params.max_window_size = 3; + params.window_size_increment = 1; + _handler->use_dynamic_operation_throttling(true); + _handler->reconfigure_dynamic_throttler(params); + _handler->set_max_feed_op_batch_size(10); // > win size to make sure we test the right thing + + send_puts({{1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}}); + auto batch = next_batch(); // holds 3 throttle tokens + assert_batch(batch, 1, {{Put, 1}, {Put, 2}, {Put, 3}}); + // No more throttle tokens available + assert_empty_batch(next_batch()); +} + +} // storage diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 1ccd51d3f06..56149bbc14d 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -43,6 +43,7 @@ LOG_SETUP(".filestormanagertest"); using std::unique_ptr; using document::Document; +using document::BucketId; using namespace storage::api; using storage::spi::test::makeSpiBucket; using document::test::makeDocumentBucket; @@ -405,6 +406,38 @@ TEST_F(FileStorManagerTest, put) { } } +TEST_F(FileStorManagerTest, feed_op_batch_updates_bucket_db_and_reply_bucket_info) { + TestFileStorComponents c(*this); + c.manager->getFileStorHandler().set_max_feed_op_batch_size(10); + BucketId bucket_id(16, 1); + createBucket(bucket_id); + constexpr uint32_t n = 10; + { + // Barrier to prevent any messages from being processed until we've enqueued all puts + auto guard = c.manager->getFileStorHandler().lock(makeDocumentBucket(bucket_id), LockingRequirements::Exclusive); + for (uint32_t i = 0; i < n; ++i) { + auto put = make_put_command(120, vespalib::make_string("id:foo:testdoctype1:n=1:%u", i), Timestamp(1000) + i); + put->setAddress(_storage3); + c.top.sendDown(put); + } + } + // All 10 puts shall now be visible and waiting for the persistence thread to fetch as a single batch. + c.top.waitForMessages(n, _waitTime); + api::BucketInfo expected_bucket_info; + { + StorBucketDatabase::WrappedEntry entry(_node->getStorageBucketDatabase().get(bucket_id, "foo")); + ASSERT_TRUE(entry.exists()); + EXPECT_EQ(entry->getBucketInfo().getDocumentCount(), n); + expected_bucket_info = entry->getBucketInfo(); + } + // All replies should have the _same_ bucket info due to being processed in the same batch. + auto replies = c.top.getRepliesOnce(); + for (auto& reply : replies) { + auto actual_bucket_info = dynamic_cast(*reply).getBucketInfo(); + EXPECT_EQ(actual_bucket_info, expected_bucket_info); + } +} + TEST_F(FileStorManagerTest, running_task_against_unknown_bucket_fails) { TestFileStorComponents c(*this); @@ -726,7 +759,7 @@ TEST_F(FileStorManagerTest, handler_timeout) { filestorHandler.schedule(cmd); } - std::this_thread::sleep_for(51ms); + _node->getClock().addMilliSecondsToTime(51); for (;;) { auto lock = filestorHandler.getNextMessage(stripeId); if (lock.lock.get()) { diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index 59e0853cc21..ab176ebb9cb 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -143,7 +143,7 @@ AsyncHandler::AsyncHandler(const PersistenceUtil & env, spi::PersistenceProvider MessageTracker::UP AsyncHandler::handleRunTask(RunTaskCommand& cmd, MessageTracker::UP tracker) const { auto task = makeResultTask([tracker = std::move(tracker)](spi::Result::UP response) { - tracker->checkForError(*response); + (void)tracker->checkForError(*response); tracker->sendReply(); }); spi::Bucket bucket(cmd.getBucket()); @@ -169,7 +169,7 @@ AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) cons spi::Bucket bucket = _env.getBucket(cmd.getDocumentId(), cmd.getBucket()); auto task = makeResultTask([tracker = std::move(trackerUP)](spi::Result::UP response) { - tracker->checkForError(*response); + (void)tracker->checkForError(*response); tracker->sendReply(); }); _spi.putAsync(bucket, spi::Timestamp(cmd.getTimestamp()), cmd.getDocument(), @@ -517,7 +517,7 @@ AsyncHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTrack } auto task = makeResultTask([&cmd, tracker = std::move(tracker), removed = to_remove.size()](spi::Result::UP response) { - tracker->checkForError(*response); + (void)tracker->checkForError(*response); tracker->setReply(std::make_shared(cmd, removed)); tracker->sendReply(); }); diff --git a/storage/src/vespa/storage/persistence/batched_message.h b/storage/src/vespa/storage/persistence/batched_message.h new file mode 100644 index 00000000000..c23383edfde --- /dev/null +++ b/storage/src/vespa/storage/persistence/batched_message.h @@ -0,0 +1,14 @@ +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.// +#pragma once + +#include "shared_operation_throttler.h" +#include +#include + +namespace storage { + +namespace api { class StorageMessage; } + +using BatchedMessage = std::pair, ThrottleToken>; + +} diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp index a89b705de1b..274d59899d9 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp @@ -5,4 +5,15 @@ namespace storage { FileStorHandler::LockedMessage::~LockedMessage() = default; +FileStorHandler::LockedMessageBatch::LockedMessageBatch(LockedMessage&& initial_msg) + : lock(std::move(initial_msg.lock)), + messages() +{ + if (lock) { + messages.emplace_back(std::move(initial_msg.msg), std::move(initial_msg.throttle_token)); + } +} + +FileStorHandler::LockedMessageBatch::~LockedMessageBatch() = default; + } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index 68b8411c762..6a8b74baf1d 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -1,23 +1,13 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -/** - * \class storage::FileStorHandler - * \ingroup storage - * - * \brief Common resource for filestor threads - * - * Takes care of the interface between file stor threads and the file stor - * manager to avoid circular dependencies, and confine the implementation that - * needs to worry about locking between these components. - */ - #pragma once #include #include #include +#include #include #include +#include namespace storage { namespace api { @@ -80,13 +70,7 @@ public: std::shared_ptr msg; ThrottleToken throttle_token; - LockedMessage() noexcept = default; - LockedMessage(std::shared_ptr lock_, - std::shared_ptr msg_) noexcept - : lock(std::move(lock_)), - msg(std::move(msg_)), - throttle_token() - {} + constexpr LockedMessage() noexcept = default; LockedMessage(std::shared_ptr lock_, std::shared_ptr msg_, ThrottleToken token) noexcept @@ -98,27 +82,40 @@ public: ~LockedMessage(); }; + struct LockedMessageBatch { + std::shared_ptr lock; + vespalib::SmallVector messages; + + LockedMessageBatch() = default; + explicit LockedMessageBatch(LockedMessage&& initial_msg); + LockedMessageBatch(LockedMessageBatch&&) noexcept = default; + ~LockedMessageBatch(); + + [[nodiscard]] bool empty() const noexcept { return messages.empty(); } + [[nodiscard]] size_t size() const noexcept { return messages.size(); } + // Precondition: messages.size() == 1 + [[nodiscard]] LockedMessage release_as_single_msg() noexcept; + }; + class ScheduleAsyncResult { - private: - bool _was_scheduled; + bool _was_scheduled; LockedMessage _async_message; - public: - ScheduleAsyncResult() : _was_scheduled(false), _async_message() {} - explicit ScheduleAsyncResult(LockedMessage&& async_message_in) + constexpr ScheduleAsyncResult() noexcept : _was_scheduled(false), _async_message() {} + explicit ScheduleAsyncResult(LockedMessage&& async_message_in) noexcept : _was_scheduled(true), _async_message(std::move(async_message_in)) {} - bool was_scheduled() const { + [[nodiscard]] bool was_scheduled() const noexcept { return _was_scheduled; } - bool has_async_message() const { - return _async_message.lock.get() != nullptr; + [[nodiscard]] bool has_async_message() const noexcept { + return static_cast(_async_message.lock); } - const LockedMessage& async_message() const { + [[nodiscard]] const LockedMessage& async_message() const noexcept { return _async_message; } - LockedMessage&& release_async_message() { + [[nodiscard]] LockedMessage&& release_async_message() noexcept { return std::move(_async_message); } }; @@ -129,7 +126,7 @@ public: }; FileStorHandler() : _getNextMessageTimout(100ms) { } - virtual ~FileStorHandler() = default; + ~FileStorHandler() override = default; /** @@ -171,7 +168,9 @@ public: * * @param stripe The stripe to get messages for */ - virtual LockedMessage getNextMessage(uint32_t stripeId, vespalib::steady_time deadline) = 0; + [[nodiscard]] virtual LockedMessage getNextMessage(uint32_t stripeId, vespalib::steady_time deadline) = 0; + + [[nodiscard]] virtual LockedMessageBatch next_message_batch(uint32_t stripe, vespalib::steady_time now, vespalib::steady_time deadline) = 0; /** Only used for testing, should be removed */ LockedMessage getNextMessage(uint32_t stripeId) { @@ -189,8 +188,6 @@ public: * NB: As current operation can be a split or join operation, make sure that * you always wait for current to finish, if is a super or sub bucket of * the bucket we're locking. - * - * */ virtual BucketLockInterface::SP lock(const document::Bucket&, api::LockingRequirements lockReq) = 0; @@ -287,6 +284,8 @@ public: virtual void use_dynamic_operation_throttling(bool use_dynamic) noexcept = 0; virtual void set_throttle_apply_bucket_diff_ops(bool throttle_apply_bucket_diff) noexcept = 0; + + virtual void set_max_feed_op_batch_size(uint32_t max_batch) noexcept = 0; private: vespalib::duration _getNextMessageTimout; }; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index 1984d44652a..2e2634025a7 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include @@ -60,7 +61,8 @@ FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripe _max_active_merges_per_stripe(per_stripe_merge_limit(numThreads, numStripes)), _paused(false), _throttle_apply_bucket_diff_ops(false), - _last_active_operations_stats() + _last_active_operations_stats(), + _max_feed_op_batch_size(1) { assert(numStripes > 0); _stripes.reserve(numStripes); @@ -241,8 +243,7 @@ FileStorHandlerImpl::schedule(const std::shared_ptr& msg) { if (getState() == FileStorHandler::AVAILABLE) { document::Bucket bucket = getStorageMessageBucket(*msg); - stripe(bucket).schedule(MessageEntry(msg, bucket)); - return true; + return stripe(bucket).schedule(MessageEntry(msg, bucket, _component.getClock().getMonotonicTime())); } return false; } @@ -252,7 +253,7 @@ FileStorHandlerImpl::schedule_and_get_next_async_message(const std::shared_ptr FileStorHandlerImpl::Stripe::lock(const document::Bucket &bucket, api::LockingRequirements lockReq) { std::unique_lock guard(*_lock); @@ -858,9 +874,10 @@ FileStorHandlerImpl::sendReplyDirectly(const std::shared_ptr& } FileStorHandlerImpl::MessageEntry::MessageEntry(const std::shared_ptr& cmd, - const document::Bucket &bucket) + const document::Bucket& bucket, + vespalib::steady_time scheduled_at_time) : _command(cmd), - _timer(), + _timer(scheduled_at_time), _bucket(bucket), _priority(cmd->getPriority()) { } @@ -939,9 +956,8 @@ FileStorHandlerImpl::Stripe::operation_type_should_be_throttled(api::MessageType } FileStorHandler::LockedMessage -FileStorHandlerImpl::Stripe::getNextMessage(vespalib::steady_time deadline) +FileStorHandlerImpl::Stripe::next_message_impl(monitor_guard& guard, vespalib::steady_time deadline) { - std::unique_lock guard(*_lock); ThrottleToken throttle_token; // Try to grab a message+lock, immediately retrying once after a wait // if none can be found and then exiting if the same is the case on the @@ -992,6 +1008,92 @@ FileStorHandlerImpl::Stripe::getNextMessage(vespalib::steady_time deadline) return {}; // No message fetched. } +FileStorHandler::LockedMessage +FileStorHandlerImpl::Stripe::getNextMessage(vespalib::steady_time deadline) +{ + std::unique_lock guard(*_lock); + return next_message_impl(guard, deadline); +} + +namespace { + +constexpr bool is_batchable_feed_op(api::MessageType::Id id) noexcept { + return (id == api::MessageType::PUT_ID || + id == api::MessageType::REMOVE_ID || + id == api::MessageType::UPDATE_ID); +} + +// Precondition: msg must be a feed operation request (put, remove, update) +document::GlobalId gid_from_feed_op(const api::StorageMessage& msg) { + switch (msg.getType().getId()) { + case api::MessageType::PUT_ID: + return dynamic_cast(msg).getDocumentId().getGlobalId(); + case api::MessageType::REMOVE_ID: + return dynamic_cast(msg).getDocumentId().getGlobalId(); + case api::MessageType::UPDATE_ID: + return dynamic_cast(msg).getDocumentId().getGlobalId(); + default: abort(); + } +} + +} // anon ns + +FileStorHandler::LockedMessageBatch +FileStorHandlerImpl::Stripe::next_message_batch(vespalib::steady_time now, vespalib::steady_time deadline) +{ + const auto max_batch_size = _owner.max_feed_op_batch_size(); + + std::unique_lock guard(*_lock); + auto initial_locked = next_message_impl(guard, deadline); + if (!initial_locked.lock || !is_batchable_feed_op(initial_locked.msg->getType().getId()) || (max_batch_size == 1)) { + return LockedMessageBatch(std::move(initial_locked)); + } + LockedMessageBatch batch(std::move(initial_locked)); + fill_feed_op_batch(guard, batch, max_batch_size, now); + return batch; +} + +void +FileStorHandlerImpl::Stripe::fill_feed_op_batch(monitor_guard& guard, LockedMessageBatch& batch, + uint32_t max_batch_size, vespalib::steady_time now) +{ + assert(batch.size() == 1); + BucketIdx& idx = bmi::get<2>(*_queue); + auto bucket_msgs = idx.equal_range(batch.lock->getBucket()); + // Process in FIFO order (_not_ priority order) until we hit the end, a non-batchable operation + // (implicit pipeline stall since bucket set might change) or can't get another throttle token. + // We also stall the pipeline if we get a concurrent modification to the same document (not expected, + // as the distributors should prevent this, but _technically_ it is possible). + const auto expected_max_size = std::min(ssize_t(max_batch_size), std::distance(bucket_msgs.first, bucket_msgs.second) + 1); + vespalib::hash_set gids_in_batch(expected_max_size); + gids_in_batch.insert(gid_from_feed_op(*batch.messages[0].first)); + for (auto it = bucket_msgs.first; (it != bucket_msgs.second) && (batch.messages.size() < max_batch_size);) { + if (!is_batchable_feed_op(it->_command->getType().getId())) { + break; + } + auto [existing_iter, inserted] = gids_in_batch.insert(gid_from_feed_op(*it->_command)); + if (!inserted) { + break; // Already present in batch + } + if (messageTimedOutInQueue(*it->_command, now - it->_timer.start_time())) { + // We just ignore timed out ops here; actually generating a timeout reply will be done by + // next_message_impl() during a subsequent invocation. This avoids having to deal with any + // potential issues caused by sending a reply up while holding the queue lock, since we + // can't release it here. + ++it; + continue; + } + auto throttle_token = _owner.operation_throttler().try_acquire_one(); + if (!throttle_token.valid()) { + break; + } + // Note: iterator is const; can't std::move(it->_command) + batch.messages.emplace_back(it->_command, std::move(throttle_token)); + it = idx.erase(it); + } + update_cached_queue_size(guard); +} + FileStorHandler::LockedMessage FileStorHandlerImpl::Stripe::get_next_async_message(monitor_guard& guard) { @@ -1021,9 +1123,11 @@ FileStorHandler::LockedMessage FileStorHandlerImpl::Stripe::getMessage(monitor_guard & guard, PriorityIdx & idx, PriorityIdx::iterator iter, ThrottleToken throttle_token) { - std::chrono::milliseconds waitTime(uint64_t(iter->_timer.stop(_metrics->averageQueueWaitingTime))); + std::chrono::milliseconds waitTime(uint64_t(iter->_timer.stop( + _owner._component.getClock().getMonotonicTime(), + _metrics->averageQueueWaitingTime))); - std::shared_ptr msg = std::move(iter->_command); + std::shared_ptr msg = iter->_command; // iter is const; can't std::move() document::Bucket bucket(iter->_bucket); idx.erase(iter); // iter not used after this point. update_cached_queue_size(guard); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index f7c9b218779..ac8e5c52cbf 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -55,7 +55,9 @@ public: document::Bucket _bucket; uint8_t _priority; - MessageEntry(const std::shared_ptr& cmd, const document::Bucket &bId); + MessageEntry(const std::shared_ptr& cmd, + const document::Bucket& bucket, + vespalib::steady_time scheduled_at_time); MessageEntry(MessageEntry &&) noexcept ; MessageEntry(const MessageEntry &) noexcept; MessageEntry & operator = (const MessageEntry &) = delete; @@ -66,13 +68,13 @@ public: } }; - using PriorityOrder = bmi::ordered_non_unique >; - using BucketOrder = bmi::ordered_non_unique>; - + // ordered_non_unique shall preserve insertion order as iteration order of equal keys, but this is rather magical... + using PriorityOrder = bmi::ordered_non_unique>; + using BucketOrder = bmi::ordered_non_unique>; using PriorityQueue = bmi::multi_index_container, PriorityOrder, BucketOrder>>; + using PriorityIdx = bmi::nth_index::type; + using BucketIdx = bmi::nth_index::type; - using PriorityIdx = bmi::nth_index::type; - using BucketIdx = bmi::nth_index::type; using Clock = std::chrono::steady_clock; using monitor_guard = std::unique_lock; using atomic_size_t = vespalib::datastore::AtomicValueWrapper; @@ -114,7 +116,7 @@ public: ActiveOperationsStats &stats() { return _stats; } }; SafeActiveOperationsStats() : _lock(std::make_unique()), _stats() {} - Guard guard() { return Guard(*_lock, _stats, ctor_tag()); } + [[nodiscard]] Guard guard() { return Guard(*_lock, _stats, ctor_tag()); } }; Stripe(const FileStorHandlerImpl & owner, MessageSender & messageSender); @@ -123,8 +125,8 @@ public: Stripe & operator =(const Stripe &) = delete; ~Stripe(); void flush(); - bool schedule(MessageEntry messageEntry); - FileStorHandler::LockedMessage schedule_and_get_next_async_message(MessageEntry entry); + [[nodiscard]] bool schedule(MessageEntry messageEntry); + [[nodiscard]] FileStorHandler::LockedMessage schedule_and_get_next_async_message(MessageEntry entry); void waitUntilNoLocks() const; void abort(std::vector> & aborted, const AbortBucketOperationsCommand& cmd); void waitInactive(const AbortBucketOperationsCommand& cmd) const; @@ -151,18 +153,19 @@ public: api::LockingRequirements lockReq, bool count_as_active_merge, const LockEntry & lockEntry); - std::shared_ptr lock(const document::Bucket & bucket, api::LockingRequirements lockReq); + [[nodiscard]] std::shared_ptr lock(const document::Bucket & bucket, api::LockingRequirements lockReq); void failOperations(const document::Bucket & bucket, const api::ReturnCode & code); - FileStorHandler::LockedMessage getNextMessage(vespalib::steady_time deadline); + [[nodiscard]] FileStorHandler::LockedMessage getNextMessage(vespalib::steady_time deadline); + [[nodiscard]] FileStorHandler::LockedMessageBatch next_message_batch(vespalib::steady_time now, vespalib::steady_time deadline); void dumpQueue(std::ostream & os) const; void dumpActiveHtml(std::ostream & os) const; void dumpQueueHtml(std::ostream & os) const; - std::mutex & exposeLock() { return *_lock; } - PriorityQueue & exposeQueue() { return *_queue; } - BucketIdx & exposeBucketIdx() { return bmi::get<2>(*_queue); } + [[nodiscard]] std::mutex & exposeLock() { return *_lock; } + [[nodiscard]] PriorityQueue & exposeQueue() { return *_queue; } + [[nodiscard]] BucketIdx & exposeBucketIdx() { return bmi::get<2>(*_queue); } void setMetrics(FileStorStripeMetrics * metrics) { _metrics = metrics; } - ActiveOperationsStats get_active_operations_stats(bool reset_min_max) const; + [[nodiscard]] ActiveOperationsStats get_active_operations_stats(bool reset_min_max) const; private: void update_cached_queue_size(const std::lock_guard &) { _cached_queue_size.store_relaxed(_queue->size()); @@ -170,15 +173,20 @@ public: void update_cached_queue_size(const std::unique_lock &) { _cached_queue_size.store_relaxed(_queue->size()); } - bool hasActive(monitor_guard & monitor, const AbortBucketOperationsCommand& cmd) const; - FileStorHandler::LockedMessage get_next_async_message(monitor_guard& guard); + [[nodiscard]] bool hasActive(monitor_guard & monitor, const AbortBucketOperationsCommand& cmd) const; + [[nodiscard]] FileStorHandler::LockedMessage get_next_async_message(monitor_guard& guard); [[nodiscard]] bool operation_type_should_be_throttled(api::MessageType::Id type_id) const noexcept; + [[nodiscard]] FileStorHandler::LockedMessage next_message_impl(monitor_guard& held_lock, + vespalib::steady_time deadline); + void fill_feed_op_batch(monitor_guard& held_lock, LockedMessageBatch& batch, + uint32_t max_batch_size, vespalib::steady_time now); + // Precondition: the bucket used by `iter`s operation is not locked in a way that conflicts // with its locking requirements. - FileStorHandler::LockedMessage getMessage(monitor_guard & guard, PriorityIdx & idx, - PriorityIdx::iterator iter, - ThrottleToken throttle_token); + [[nodiscard]] FileStorHandler::LockedMessage getMessage(monitor_guard & guard, PriorityIdx & idx, + PriorityIdx::iterator iter, + ThrottleToken throttle_token); using LockedBuckets = vespalib::hash_map; const FileStorHandlerImpl &_owner; MessageSender &_messageSender; @@ -233,7 +241,8 @@ public: bool schedule(const std::shared_ptr&) override; ScheduleAsyncResult schedule_and_get_next_async_message(const std::shared_ptr& msg) override; - FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId, vespalib::steady_time deadline) override; + LockedMessage getNextMessage(uint32_t stripeId, vespalib::steady_time deadline) override; + LockedMessageBatch next_message_batch(uint32_t stripe, vespalib::steady_time now, vespalib::steady_time deadline) override; void remapQueueAfterJoin(const RemapInfo& source, RemapInfo& target) override; void remapQueueAfterSplit(const RemapInfo& source, RemapInfo& target1, RemapInfo& target2) override; @@ -292,6 +301,13 @@ public: _throttle_apply_bucket_diff_ops.store(throttle_apply_bucket_diff, std::memory_order_relaxed); } + void set_max_feed_op_batch_size(uint32_t max_batch) noexcept override { + _max_feed_op_batch_size.store(max_batch, std::memory_order_relaxed); + } + [[nodiscard]] uint32_t max_feed_op_batch_size() const noexcept { + return _max_feed_op_batch_size.load(std::memory_order_relaxed); + } + // Implements ResumeGuard::Callback void resume() override; @@ -316,6 +332,7 @@ private: std::atomic _paused; std::atomic _throttle_apply_bucket_diff_ops; std::optional _last_active_operations_stats; + std::atomic _max_feed_op_batch_size; // Returns the index in the targets array we are sending to, or -1 if none of them match. int calculateTargetBasedOnDocId(const api::StorageMessage& msg, std::vector& targets); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 0046bd96b65..23de39f7130 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -239,6 +239,7 @@ FileStorManager::on_configure(const StorFilestorConfig& config) auto updated_dyn_throttle_params = dynamic_throttle_params_from_config(config, _threads.size()); _filestorHandler->reconfigure_dynamic_throttler(updated_dyn_throttle_params); } + _filestorHandler->set_max_feed_op_batch_size(std::max(1, config.maxFeedOpBatchSize)); // TODO remove once desired throttling behavior is set in stone { _filestorHandler->use_dynamic_operation_throttling(use_dynamic_throttling); diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index 0612798b43a..4761937075d 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -134,7 +134,7 @@ PersistenceHandler::processMessage(api::StorageMessage& msg, MessageTracker::UP _env._metrics.operations.inc(); if (msg.getType().isReply()) { - try{ + try { LOG(debug, "Handling reply: %s", msg.toString().c_str()); LOG(spam, "Message content: %s", msg.toString(true).c_str()); return handleReply(static_cast(msg), std::move(tracker)); @@ -161,9 +161,7 @@ PersistenceHandler::processMessage(api::StorageMessage& msg, MessageTracker::UP void PersistenceHandler::processLockedMessage(FileStorHandler::LockedMessage lock) const { - LOG(debug, "NodeIndex %d, ptr=%p", _env._nodeIndex, lock.msg.get()); api::StorageMessage & msg(*lock.msg); - // Important: we _copy_ the message shared_ptr instead of moving to ensure that `msg` remains // valid even if the tracker is destroyed by an exception in processMessage(). auto tracker = std::make_unique(framework::MilliSecTimer(_clock), _env, _env._fileStorHandler, @@ -174,4 +172,24 @@ PersistenceHandler::processLockedMessage(FileStorHandler::LockedMessage lock) co } } +void +PersistenceHandler::process_locked_message_batch(std::shared_ptr lock, + std::span bucket_messages) +{ + const auto bucket = lock->getBucket(); + auto batch = std::make_shared(std::move(lock), _env, _env._fileStorHandler); + for (auto& bm : bucket_messages) { + assert(bm.first->getBucket() == bucket); + // Important: we _copy_ the message shared_ptr instead of moving to ensure that `*bm.first` remains + // valid even if the tracker is destroyed by an exception in processMessage(). All std::exceptions + // are caught there, so we do not expect our loop to be interrupted. + auto tracker = std::make_unique(framework::MilliSecTimer(_clock), _env, + batch, bm.first, std::move(bm.second)); + tracker = processMessage(*bm.first, std::move(tracker)); + if (tracker) { + tracker->sendReply(); // Actually defers to batch reply queue + } + } +} + } diff --git a/storage/src/vespa/storage/persistence/persistencehandler.h b/storage/src/vespa/storage/persistence/persistencehandler.h index 0da518a1cfa..224bb70a16b 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.h +++ b/storage/src/vespa/storage/persistence/persistencehandler.h @@ -2,14 +2,16 @@ #pragma once -#include "processallhandler.h" -#include "mergehandler.h" #include "asynchandler.h" +#include "batched_message.h" +#include "mergehandler.h" #include "persistenceutil.h" -#include "splitjoinhandler.h" +#include "processallhandler.h" #include "simplemessagehandler.h" +#include "splitjoinhandler.h" #include #include +#include namespace storage { @@ -29,6 +31,8 @@ public: ~PersistenceHandler(); void processLockedMessage(FileStorHandler::LockedMessage lock) const; + void process_locked_message_batch(std::shared_ptr lock, + std::span bucket_messages); //TODO Rewrite tests to avoid this api leak const AsyncHandler & asyncHandler() const { return _asyncHandler; } diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index a98418281d2..5710c3fe26f 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -14,6 +14,7 @@ PersistenceThread::PersistenceThread(PersistenceHandler & persistenceHandler, Fi uint32_t stripeId, framework::Component & component) : _persistenceHandler(persistenceHandler), _fileStorHandler(fileStorHandler), + _clock(component.getClock()), _stripeId(stripeId), _thread() { @@ -36,14 +37,19 @@ PersistenceThread::run(framework::ThreadHandle& thread) vespalib::duration max_wait_time = vespalib::adjustTimeoutByDetectedHz(100ms); while (!thread.interrupted()) { - vespalib::steady_time now = vespalib::steady_clock::now(); + vespalib::steady_time now = _clock.getMonotonicTime(); thread.registerTick(framework::UNKNOWN_CYCLE, now); vespalib::steady_time deadline = now + max_wait_time; - FileStorHandler::LockedMessage lock(_fileStorHandler.getNextMessage(_stripeId, deadline)); - - if (lock.lock) { - _persistenceHandler.processLockedMessage(std::move(lock)); + auto batch = _fileStorHandler.next_message_batch(_stripeId, now, deadline); + if (!batch.empty()) { + // Special-case single message batches, as actually scheduling a full batch has more + // overhead due to extra bookkeeping state and deferred reply-sending. + if (batch.size() == 1) { + _persistenceHandler.processLockedMessage(batch.release_as_single_msg()); + } else { + _persistenceHandler.process_locked_message_batch(std::move(batch.lock), batch.messages); + } } } LOG(debug, "Closing down persistence thread"); diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h index aacd1dd4830..2e9852ada73 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.h +++ b/storage/src/vespa/storage/persistence/persistencethread.h @@ -8,6 +8,7 @@ namespace storage { namespace framework { + struct Clock; class Component; class Thread; } @@ -27,10 +28,11 @@ public: framework::Thread& getThread() override { return *_thread; } private: - PersistenceHandler & _persistenceHandler; - FileStorHandler & _fileStorHandler; - uint32_t _stripeId; - std::unique_ptr _thread; + PersistenceHandler& _persistenceHandler; + FileStorHandler& _fileStorHandler; + const framework::Clock& _clock; + uint32_t _stripeId; + std::unique_ptr _thread; void run(framework::ThreadHandle&) override; }; diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp index c975721c855..54bce72d7ff 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.cpp +++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp @@ -11,41 +11,82 @@ LOG_SETUP(".persistence.util"); namespace storage { namespace { - bool isBatchable(api::MessageType::Id id) - { - return (id == api::MessageType::PUT_ID || - id == api::MessageType::REMOVE_ID || - id == api::MessageType::UPDATE_ID); - } - bool hasBucketInfo(api::MessageType::Id id) - { - return (isBatchable(id) || - (id == api::MessageType::REMOVELOCATION_ID || - id == api::MessageType::JOINBUCKETS_ID)); +constexpr bool is_batchable(api::MessageType::Id id) noexcept { + return (id == api::MessageType::PUT_ID || + id == api::MessageType::REMOVE_ID || + id == api::MessageType::UPDATE_ID); +} + +constexpr bool has_bucket_info(api::MessageType::Id id) noexcept { + return (is_batchable(id) || + (id == api::MessageType::REMOVELOCATION_ID || + id == api::MessageType::JOINBUCKETS_ID)); +} + +constexpr vespalib::duration WARN_ON_SLOW_OPERATIONS = 5s; + +} // anon ns + +DeferredReplySenderStub::DeferredReplySenderStub() = default; +DeferredReplySenderStub::~DeferredReplySenderStub() = default; + +AsyncMessageBatch::AsyncMessageBatch(std::shared_ptr bucket_lock, + const PersistenceUtil& env, + MessageSender& reply_sender) noexcept + : _bucket_lock(std::move(bucket_lock)), + _env(env), + _reply_sender(reply_sender), + _deferred_sender_stub() +{ + assert(_bucket_lock); +} + +AsyncMessageBatch::~AsyncMessageBatch() { + const auto bucket_info = _env.getBucketInfo(_bucket_lock->getBucket()); + _env.updateBucketDatabase(_bucket_lock->getBucket(), bucket_info); + + std::lock_guard lock(_deferred_sender_stub._mutex); // Ensure visibility of posted replies + for (auto& reply : _deferred_sender_stub._deferred_replies) { + if (reply->getResult().success()) { + dynamic_cast(*reply).setBucketInfo(bucket_info); + } + _reply_sender.sendReplyDirectly(reply); } - const vespalib::duration WARN_ON_SLOW_OPERATIONS = 5s; + LOG(debug, "Processed async feed message batch of %zu ops for %s. New bucket info is %s", + _deferred_sender_stub._deferred_replies.size(), + _bucket_lock->getBucket().toString().c_str(), bucket_info.toString().c_str()); } MessageTracker::MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, FileStorHandler::BucketLockInterface::SP bucketLock, - api::StorageMessage::SP msg, + std::shared_ptr msg, ThrottleToken throttle_token) - : MessageTracker(timer, env, replySender, true, std::move(bucketLock), std::move(msg), std::move(throttle_token)) + : MessageTracker(timer, env, replySender, true, std::move(bucketLock), {}, std::move(msg), std::move(throttle_token)) +{} + +MessageTracker::MessageTracker(const framework::MilliSecTimer& timer, + const PersistenceUtil& env, + std::shared_ptr batch, + std::shared_ptr msg, + ThrottleToken throttle_token) + : MessageTracker(timer, env, batch->deferred_sender_stub(), false, {}, std::move(batch), std::move(msg), std::move(throttle_token)) {} MessageTracker::MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, - bool updateBucketInfo, - FileStorHandler::BucketLockInterface::SP bucketLock, - api::StorageMessage::SP msg, + bool update_bucket_info, + std::shared_ptr bucket_lock, + std::shared_ptr part_of_batch, + std::shared_ptr msg, ThrottleToken throttle_token) : _sendReply(true), - _updateBucketInfo(updateBucketInfo && hasBucketInfo(msg->getType().getId())), - _bucketLock(std::move(bucketLock)), + _updateBucketInfo(update_bucket_info && has_bucket_info(msg->getType().getId())), + _bucketLock(std::move(bucket_lock)), + _part_of_batch(std::move(part_of_batch)), _msg(std::move(msg)), _throttle_token(std::move(throttle_token)), _context(_msg->getPriority(), _msg->getTrace().getLevel()), @@ -61,7 +102,7 @@ MessageTracker::createForTesting(const framework::MilliSecTimer & timer, Persist FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg) { return MessageTracker::UP(new MessageTracker(timer, env, replySender, false, std::move(bucketLock), - std::move(msg), ThrottleToken())); + {}, std::move(msg), ThrottleToken())); } void @@ -102,6 +143,7 @@ MessageTracker::sendReply() { if (hasReply()) { getReply().getTrace().addChild(_context.steal_trace()); if (_updateBucketInfo) { + assert(_bucketLock); if (getReply().getResult().success()) { _env.setBucketInfo(*this, _bucketLock->getBucket()); } @@ -163,7 +205,7 @@ MessageTracker::generateReply(api::StorageCommand& cmd) std::shared_ptr MessageTracker::sync_phase_done_notifier_or_nullptr() const { - if (_bucketLock->wants_sync_phase_done_notification()) { + if (_bucketLock && _bucketLock->wants_sync_phase_done_notification()) { return _bucketLock; } return {}; @@ -236,7 +278,7 @@ PersistenceUtil::setBucketInfo(MessageTracker& tracker, const document::Bucket & { api::BucketInfo info = getBucketInfo(bucket); - static_cast(tracker.getReply()).setBucketInfo(info); + dynamic_cast(tracker.getReply()).setBucketInfo(info); updateBucketDatabase(bucket, info); } diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h index 900f301252e..71e5c2754e9 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.h +++ b/storage/src/vespa/storage/persistence/persistenceutil.h @@ -25,12 +25,50 @@ namespace storage { class PersistenceUtil; +struct DeferredReplySenderStub : MessageSender { + std::mutex _mutex; + std::vector> _deferred_replies; + + DeferredReplySenderStub(); + ~DeferredReplySenderStub() override; + + void sendCommand(const std::shared_ptr&) override { + abort(); // Not supported + } + void sendReply(const std::shared_ptr& reply) override { + std::lock_guard lock(_mutex); + _deferred_replies.emplace_back(reply); + } +}; + +class AsyncMessageBatch { + std::shared_ptr _bucket_lock; + const PersistenceUtil& _env; + MessageSender& _reply_sender; + DeferredReplySenderStub _deferred_sender_stub; +public: + AsyncMessageBatch(std::shared_ptr bucket_lock, + const PersistenceUtil& env, + MessageSender& reply_sender) noexcept; + // Triggered by last referencing batched MessageTracker being destroyed. + // Fetches bucket info, updates DB and sends all deferred replies with the new bucket info. + ~AsyncMessageBatch(); + + [[nodiscard]] MessageSender& deferred_sender_stub() noexcept { return _deferred_sender_stub; } +}; + class MessageTracker { public: using UP = std::unique_ptr; - MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, - FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr msg, + MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & reply_sender, + FileStorHandler::BucketLockInterface::SP bucket_lock, std::shared_ptr msg, + ThrottleToken throttle_token); + + // For use with batching where bucket lock is held separately and bucket info + // is _not_ fetched or updated per message. + MessageTracker(const framework::MilliSecTimer& timer, const PersistenceUtil& env, + std::shared_ptr batch, std::shared_ptr msg, ThrottleToken throttle_token); ~MessageTracker(); @@ -58,48 +96,53 @@ public: * commands like merge. */ void dontReply() { _sendReply = false; } - bool hasReply() const { return bool(_reply); } - const api::StorageReply & getReply() const { + [[nodiscard]] bool hasReply() const { return bool(_reply); } + [[nodiscard]] const api::StorageReply & getReply() const { return *_reply; } - api::StorageReply & getReply() { + [[nodiscard]] api::StorageReply & getReply() { return *_reply; } - std::shared_ptr && stealReplySP() && { + [[nodiscard]] std::shared_ptr && stealReplySP() && { return std::move(_reply); } void generateReply(api::StorageCommand& cmd); - api::ReturnCode getResult() const { return _result; } + [[nodiscard]] api::ReturnCode getResult() const { return _result; } - spi::Context & context() { return _context; } - document::BucketId getBucketId() const { + [[nodiscard]] spi::Context & context() { return _context; } + [[nodiscard]] document::BucketId getBucketId() const { return _bucketLock->getBucket().getBucketId(); } void sendReply(); - bool checkForError(const spi::Result& response); + [[nodiscard]] bool checkForError(const spi::Result& response); // Returns a non-nullptr notifier instance iff the underlying operation wants to be notified - // when the sync phase is complete. Otherwise returns a nullptr shared_ptr. - std::shared_ptr sync_phase_done_notifier_or_nullptr() const; + // when the sync phase is complete. Otherwise, returns a nullptr shared_ptr. + [[nodiscard]] std::shared_ptr sync_phase_done_notifier_or_nullptr() const; static MessageTracker::UP createForTesting(const framework::MilliSecTimer & timer, PersistenceUtil & env, MessageSender & replySender, FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr msg); private: - MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, bool updateBucketInfo, - FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr msg, + MessageTracker(const framework::MilliSecTimer& timer, const PersistenceUtil& env, + MessageSender& reply_sender, bool update_bucket_info, + std::shared_ptr bucket_lock, + std::shared_ptr part_of_batch, + std::shared_ptr msg, ThrottleToken throttle_token); [[nodiscard]] bool count_result_as_failure() const noexcept; bool _sendReply; bool _updateBucketInfo; + // Either _bucketLock or _part_of_batch must be set, never both at the same time FileStorHandler::BucketLockInterface::SP _bucketLock; + std::shared_ptr _part_of_batch; // nullptr if not batched std::shared_ptr _msg; ThrottleToken _throttle_token; spi::Context _context; @@ -117,8 +160,6 @@ public: struct LockResult { std::shared_ptr lock; LockResult() : lock() {} - - bool bucketExisted() const { return bool(lock); } }; PersistenceUtil(const ServiceLayerComponent&, FileStorHandler& fileStorHandler, diff --git a/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.h b/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.h index 277e7b4fdfd..93b305370e0 100644 --- a/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.h +++ b/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.h @@ -13,7 +13,7 @@ namespace storage::framework::defaultimplementation { -struct RealClock : public Clock { +struct RealClock final : public Clock { vespalib::steady_time getMonotonicTime() const override; vespalib::system_time getSystemTime() const override; }; -- cgit v1.2.3 From 65790df4dfb9e5c0ab3e1fd90fa117351bb624ef Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Tue, 9 Apr 2024 13:38:16 +0000 Subject: Ensure visibility of max batch size reconfiguration in persistence thread --- .../src/tests/persistence/filestorage/filestormanagertest.cpp | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 56149bbc14d..43e6bbe1c89 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -411,9 +411,18 @@ TEST_F(FileStorManagerTest, feed_op_batch_updates_bucket_db_and_reply_bucket_inf c.manager->getFileStorHandler().set_max_feed_op_batch_size(10); BucketId bucket_id(16, 1); createBucket(bucket_id); + // The persistence thread is already running at this point, and may not have observed the max + // batch size configuration change. Trigger an implicit thread barrier by roundtripping a message. + { + auto get = make_get_command(120, "id:foo:testdoctype1:n=1:0"); + get->setAddress(_storage3); + c.top.sendDown(get); + c.top.waitForMessages(1, _waitTime); + (void)c.top.getRepliesOnce(); + } constexpr uint32_t n = 10; { - // Barrier to prevent any messages from being processed until we've enqueued all puts + // Explicit barrier to prevent any messages from being processed until we've enqueued all puts auto guard = c.manager->getFileStorHandler().lock(makeDocumentBucket(bucket_id), LockingRequirements::Exclusive); for (uint32_t i = 0; i < n; ++i) { auto put = make_put_command(120, vespalib::make_string("id:foo:testdoctype1:n=1:%u", i), Timestamp(1000) + i); -- cgit v1.2.3 From 0f19429b96e31a3bdff4658deef9f8910c53c1f5 Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Tue, 9 Apr 2024 14:20:01 +0000 Subject: Add Abseil failure signal handler to test runner to get stack dumps for crashes --- storage/src/tests/persistence/filestorage/CMakeLists.txt | 1 + storage/src/tests/persistence/filestorage/gtest_runner.cpp | 11 ++++++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/storage/src/tests/persistence/filestorage/CMakeLists.txt b/storage/src/tests/persistence/filestorage/CMakeLists.txt index 951a361474e..aa7c9fe995c 100644 --- a/storage/src/tests/persistence/filestorage/CMakeLists.txt +++ b/storage/src/tests/persistence/filestorage/CMakeLists.txt @@ -19,6 +19,7 @@ vespa_add_executable(storage_filestorage_gtest_runner_app TEST storage_testhostreporter storage_testpersistence_common GTest::GTest + absl::failure_signal_handler ) vespa_add_test( NAME storage_filestorage_gtest_runner_app COMMAND storage_filestorage_gtest_runner_app COST 50) diff --git a/storage/src/tests/persistence/filestorage/gtest_runner.cpp b/storage/src/tests/persistence/filestorage/gtest_runner.cpp index 5d1fde4130c..1ed7bc91843 100644 --- a/storage/src/tests/persistence/filestorage/gtest_runner.cpp +++ b/storage/src/tests/persistence/filestorage/gtest_runner.cpp @@ -1,8 +1,17 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include +#include #include LOG_SETUP("storage_filestorage_gtest_runner"); -GTEST_MAIN_RUN_ALL_TESTS() +int main(int argc, char* argv[]) { + absl::FailureSignalHandlerOptions opts; + opts.call_previous_handler = true; + opts.use_alternate_stack = false; // Suboptimal, but needed to get proper backtracing (for some reason...) + absl::InstallFailureSignalHandler(opts); + + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} -- cgit v1.2.3 From 8866927d906a9869d668587bd1c12ddec367a59c Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Tue, 9 Apr 2024 14:57:35 +0000 Subject: Rewrite test to manually start single persistence thread Avoids the need for barriers to avoid stepping on the thread's toes --- .../filestorage/filestormanagertest.cpp | 32 ++++++++-------------- 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 43e6bbe1c89..3846d64d027 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -407,30 +407,19 @@ TEST_F(FileStorManagerTest, put) { } TEST_F(FileStorManagerTest, feed_op_batch_updates_bucket_db_and_reply_bucket_info) { - TestFileStorComponents c(*this); - c.manager->getFileStorHandler().set_max_feed_op_batch_size(10); + PersistenceHandlerComponents c(*this); + c.filestorHandler->set_max_feed_op_batch_size(10); BucketId bucket_id(16, 1); createBucket(bucket_id); - // The persistence thread is already running at this point, and may not have observed the max - // batch size configuration change. Trigger an implicit thread barrier by roundtripping a message. - { - auto get = make_get_command(120, "id:foo:testdoctype1:n=1:0"); - get->setAddress(_storage3); - c.top.sendDown(get); - c.top.waitForMessages(1, _waitTime); - (void)c.top.getRepliesOnce(); - } constexpr uint32_t n = 10; - { - // Explicit barrier to prevent any messages from being processed until we've enqueued all puts - auto guard = c.manager->getFileStorHandler().lock(makeDocumentBucket(bucket_id), LockingRequirements::Exclusive); - for (uint32_t i = 0; i < n; ++i) { - auto put = make_put_command(120, vespalib::make_string("id:foo:testdoctype1:n=1:%u", i), Timestamp(1000) + i); - put->setAddress(_storage3); - c.top.sendDown(put); - } - } - // All 10 puts shall now be visible and waiting for the persistence thread to fetch as a single batch. + // No persistence thread started yet, so no chance of racing + for (uint32_t i = 0; i < n; ++i) { + auto put = make_put_command(120, vespalib::make_string("id:foo:testdoctype1:n=1:%u", i), Timestamp(1000) + i); + put->setAddress(_storage3); + c.filestorHandler->schedule(put); + } + auto pt = c.make_disk_thread(); + c.filestorHandler->flush(true); c.top.waitForMessages(n, _waitTime); api::BucketInfo expected_bucket_info; { @@ -445,6 +434,7 @@ TEST_F(FileStorManagerTest, feed_op_batch_updates_bucket_db_and_reply_bucket_inf auto actual_bucket_info = dynamic_cast(*reply).getBucketInfo(); EXPECT_EQ(actual_bucket_info, expected_bucket_info); } + c.filestorHandler->close(); // Ensure persistence thread is no longer in message fetch code } TEST_F(FileStorManagerTest, running_task_against_unknown_bucket_fails) { -- cgit v1.2.3 From 0533ce61bae90afc84798f8fecba24d5a5ccee20 Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Tue, 9 Apr 2024 15:01:32 +0000 Subject: Use `static_cast` instead of `dynamic_cast` Downcast-safe type invariant shall be maintained by the message's own type ID tracking. If it's not, we have bigger problems. --- .../vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index 2e2634025a7..6365eb1ab30 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -1027,11 +1027,11 @@ constexpr bool is_batchable_feed_op(api::MessageType::Id id) noexcept { document::GlobalId gid_from_feed_op(const api::StorageMessage& msg) { switch (msg.getType().getId()) { case api::MessageType::PUT_ID: - return dynamic_cast(msg).getDocumentId().getGlobalId(); + return static_cast(msg).getDocumentId().getGlobalId(); case api::MessageType::REMOVE_ID: - return dynamic_cast(msg).getDocumentId().getGlobalId(); + return static_cast(msg).getDocumentId().getGlobalId(); case api::MessageType::UPDATE_ID: - return dynamic_cast(msg).getDocumentId().getGlobalId(); + return static_cast(msg).getDocumentId().getGlobalId(); default: abort(); } } -- cgit v1.2.3 From 9c99d7c58942bd3c035cfa3e0be4bffa515646e1 Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Tue, 9 Apr 2024 15:26:13 +0000 Subject: Ensure all async reply processing executor tasks have completed --- storage/src/tests/persistence/filestorage/filestormanagertest.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 3846d64d027..f12b85eb2ea 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -421,6 +421,7 @@ TEST_F(FileStorManagerTest, feed_op_batch_updates_bucket_db_and_reply_bucket_inf auto pt = c.make_disk_thread(); c.filestorHandler->flush(true); c.top.waitForMessages(n, _waitTime); + c.executor.sync_all(); // Ensure all async reply processing tasks must have completed. api::BucketInfo expected_bucket_info; { StorBucketDatabase::WrappedEntry entry(_node->getStorageBucketDatabase().get(bucket_id, "foo")); -- cgit v1.2.3 From 32df7bf1195fd3c90a77efb1aa7311e4655c1737 Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Tue, 9 Apr 2024 15:55:58 +0000 Subject: Low-level message fetch routine must not implicitly unlock mutex Implicitly unlocking messes up higher level assumptions about when locks are held and thus cannot be safely done. Lock will be unlocked immediately after anyway, so this does not seem like a useful optimization. --- .../src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index 6365eb1ab30..7589fb3cdb3 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -1058,6 +1058,7 @@ FileStorHandlerImpl::Stripe::fill_feed_op_batch(monitor_guard& guard, LockedMess uint32_t max_batch_size, vespalib::steady_time now) { assert(batch.size() == 1); + assert(guard.owns_lock()); BucketIdx& idx = bmi::get<2>(*_queue); auto bucket_msgs = idx.equal_range(batch.lock->getBucket()); // Process in FIFO order (_not_ priority order) until we hit the end, a non-batchable operation @@ -1136,7 +1137,6 @@ FileStorHandlerImpl::Stripe::getMessage(monitor_guard & guard, PriorityIdx & idx auto locker = std::make_unique(guard, *this, bucket, msg->getPriority(), msg->getType().getId(), msg->getMsgId(), msg->lockingRequirements()); - guard.unlock(); return {std::move(locker), std::move(msg), std::move(throttle_token)}; } else { std::shared_ptr msgReply(makeQueueTimeoutReply(*msg)); -- cgit v1.2.3 From e5597498f44f5e34d87c64fcbd8d6643417e7baf Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Wed, 10 Apr 2024 09:36:36 +0000 Subject: Ensure SetUp/TearDown symmetry with test superclass Can't initialize members in constructor that depend on objects that are subsequently reset by the superclass' `SetUp()` method. --- .../persistence/active_operations_stats_test.cpp | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/storage/src/tests/persistence/active_operations_stats_test.cpp b/storage/src/tests/persistence/active_operations_stats_test.cpp index bf91a84235a..71be34e3f54 100644 --- a/storage/src/tests/persistence/active_operations_stats_test.cpp +++ b/storage/src/tests/persistence/active_operations_stats_test.cpp @@ -28,6 +28,8 @@ public: std::shared_ptr createPut(uint64_t bucket, uint64_t docIdx); std::shared_ptr createGet(uint64_t bucket) const; + void SetUp() override; + void TearDown() override; void assert_active_operations_stats(const ActiveOperationsStats &stats, uint32_t exp_active_size, uint32_t exp_size_samples, uint32_t exp_latency_samples); void update_metrics(); void test_active_operations_stats(); @@ -41,16 +43,30 @@ ActiveOperationsStatsTest::ActiveOperationsStatsTest() metrics(), stripeId(0) { + // Initialization of members must happen in SetUp() since this test transitively + // depends on components modified by the superclass' SetUp() method. +} + +void +ActiveOperationsStatsTest::SetUp() +{ + FileStorTestFixture::SetUp(); setupPersistenceThreads(1); _node->setPersistenceProvider(std::make_unique(_node->getTypeRepo())); top.push_back(std::move(dummyManager)); top.open(); metrics.initDiskMetrics(1, 1); - filestorHandler = std::make_unique(messageSender, metrics, - _node->getComponentRegister()); + filestorHandler = std::make_unique(messageSender, metrics, _node->getComponentRegister()); filestorHandler->setGetNextMessageTimeout(20ms); } +void +ActiveOperationsStatsTest::TearDown() +{ + filestorHandler.reset(); + FileStorTestFixture::TearDown(); +} + ActiveOperationsStatsTest::~ActiveOperationsStatsTest() = default; std::shared_ptr -- cgit v1.2.3 From 496309291cb7171bf25b0f73b1465e64c978fdfc Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Wed, 10 Apr 2024 09:51:33 +0000 Subject: Ensure parameter evaluation order does not have side effects --- storage/src/vespa/storage/persistence/persistencehandler.cpp | 4 ++-- storage/src/vespa/storage/persistence/persistenceutil.cpp | 3 ++- storage/src/vespa/storage/persistence/persistenceutil.h | 4 +++- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index 4761937075d..a01a4656d01 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -183,8 +183,8 @@ PersistenceHandler::process_locked_message_batch(std::shared_ptr(framework::MilliSecTimer(_clock), _env, - batch, bm.first, std::move(bm.second)); + auto tracker = std::make_unique(framework::MilliSecTimer(_clock), _env, batch, + batch->deferred_sender_stub(), bm.first, std::move(bm.second)); tracker = processMessage(*bm.first, std::move(tracker)); if (tracker) { tracker->sendReply(); // Actually defers to batch reply queue diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp index 54bce72d7ff..febc494aff1 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.cpp +++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp @@ -70,9 +70,10 @@ MessageTracker::MessageTracker(const framework::MilliSecTimer & timer, MessageTracker::MessageTracker(const framework::MilliSecTimer& timer, const PersistenceUtil& env, std::shared_ptr batch, + MessageSender& deferred_reply_sender, std::shared_ptr msg, ThrottleToken throttle_token) - : MessageTracker(timer, env, batch->deferred_sender_stub(), false, {}, std::move(batch), std::move(msg), std::move(throttle_token)) + : MessageTracker(timer, env, deferred_reply_sender, false, {}, std::move(batch), std::move(msg), std::move(throttle_token)) {} MessageTracker::MessageTracker(const framework::MilliSecTimer & timer, diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h index 71e5c2754e9..67e96befe00 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.h +++ b/storage/src/vespa/storage/persistence/persistenceutil.h @@ -68,7 +68,9 @@ public: // For use with batching where bucket lock is held separately and bucket info // is _not_ fetched or updated per message. MessageTracker(const framework::MilliSecTimer& timer, const PersistenceUtil& env, - std::shared_ptr batch, std::shared_ptr msg, + std::shared_ptr batch, + MessageSender& deferred_reply_sender, + std::shared_ptr msg, ThrottleToken throttle_token); ~MessageTracker(); -- cgit v1.2.3