authorTor Brede Vekterli <vekterli@vespa.ai>2024-04-05 14:59:51 +0000
committerTor Brede Vekterli <vekterli@vespa.ai>2024-04-09 12:31:46 +0000
commita8dd709dceca4c53096be285f35686439a7902eb (patch)
tree5754badd60b8654426694ec83d9d6dac110eb364
parent7a5047b9cb7c1ad40bc69dbacfbbbeafbe15b83a (diff)
Support pipelining (batching) of mutating ops to same bucket
Bucket operations require either exclusive (single writer) or shared (multiple readers) access. Prior to this commit, many enqueued feed operations to the same bucket would therefore introduce pipeline stalls, since each operation had to wait for all prior operations to the bucket to complete entirely (including fsync of the WAL append). This is a likely scenario when feeding a document set that was previously acquired through visiting, as such documents will inherently be output in bucket order.

With this commit, a configurable number of feed operations (put, remove and update) bound for the exact same bucket may be sent asynchronously to the persistence provider in the context of the _same_ write lock. This mirrors how merge operations work for puts and removes.

Batching is fairly conservative, and will _not_ batch across further messages when any of the following holds:

* A non-feed operation is encountered
* More than one mutating operation is encountered for the same document ID
* No more persistence throttler tokens can be acquired
* Max batch size has been reached

Updating the bucket DB, assigning bucket info and sending replies is deferred until _all_ batched operations complete. Max batch size is (re-)configurable live and defaults to a batch size of 1, which shall have the exact same semantics as the legacy behavior.

Additionally, clock sampling for persistence threads has been abstracted away to allow for mocking in tests (no need for sleep!).
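To make the stop conditions concrete, here is a minimal sketch of the batch-filling loop, distilled from Stripe::fill_feed_op_batch() in the diff below. Names are abbreviated for illustration and do not match the real signatures; the actual loop additionally skips operations that have already timed out in the queue.

    // Illustrative sketch only; see fill_feed_op_batch() further down for the real code.
    // `it` walks the queued messages for the already-locked bucket in FIFO order.
    while (it != end && batch.size() < max_batch_size) {       // stop: max batch size reached
        if (!is_batchable_feed_op(it->type())) break;          // stop: non-feed operation
        if (!gids_in_batch.insert(it->gid()).second) break;    // stop: same document ID already in batch
        auto token = throttler.try_acquire_one();
        if (!token.valid()) break;                             // stop: no more throttler tokens
        batch.emplace_back(it->message(), std::move(token));   // op is dispatched under the same write lock
        it = erase_from_queue(it);
    }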
-rw-r--r--configdefinitions/src/vespa/stor-filestor.def9
-rw-r--r--metrics/src/vespa/metrics/metrictimer.cpp9
-rw-r--r--metrics/src/vespa/metrics/metrictimer.h24
-rw-r--r--storage/src/tests/persistence/filestorage/CMakeLists.txt1
-rw-r--r--storage/src/tests/persistence/filestorage/feed_operation_batching_test.cpp318
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp35
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.cpp6
-rw-r--r--storage/src/vespa/storage/persistence/batched_message.h14
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp11
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.h65
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp126
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h59
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp1
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.cpp24
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.h10
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.cpp16
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.h10
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.cpp86
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.h73
-rw-r--r--storage/src/vespa/storageframework/defaultimplementation/clock/realclock.h2
20 files changed, 767 insertions, 132 deletions
diff --git a/configdefinitions/src/vespa/stor-filestor.def b/configdefinitions/src/vespa/stor-filestor.def
index de67d4336e9..a5d86cc91ba 100644
--- a/configdefinitions/src/vespa/stor-filestor.def
+++ b/configdefinitions/src/vespa/stor-filestor.def
@@ -29,7 +29,7 @@ response_sequencer_type enum {LATENCY, THROUGHPUT, ADAPTIVE} default=ADAPTIVE re
## Should follow stor-distributormanager:splitsize (16MB).
bucket_merge_chunk_size int default=16772216 restart
-## Whether or not to use async message handling when scheduling storage messages from FileStorManager.
+## Whether to use async message handling when scheduling storage messages from FileStorManager.
##
## When turned on, the calling thread (e.g. FNET network thread when using Storage API RPC)
## gets the next async message to handle (if any) as part of scheduling a storage message.
@@ -61,3 +61,10 @@ async_operation_throttler.window_size_backoff double default=0.95
async_operation_throttler.min_window_size int default=20
async_operation_throttler.max_window_size int default=-1 # < 0 implies INT_MAX
async_operation_throttler.resize_rate double default=3.0
+
+## Maximum number of enqueued put/remove/update operations towards a given bucket
+## that can be dispatched asynchronously as a batch under the same write lock.
+## This prevents pipeline stalls when many write operations are in-flight to the
+## same bucket, as each operation would otherwise have to wait for the completion
+## of all prior writes to the bucket.
+max_feed_op_batch_size int default=1
diff --git a/metrics/src/vespa/metrics/metrictimer.cpp b/metrics/src/vespa/metrics/metrictimer.cpp
index 84d4844104d..a3b0f215d58 100644
--- a/metrics/src/vespa/metrics/metrictimer.cpp
+++ b/metrics/src/vespa/metrics/metrictimer.cpp
@@ -3,13 +3,18 @@
namespace metrics {
-MetricTimer::MetricTimer()
+MetricTimer::MetricTimer() noexcept
+ : _startTime(std::chrono::steady_clock::now())
{
// Amusingly enough, steady_clock was not actually steady by default on
// GCC < 4.8.1, so add a bit of compile-time paranoia just to make sure.
static_assert(std::chrono::steady_clock::is_steady,
"Old/broken STL implementation; steady_clock not steady");
- _startTime = std::chrono::steady_clock::now();
+}
+
+MetricTimer::MetricTimer(std::chrono::steady_clock::time_point start_time) noexcept
+ : _startTime(start_time)
+{
}
} // metrics
diff --git a/metrics/src/vespa/metrics/metrictimer.h b/metrics/src/vespa/metrics/metrictimer.h
index 8a338432362..133cd819489 100644
--- a/metrics/src/vespa/metrics/metrictimer.h
+++ b/metrics/src/vespa/metrics/metrictimer.h
@@ -15,7 +15,19 @@ namespace metrics {
class MetricTimer {
public:
- MetricTimer();
+ // Start time point set by system steady clock
+ MetricTimer() noexcept;
+ // Start time point explicitly given
+ explicit MetricTimer(std::chrono::steady_clock::time_point start_time) noexcept;
+
+ template<typename AvgVal, typename TotVal, bool SumOnAdd>
+ AvgVal stop(std::chrono::steady_clock::time_point now, ValueMetric<AvgVal, TotVal, SumOnAdd>& metric) const {
+ const auto delta = now - _startTime;
+ using ToDuration = std::chrono::duration<AvgVal, std::milli>;
+ const auto deltaMs(std::chrono::duration_cast<ToDuration>(delta).count());
+ metric.addValue(deltaMs);
+ return deltaMs;
+ }
/**
* Adds ms passed since this timer was constructed to given value metric.
@@ -26,11 +38,11 @@ public:
*/
template<typename AvgVal, typename TotVal, bool SumOnAdd>
AvgVal stop(ValueMetric<AvgVal, TotVal, SumOnAdd>& metric) const {
- const auto delta = std::chrono::steady_clock::now() - _startTime;
- using ToDuration = std::chrono::duration<AvgVal, std::milli>;
- const auto deltaMs(std::chrono::duration_cast<ToDuration>(delta).count());
- metric.addValue(deltaMs);
- return deltaMs;
+ return stop(std::chrono::steady_clock::now(), metric);
+ }
+
+ [[nodiscard]] std::chrono::steady_clock::time_point start_time() const noexcept {
+ return _startTime;
}
private:
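As a usage sketch of the new time-point-taking constructor and stop() overload (hypothetical test code; wait_time_ms stands in for an existing ValueMetric instance), timing can now be made fully deterministic in tests:

    // Hypothetical sketch: both start and stop time points are injected,
    // so observing a 5 ms wait requires no actual sleeping.
    using namespace std::chrono;
    steady_clock::time_point t0{};                        // fake "now" from a mocked clock
    metrics::MetricTimer timer(t0);                       // new constructor: explicit start time
    auto waited_ms = timer.stop(t0 + 5ms, wait_time_ms);  // new overload: explicit stop time
    // waited_ms == 5, and 5 ms has been added to wait_time_ms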
diff --git a/storage/src/tests/persistence/filestorage/CMakeLists.txt b/storage/src/tests/persistence/filestorage/CMakeLists.txt
index f1a8a286bbd..951a361474e 100644
--- a/storage/src/tests/persistence/filestorage/CMakeLists.txt
+++ b/storage/src/tests/persistence/filestorage/CMakeLists.txt
@@ -4,6 +4,7 @@ vespa_add_executable(storage_filestorage_gtest_runner_app TEST
SOURCES
deactivatebucketstest.cpp
deletebuckettest.cpp
+ feed_operation_batching_test.cpp
filestormanagertest.cpp
filestormodifiedbucketstest.cpp
mergeblockingtest.cpp
diff --git a/storage/src/tests/persistence/filestorage/feed_operation_batching_test.cpp b/storage/src/tests/persistence/filestorage/feed_operation_batching_test.cpp
new file mode 100644
index 00000000000..cf16123933b
--- /dev/null
+++ b/storage/src/tests/persistence/filestorage/feed_operation_batching_test.cpp
@@ -0,0 +1,318 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <tests/common/dummystoragelink.h>
+#include <tests/common/testhelper.h>
+#include <tests/persistence/common/filestortestfixture.h>
+#include <tests/persistence/filestorage/forwardingmessagesender.h>
+#include <vespa/document/test/make_document_bucket.h>
+#include <vespa/document/update/documentupdate.h>
+#include <vespa/storage/persistence/filestorage/filestorhandlerimpl.h>
+#include <vespa/storage/persistence/filestorage/filestormanager.h>
+#include <vespa/storage/persistence/filestorage/filestormetrics.h>
+#include <vespa/vespalib/util/stringfmt.h>
+#include <gtest/gtest.h>
+
+using document::test::makeDocumentBucket;
+using document::BucketId;
+using document::DocumentId;
+using namespace ::testing;
+
+namespace storage {
+
+struct FeedOperationBatchingTest : FileStorTestFixture {
+ DummyStorageLink _top;
+ std::unique_ptr<ForwardingMessageSender> _message_sender;
+ FileStorMetrics _metrics;
+ std::unique_ptr<FileStorHandler> _handler;
+ api::Timestamp _next_timestamp;
+
+ FeedOperationBatchingTest();
+ ~FeedOperationBatchingTest() override;
+
+ void SetUp() override {
+ FileStorTestFixture::SetUp();
+ // This silly little indirection is a work-around for the top-level link needing something
+ // below it to send _up_ into it, rather than directly receiving the messages itself.
+ auto message_receiver = std::make_unique<DummyStorageLink>();
+ _message_sender = std::make_unique<ForwardingMessageSender>(*message_receiver);
+ _top.push_back(std::move(message_receiver));
+ _top.open();
+ _metrics.initDiskMetrics(1, 1);
+ // By default, sets up 1 thread with 1 stripe
+ _handler = std::make_unique<FileStorHandlerImpl>(*_message_sender, _metrics, _node->getComponentRegister());
+ _handler->set_max_feed_op_batch_size(3);
+ }
+
+ void TearDown() override {
+ _handler.reset();
+ FileStorTestFixture::TearDown();
+ }
+
+ [[nodiscard]] static vespalib::string id_str_of(uint32_t bucket_idx, uint32_t doc_idx) {
+ return vespalib::make_string("id:foo:testdoctype1:n=%u:%u", bucket_idx, doc_idx);
+ }
+
+ [[nodiscard]] static DocumentId id_of(uint32_t bucket_idx, uint32_t doc_idx) {
+ return DocumentId(id_str_of(bucket_idx, doc_idx));
+ }
+
+ void schedule_msg(const std::shared_ptr<api::StorageMessage>& msg) {
+ msg->setAddress(makeSelfAddress());
+ _handler->schedule(msg); // takes shared_ptr by const ref, no point in moving
+ }
+
+ void send_put(uint32_t bucket_idx, uint32_t doc_idx, uint32_t timestamp, vespalib::duration timeout) {
+ auto id = id_str_of(bucket_idx, doc_idx);
+ auto doc = _node->getTestDocMan().createDocument("foobar", id);
+ auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket({16, bucket_idx}), std::move(doc), timestamp);
+ cmd->setTimeout(timeout);
+ schedule_msg(cmd);
+ }
+
+ void send_put(uint32_t bucket_idx, uint32_t doc_idx) {
+ send_put(bucket_idx, doc_idx, next_timestamp(), 60s);
+ }
+
+ void send_puts(std::initializer_list<std::pair<uint32_t, uint32_t>> bucket_docs) {
+ for (const auto& bd : bucket_docs) {
+ send_put(bd.first, bd.second);
+ }
+ }
+
+ void send_get(uint32_t bucket_idx, uint32_t doc_idx) {
+ auto id = id_of(bucket_idx, doc_idx);
+ auto cmd = std::make_shared<api::GetCommand>(makeDocumentBucket({16, bucket_idx}), id, document::AllFields::NAME);
+ schedule_msg(cmd);
+ }
+
+ void send_remove(uint32_t bucket_idx, uint32_t doc_idx, uint32_t timestamp) {
+ auto id = id_of(bucket_idx, doc_idx);
+ auto cmd = std::make_shared<api::RemoveCommand>(makeDocumentBucket({16, bucket_idx}), id, timestamp);
+ schedule_msg(cmd);
+ }
+
+ void send_remove(uint32_t bucket_idx, uint32_t doc_idx) {
+ send_remove(bucket_idx, doc_idx, next_timestamp());
+ }
+
+ void send_update(uint32_t bucket_idx, uint32_t doc_idx, uint32_t timestamp) {
+ auto id = id_of(bucket_idx, doc_idx);
+ auto update = std::make_shared<document::DocumentUpdate>(
+ _node->getTestDocMan().getTypeRepo(),
+ _node->getTestDocMan().createRandomDocument()->getType(), id);
+ auto cmd = std::make_shared<api::UpdateCommand>(makeDocumentBucket({16, bucket_idx}), std::move(update), timestamp);
+ schedule_msg(cmd);
+ }
+
+ void send_update(uint32_t bucket_idx, uint32_t doc_idx) {
+ send_update(bucket_idx, doc_idx, next_timestamp());
+ }
+
+ [[nodiscard]] api::Timestamp next_timestamp() {
+ auto ret = _next_timestamp;
+ ++_next_timestamp;
+ return ret;
+ }
+
+ [[nodiscard]] vespalib::steady_time fake_now() const {
+ return _node->getClock().getMonotonicTime();
+ }
+
+ [[nodiscard]] vespalib::steady_time fake_deadline() const {
+ return _node->getClock().getMonotonicTime() + 60s;
+ }
+
+ [[nodiscard]] FileStorHandler::LockedMessageBatch next_batch() {
+ return _handler->next_message_batch(0, fake_now(), fake_deadline());
+ }
+
+ template <typename CmdType>
+ static void assert_batch_msg_is(const FileStorHandler::LockedMessageBatch& batch, uint32_t msg_idx,
+ uint32_t expected_bucket_idx, uint32_t expected_doc_idx)
+ {
+ ASSERT_LT(msg_idx, batch.size());
+ auto msg = batch.messages[msg_idx].first;
+ auto* as_cmd = dynamic_cast<const CmdType*>(msg.get());
+ ASSERT_TRUE(as_cmd) << msg->toString() << " does not have the expected type";
+ EXPECT_EQ(as_cmd->getBucketId(), BucketId(16, expected_bucket_idx));
+
+ auto id = as_cmd->getDocumentId();
+ ASSERT_TRUE(id.getScheme().hasNumber());
+ EXPECT_EQ(id.getScheme().getNumber(), expected_bucket_idx) << id;
+ std::string actual_id_part = id.getScheme().getNamespaceSpecific();
+ std::string expected_id_part = std::to_string(expected_doc_idx);
+ EXPECT_EQ(actual_id_part, expected_id_part) << id;
+ }
+
+ static void assert_batch_msg_is_put(const FileStorHandler::LockedMessageBatch& batch, uint32_t msg_idx,
+ uint32_t expected_bucket_idx, uint32_t expected_doc_idx)
+ {
+ assert_batch_msg_is<api::PutCommand>(batch, msg_idx, expected_bucket_idx, expected_doc_idx);
+ }
+
+ static void assert_batch_msg_is_remove(const FileStorHandler::LockedMessageBatch& batch, uint32_t msg_idx,
+ uint32_t expected_bucket_idx, uint32_t expected_doc_idx)
+ {
+ assert_batch_msg_is<api::RemoveCommand>(batch, msg_idx, expected_bucket_idx, expected_doc_idx);
+ }
+
+ static void assert_batch_msg_is_update(const FileStorHandler::LockedMessageBatch& batch, uint32_t msg_idx,
+ uint32_t expected_bucket_idx, uint32_t expected_doc_idx)
+ {
+ assert_batch_msg_is<api::UpdateCommand>(batch, msg_idx, expected_bucket_idx, expected_doc_idx);
+ }
+
+ static void assert_batch_msg_is_get(const FileStorHandler::LockedMessageBatch& batch, uint32_t msg_idx,
+ uint32_t expected_bucket_idx, uint32_t expected_doc_idx)
+ {
+ assert_batch_msg_is<api::GetCommand>(batch, msg_idx, expected_bucket_idx, expected_doc_idx);
+ }
+
+ enum Type {
+ Put,
+ Update,
+ Remove,
+ Get
+ };
+
+ static void assert_empty_batch(const FileStorHandler::LockedMessageBatch& batch) {
+ ASSERT_TRUE(batch.empty());
+ ASSERT_FALSE(batch.lock);
+ }
+
+ static void assert_batch(const FileStorHandler::LockedMessageBatch& batch,
+ uint32_t expected_bucket_idx,
+ std::initializer_list<std::pair<Type, uint32_t>> expected_msgs)
+ {
+ ASSERT_TRUE(batch.lock);
+ ASSERT_EQ(batch.lock->getBucket().getBucketId(), BucketId(16, expected_bucket_idx));
+ ASSERT_EQ(batch.size(), expected_msgs.size());
+
+ uint32_t idx = 0;
+ for (const auto& msg : expected_msgs) {
+ switch (msg.first) {
+ case Type::Put: assert_batch_msg_is_put(batch, idx, expected_bucket_idx, msg.second); break;
+ case Type::Update: assert_batch_msg_is_update(batch, idx, expected_bucket_idx, msg.second); break;
+ case Type::Remove: assert_batch_msg_is_remove(batch, idx, expected_bucket_idx, msg.second); break;
+ case Type::Get: assert_batch_msg_is_get(batch, idx, expected_bucket_idx, msg.second); break;
+ default: FAIL();
+ }
+ ++idx;
+ }
+ }
+};
+
+FeedOperationBatchingTest::FeedOperationBatchingTest()
+ : FileStorTestFixture(),
+ _top(),
+ _message_sender(),
+ _metrics(),
+ _handler(),
+ _next_timestamp(1000)
+{
+}
+
+FeedOperationBatchingTest::~FeedOperationBatchingTest() = default;
+
+// Note: unless explicitly set by the testcase, max batch size is 3
+
+TEST_F(FeedOperationBatchingTest, batching_is_disabled_with_1_max_batch_size) {
+ _handler->set_max_feed_op_batch_size(1);
+ send_puts({{1, 1}, {1, 2}, {2, 3}, {2, 4}});
+ // No batching; has the same behavior as current FIFO
+ assert_batch(next_batch(), 1, {{Put, 1}});
+ assert_batch(next_batch(), 1, {{Put, 2}});
+ assert_batch(next_batch(), 2, {{Put, 3}});
+ assert_batch(next_batch(), 2, {{Put, 4}});
+ assert_empty_batch(next_batch());
+}
+
+TEST_F(FeedOperationBatchingTest, batching_is_limited_to_configured_max_size) {
+ send_puts({{1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}});
+ assert_batch(next_batch(), 1, {{Put, 1}, {Put, 2}, {Put, 3}});
+ assert_batch(next_batch(), 1, {{Put, 4}, {Put, 5}});
+ assert_empty_batch(next_batch());
+}
+
+TEST_F(FeedOperationBatchingTest, batching_can_consume_entire_queue) {
+ send_puts({{1, 1}, {1, 2}, {1, 3}});
+ assert_batch(next_batch(), 1, {{Put, 1}, {Put, 2}, {Put, 3}});
+ assert_empty_batch(next_batch());
+}
+
+TEST_F(FeedOperationBatchingTest, batching_is_only_done_for_single_bucket) {
+ send_puts({{1, 1}, {2, 2}, {2, 3}, {2, 4}, {3, 5}});
+ assert_batch(next_batch(), 1, {{Put, 1}});
+ assert_batch(next_batch(), 2, {{Put, 2}, {Put, 3}, {Put, 4}});
+ assert_batch(next_batch(), 3, {{Put, 5}});
+}
+
+TEST_F(FeedOperationBatchingTest, batch_can_include_all_supported_feed_op_types) {
+ send_put(1, 1);
+ send_remove(1, 2);
+ send_update(1, 3);
+ assert_batch(next_batch(), 1, {{Put, 1}, {Remove, 2}, {Update, 3}});
+}
+
+TEST_F(FeedOperationBatchingTest, timed_out_requests_are_ignored_by_batch) {
+ send_puts({{1, 1}});
+ send_put(1, 2, next_timestamp(), 1s);
+ send_puts({{1, 3}});
+ _node->getClock().addSecondsToTime(2);
+ // Put #2 with 1s timeout has expired in the queue and should not be returned as part of the batch
+ assert_batch(next_batch(), 1, {{Put, 1}, {Put, 3}});
+ ASSERT_EQ(_top.getNumReplies(), 0);
+ // The actual timeout is handled by the next message fetch invocation
+ assert_empty_batch(next_batch());
+ ASSERT_EQ(_top.getNumReplies(), 1);
+ EXPECT_EQ(dynamic_cast<api::StorageReply&>(*_top.getReply(0)).getResult().getResult(), api::ReturnCode::TIMEOUT);
+}
+
+TEST_F(FeedOperationBatchingTest, non_feed_ops_are_not_batched) {
+ send_get(1, 2);
+ send_get(1, 3);
+ assert_batch(next_batch(), 1, {{Get, 2}});
+ assert_batch(next_batch(), 1, {{Get, 3}});
+}
+
+TEST_F(FeedOperationBatchingTest, pipeline_stalled_by_non_feed_op) {
+ // It can reasonably be argued that we could batch _around_ a Get operation and still
+ // have correct behavior, but the Get here is just a stand-in for an arbitrary operation such
+ // as a Split (which changes the bucket set), which is rather more tricky to reason about.
+ // For simplicity and understandability, just stall the batch pipeline (at least for now).
+ send_get(1, 2);
+ send_puts({{1, 3}, {1, 4}});
+ send_get(1, 5);
+ send_puts({{1, 6}, {1, 7}});
+
+ assert_batch(next_batch(), 1, {{Get, 2}}); // If first op is non-feed, only it should be returned
+ assert_batch(next_batch(), 1, {{Put, 3}, {Put, 4}});
+ assert_batch(next_batch(), 1, {{Get, 5}});
+ assert_batch(next_batch(), 1, {{Put, 6}, {Put, 7}});
+}
+
+TEST_F(FeedOperationBatchingTest, pipeline_stalled_by_concurrent_ops_to_same_document) {
+ // 2 ops to doc #2. Since this is expected to be a very rare edge case, just
+ // stop batching at that point and defer the concurrent op to the next batch.
+ send_puts({{1, 1}, {1, 2}, {1, 3}, {1, 2}, {1, 4}});
+ assert_batch(next_batch(), 1, {{Put, 1}, {Put, 2}, {Put, 3}});
+ assert_batch(next_batch(), 1, {{Put, 2}, {Put, 4}});
+}
+
+TEST_F(FeedOperationBatchingTest, batch_respects_persistence_throttling) {
+ vespalib::SharedOperationThrottler::DynamicThrottleParams params;
+ params.min_window_size = 3;
+ params.max_window_size = 3;
+ params.window_size_increment = 1;
+ _handler->use_dynamic_operation_throttling(true);
+ _handler->reconfigure_dynamic_throttler(params);
+ _handler->set_max_feed_op_batch_size(10); // > win size to make sure we test the right thing
+
+ send_puts({{1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}});
+ auto batch = next_batch(); // holds 3 throttle tokens
+ assert_batch(batch, 1, {{Put, 1}, {Put, 2}, {Put, 3}});
+ // No more throttle tokens available
+ assert_empty_batch(next_batch());
+}
+
+} // storage
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index 1ccd51d3f06..56149bbc14d 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -43,6 +43,7 @@ LOG_SETUP(".filestormanagertest");
using std::unique_ptr;
using document::Document;
+using document::BucketId;
using namespace storage::api;
using storage::spi::test::makeSpiBucket;
using document::test::makeDocumentBucket;
@@ -405,6 +406,38 @@ TEST_F(FileStorManagerTest, put) {
}
}
+TEST_F(FileStorManagerTest, feed_op_batch_updates_bucket_db_and_reply_bucket_info) {
+ TestFileStorComponents c(*this);
+ c.manager->getFileStorHandler().set_max_feed_op_batch_size(10);
+ BucketId bucket_id(16, 1);
+ createBucket(bucket_id);
+ constexpr uint32_t n = 10;
+ {
+ // Barrier to prevent any messages from being processed until we've enqueued all puts
+ auto guard = c.manager->getFileStorHandler().lock(makeDocumentBucket(bucket_id), LockingRequirements::Exclusive);
+ for (uint32_t i = 0; i < n; ++i) {
+ auto put = make_put_command(120, vespalib::make_string("id:foo:testdoctype1:n=1:%u", i), Timestamp(1000) + i);
+ put->setAddress(_storage3);
+ c.top.sendDown(put);
+ }
+ }
+ // All 10 puts shall now be visible and waiting for the persistence thread to fetch as a single batch.
+ c.top.waitForMessages(n, _waitTime);
+ api::BucketInfo expected_bucket_info;
+ {
+ StorBucketDatabase::WrappedEntry entry(_node->getStorageBucketDatabase().get(bucket_id, "foo"));
+ ASSERT_TRUE(entry.exists());
+ EXPECT_EQ(entry->getBucketInfo().getDocumentCount(), n);
+ expected_bucket_info = entry->getBucketInfo();
+ }
+ // All replies should have the _same_ bucket info due to being processed in the same batch.
+ auto replies = c.top.getRepliesOnce();
+ for (auto& reply : replies) {
+ auto actual_bucket_info = dynamic_cast<api::PutReply&>(*reply).getBucketInfo();
+ EXPECT_EQ(actual_bucket_info, expected_bucket_info);
+ }
+}
+
TEST_F(FileStorManagerTest, running_task_against_unknown_bucket_fails) {
TestFileStorComponents c(*this);
@@ -726,7 +759,7 @@ TEST_F(FileStorManagerTest, handler_timeout) {
filestorHandler.schedule(cmd);
}
- std::this_thread::sleep_for(51ms);
+ _node->getClock().addMilliSecondsToTime(51);
for (;;) {
auto lock = filestorHandler.getNextMessage(stripeId);
if (lock.lock.get()) {
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index 59e0853cc21..ab176ebb9cb 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -143,7 +143,7 @@ AsyncHandler::AsyncHandler(const PersistenceUtil & env, spi::PersistenceProvider
MessageTracker::UP
AsyncHandler::handleRunTask(RunTaskCommand& cmd, MessageTracker::UP tracker) const {
auto task = makeResultTask([tracker = std::move(tracker)](spi::Result::UP response) {
- tracker->checkForError(*response);
+ (void)tracker->checkForError(*response);
tracker->sendReply();
});
spi::Bucket bucket(cmd.getBucket());
@@ -169,7 +169,7 @@ AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) cons
spi::Bucket bucket = _env.getBucket(cmd.getDocumentId(), cmd.getBucket());
auto task = makeResultTask([tracker = std::move(trackerUP)](spi::Result::UP response) {
- tracker->checkForError(*response);
+ (void)tracker->checkForError(*response);
tracker->sendReply();
});
_spi.putAsync(bucket, spi::Timestamp(cmd.getTimestamp()), cmd.getDocument(),
@@ -517,7 +517,7 @@ AsyncHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTrack
}
auto task = makeResultTask([&cmd, tracker = std::move(tracker), removed = to_remove.size()](spi::Result::UP response) {
- tracker->checkForError(*response);
+ (void)tracker->checkForError(*response);
tracker->setReply(std::make_shared<api::RemoveLocationReply>(cmd, removed));
tracker->sendReply();
});
diff --git a/storage/src/vespa/storage/persistence/batched_message.h b/storage/src/vespa/storage/persistence/batched_message.h
new file mode 100644
index 00000000000..c23383edfde
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/batched_message.h
@@ -0,0 +1,14 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "shared_operation_throttler.h"
+#include <memory>
+#include <utility>
+
+namespace storage {
+
+namespace api { class StorageMessage; }
+
+using BatchedMessage = std::pair<std::shared_ptr<api::StorageMessage>, ThrottleToken>;
+
+}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
index a89b705de1b..274d59899d9 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
@@ -5,4 +5,15 @@ namespace storage {
FileStorHandler::LockedMessage::~LockedMessage() = default;
+FileStorHandler::LockedMessageBatch::LockedMessageBatch(LockedMessage&& initial_msg)
+ : lock(std::move(initial_msg.lock)),
+ messages()
+{
+ if (lock) {
+ messages.emplace_back(std::move(initial_msg.msg), std::move(initial_msg.throttle_token));
+ }
+}
+
+FileStorHandler::LockedMessageBatch::~LockedMessageBatch() = default;
+
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index 68b8411c762..6a8b74baf1d 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -1,23 +1,13 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-/**
- * \class storage::FileStorHandler
- * \ingroup storage
- *
- * \brief Common resource for filestor threads
- *
- * Takes care of the interface between file stor threads and the file stor
- * manager to avoid circular dependencies, and confine the implementation that
- * needs to worry about locking between these components.
- */
-
#pragma once
#include <vespa/document/bucket/bucket.h>
#include <vespa/storage/storageutil/resumeguard.h>
#include <vespa/storage/common/messagesender.h>
+#include <vespa/storage/persistence/batched_message.h>
#include <vespa/storage/persistence/shared_operation_throttler.h>
#include <vespa/storageapi/messageapi/storagemessage.h>
+#include <vespa/vespalib/util/small_vector.h>
namespace storage {
namespace api {
@@ -80,13 +70,7 @@ public:
std::shared_ptr<api::StorageMessage> msg;
ThrottleToken throttle_token;
- LockedMessage() noexcept = default;
- LockedMessage(std::shared_ptr<BucketLockInterface> lock_,
- std::shared_ptr<api::StorageMessage> msg_) noexcept
- : lock(std::move(lock_)),
- msg(std::move(msg_)),
- throttle_token()
- {}
+ constexpr LockedMessage() noexcept = default;
LockedMessage(std::shared_ptr<BucketLockInterface> lock_,
std::shared_ptr<api::StorageMessage> msg_,
ThrottleToken token) noexcept
@@ -98,27 +82,40 @@ public:
~LockedMessage();
};
+ struct LockedMessageBatch {
+ std::shared_ptr<BucketLockInterface> lock;
+ vespalib::SmallVector<BatchedMessage, 1> messages;
+
+ LockedMessageBatch() = default;
+ explicit LockedMessageBatch(LockedMessage&& initial_msg);
+ LockedMessageBatch(LockedMessageBatch&&) noexcept = default;
+ ~LockedMessageBatch();
+
+ [[nodiscard]] bool empty() const noexcept { return messages.empty(); }
+ [[nodiscard]] size_t size() const noexcept { return messages.size(); }
+ // Precondition: messages.size() == 1
+ [[nodiscard]] LockedMessage release_as_single_msg() noexcept;
+ };
+
class ScheduleAsyncResult {
- private:
- bool _was_scheduled;
+ bool _was_scheduled;
LockedMessage _async_message;
-
public:
- ScheduleAsyncResult() : _was_scheduled(false), _async_message() {}
- explicit ScheduleAsyncResult(LockedMessage&& async_message_in)
+ constexpr ScheduleAsyncResult() noexcept : _was_scheduled(false), _async_message() {}
+ explicit ScheduleAsyncResult(LockedMessage&& async_message_in) noexcept
: _was_scheduled(true),
_async_message(std::move(async_message_in))
{}
- bool was_scheduled() const {
+ [[nodiscard]] bool was_scheduled() const noexcept {
return _was_scheduled;
}
- bool has_async_message() const {
- return _async_message.lock.get() != nullptr;
+ [[nodiscard]] bool has_async_message() const noexcept {
+ return static_cast<bool>(_async_message.lock);
}
- const LockedMessage& async_message() const {
+ [[nodiscard]] const LockedMessage& async_message() const noexcept {
return _async_message;
}
- LockedMessage&& release_async_message() {
+ [[nodiscard]] LockedMessage&& release_async_message() noexcept {
return std::move(_async_message);
}
};
@@ -129,7 +126,7 @@ public:
};
FileStorHandler() : _getNextMessageTimout(100ms) { }
- virtual ~FileStorHandler() = default;
+ ~FileStorHandler() override = default;
/**
@@ -171,7 +168,9 @@ public:
*
* @param stripe The stripe to get messages for
*/
- virtual LockedMessage getNextMessage(uint32_t stripeId, vespalib::steady_time deadline) = 0;
+ [[nodiscard]] virtual LockedMessage getNextMessage(uint32_t stripeId, vespalib::steady_time deadline) = 0;
+
+ [[nodiscard]] virtual LockedMessageBatch next_message_batch(uint32_t stripe, vespalib::steady_time now, vespalib::steady_time deadline) = 0;
/** Only used for testing, should be removed */
LockedMessage getNextMessage(uint32_t stripeId) {
@@ -189,8 +188,6 @@ public:
* NB: As current operation can be a split or join operation, make sure that
* you always wait for current to finish, if is a super or sub bucket of
* the bucket we're locking.
- *
- *
*/
virtual BucketLockInterface::SP lock(const document::Bucket&, api::LockingRequirements lockReq) = 0;
@@ -287,6 +284,8 @@ public:
virtual void use_dynamic_operation_throttling(bool use_dynamic) noexcept = 0;
virtual void set_throttle_apply_bucket_diff_ops(bool throttle_apply_bucket_diff) noexcept = 0;
+
+ virtual void set_max_feed_op_batch_size(uint32_t max_batch) noexcept = 0;
private:
vespalib::duration _getNextMessageTimout;
};
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index 1984d44652a..2e2634025a7 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -12,6 +12,7 @@
#include <vespa/storage/persistence/messages.h>
#include <vespa/storageapi/message/stat.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <vespa/vespalib/stllike/hash_set.hpp>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/string_escape.h>
@@ -60,7 +61,8 @@ FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripe
_max_active_merges_per_stripe(per_stripe_merge_limit(numThreads, numStripes)),
_paused(false),
_throttle_apply_bucket_diff_ops(false),
- _last_active_operations_stats()
+ _last_active_operations_stats(),
+ _max_feed_op_batch_size(1)
{
assert(numStripes > 0);
_stripes.reserve(numStripes);
@@ -241,8 +243,7 @@ FileStorHandlerImpl::schedule(const std::shared_ptr<api::StorageMessage>& msg)
{
if (getState() == FileStorHandler::AVAILABLE) {
document::Bucket bucket = getStorageMessageBucket(*msg);
- stripe(bucket).schedule(MessageEntry(msg, bucket));
- return true;
+ return stripe(bucket).schedule(MessageEntry(msg, bucket, _component.getClock().getMonotonicTime()));
}
return false;
}
@@ -252,7 +253,7 @@ FileStorHandlerImpl::schedule_and_get_next_async_message(const std::shared_ptr<a
{
if (getState() == FileStorHandler::AVAILABLE) {
document::Bucket bucket = getStorageMessageBucket(*msg);
- return ScheduleAsyncResult(stripe(bucket).schedule_and_get_next_async_message(MessageEntry(msg, bucket)));
+ return ScheduleAsyncResult(stripe(bucket).schedule_and_get_next_async_message(MessageEntry(msg, bucket, _component.getClock().getMonotonicTime())));
}
return {};
}
@@ -403,10 +404,25 @@ FileStorHandlerImpl::getNextMessage(uint32_t stripeId, vespalib::steady_time dea
if (!tryHandlePause()) {
return {}; // Still paused, return to allow tick.
}
-
return _stripes[stripeId].getNextMessage(deadline);
}
+FileStorHandler::LockedMessageBatch
+FileStorHandlerImpl::next_message_batch(uint32_t stripe_id, vespalib::steady_time now, vespalib::steady_time deadline)
+{
+ if (!tryHandlePause()) {
+ return {};
+ }
+ return _stripes[stripe_id].next_message_batch(now, deadline);
+}
+
+FileStorHandler::LockedMessage
+FileStorHandlerImpl::LockedMessageBatch::release_as_single_msg() noexcept
+{
+ assert(lock && messages.size() == 1);
+ return {std::move(lock), std::move(messages[0].first), std::move(messages[0].second)};
+}
+
std::shared_ptr<FileStorHandler::BucketLockInterface>
FileStorHandlerImpl::Stripe::lock(const document::Bucket &bucket, api::LockingRequirements lockReq) {
std::unique_lock guard(*_lock);
@@ -858,9 +874,10 @@ FileStorHandlerImpl::sendReplyDirectly(const std::shared_ptr<api::StorageReply>&
}
FileStorHandlerImpl::MessageEntry::MessageEntry(const std::shared_ptr<api::StorageMessage>& cmd,
- const document::Bucket &bucket)
+ const document::Bucket& bucket,
+ vespalib::steady_time scheduled_at_time)
: _command(cmd),
- _timer(),
+ _timer(scheduled_at_time),
_bucket(bucket),
_priority(cmd->getPriority())
{ }
@@ -939,9 +956,8 @@ FileStorHandlerImpl::Stripe::operation_type_should_be_throttled(api::MessageType
}
FileStorHandler::LockedMessage
-FileStorHandlerImpl::Stripe::getNextMessage(vespalib::steady_time deadline)
+FileStorHandlerImpl::Stripe::next_message_impl(monitor_guard& guard, vespalib::steady_time deadline)
{
- std::unique_lock guard(*_lock);
ThrottleToken throttle_token;
// Try to grab a message+lock, immediately retrying once after a wait
// if none can be found and then exiting if the same is the case on the
@@ -993,6 +1009,92 @@ FileStorHandlerImpl::Stripe::getNextMessage(vespalib::steady_time deadline)
}
FileStorHandler::LockedMessage
+FileStorHandlerImpl::Stripe::getNextMessage(vespalib::steady_time deadline)
+{
+ std::unique_lock guard(*_lock);
+ return next_message_impl(guard, deadline);
+}
+
+namespace {
+
+constexpr bool is_batchable_feed_op(api::MessageType::Id id) noexcept {
+ return (id == api::MessageType::PUT_ID ||
+ id == api::MessageType::REMOVE_ID ||
+ id == api::MessageType::UPDATE_ID);
+}
+
+// Precondition: msg must be a feed operation request (put, remove, update)
+document::GlobalId gid_from_feed_op(const api::StorageMessage& msg) {
+ switch (msg.getType().getId()) {
+ case api::MessageType::PUT_ID:
+ return dynamic_cast<const api::PutCommand&>(msg).getDocumentId().getGlobalId();
+ case api::MessageType::REMOVE_ID:
+ return dynamic_cast<const api::RemoveCommand&>(msg).getDocumentId().getGlobalId();
+ case api::MessageType::UPDATE_ID:
+ return dynamic_cast<const api::UpdateCommand&>(msg).getDocumentId().getGlobalId();
+ default: abort();
+ }
+}
+
+} // anon ns
+
+FileStorHandler::LockedMessageBatch
+FileStorHandlerImpl::Stripe::next_message_batch(vespalib::steady_time now, vespalib::steady_time deadline)
+{
+ const auto max_batch_size = _owner.max_feed_op_batch_size();
+
+ std::unique_lock guard(*_lock);
+ auto initial_locked = next_message_impl(guard, deadline);
+ if (!initial_locked.lock || !is_batchable_feed_op(initial_locked.msg->getType().getId()) || (max_batch_size == 1)) {
+ return LockedMessageBatch(std::move(initial_locked));
+ }
+ LockedMessageBatch batch(std::move(initial_locked));
+ fill_feed_op_batch(guard, batch, max_batch_size, now);
+ return batch;
+}
+
+void
+FileStorHandlerImpl::Stripe::fill_feed_op_batch(monitor_guard& guard, LockedMessageBatch& batch,
+ uint32_t max_batch_size, vespalib::steady_time now)
+{
+ assert(batch.size() == 1);
+ BucketIdx& idx = bmi::get<2>(*_queue);
+ auto bucket_msgs = idx.equal_range(batch.lock->getBucket());
+ // Process in FIFO order (_not_ priority order) until we hit the end, a non-batchable operation
+ // (implicit pipeline stall since bucket set might change) or can't get another throttle token.
+ // We also stall the pipeline if we get a concurrent modification to the same document (not expected,
+ // as the distributors should prevent this, but _technically_ it is possible).
+ const auto expected_max_size = std::min(ssize_t(max_batch_size), std::distance(bucket_msgs.first, bucket_msgs.second) + 1);
+ vespalib::hash_set<document::GlobalId, document::GlobalId::hash> gids_in_batch(expected_max_size);
+ gids_in_batch.insert(gid_from_feed_op(*batch.messages[0].first));
+ for (auto it = bucket_msgs.first; (it != bucket_msgs.second) && (batch.messages.size() < max_batch_size);) {
+ if (!is_batchable_feed_op(it->_command->getType().getId())) {
+ break;
+ }
+ auto [existing_iter, inserted] = gids_in_batch.insert(gid_from_feed_op(*it->_command));
+ if (!inserted) {
+ break; // Already present in batch
+ }
+ if (messageTimedOutInQueue(*it->_command, now - it->_timer.start_time())) {
+ // We just ignore timed out ops here; actually generating a timeout reply will be done by
+ // next_message_impl() during a subsequent invocation. This avoids having to deal with any
+ // potential issues caused by sending a reply up while holding the queue lock, since we
+ // can't release it here.
+ ++it;
+ continue;
+ }
+ auto throttle_token = _owner.operation_throttler().try_acquire_one();
+ if (!throttle_token.valid()) {
+ break;
+ }
+ // Note: iterator is const; can't std::move(it->_command)
+ batch.messages.emplace_back(it->_command, std::move(throttle_token));
+ it = idx.erase(it);
+ }
+ update_cached_queue_size(guard);
+}
+
+FileStorHandler::LockedMessage
FileStorHandlerImpl::Stripe::get_next_async_message(monitor_guard& guard)
{
if (_owner.isPaused()) {
@@ -1021,9 +1123,11 @@ FileStorHandler::LockedMessage
FileStorHandlerImpl::Stripe::getMessage(monitor_guard & guard, PriorityIdx & idx, PriorityIdx::iterator iter,
ThrottleToken throttle_token)
{
- std::chrono::milliseconds waitTime(uint64_t(iter->_timer.stop(_metrics->averageQueueWaitingTime)));
+ std::chrono::milliseconds waitTime(uint64_t(iter->_timer.stop(
+ _owner._component.getClock().getMonotonicTime(),
+ _metrics->averageQueueWaitingTime)));
- std::shared_ptr<api::StorageMessage> msg = std::move(iter->_command);
+ std::shared_ptr<api::StorageMessage> msg = iter->_command; // iter is const; can't std::move()
document::Bucket bucket(iter->_bucket);
idx.erase(iter); // iter not used after this point.
update_cached_queue_size(guard);
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index f7c9b218779..ac8e5c52cbf 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -55,7 +55,9 @@ public:
document::Bucket _bucket;
uint8_t _priority;
- MessageEntry(const std::shared_ptr<api::StorageMessage>& cmd, const document::Bucket &bId);
+ MessageEntry(const std::shared_ptr<api::StorageMessage>& cmd,
+ const document::Bucket& bucket,
+ vespalib::steady_time scheduled_at_time);
MessageEntry(MessageEntry &&) noexcept ;
MessageEntry(const MessageEntry &) noexcept;
MessageEntry & operator = (const MessageEntry &) = delete;
@@ -66,13 +68,13 @@ public:
}
};
- using PriorityOrder = bmi::ordered_non_unique<bmi::identity<MessageEntry> >;
- using BucketOrder = bmi::ordered_non_unique<bmi::member<MessageEntry, document::Bucket, &MessageEntry::_bucket>>;
-
+ // ordered_non_unique shall preserve insertion order as iteration order of equal keys, but this is rather magical...
+ using PriorityOrder = bmi::ordered_non_unique<bmi::identity<MessageEntry>>;
+ using BucketOrder = bmi::ordered_non_unique<bmi::member<MessageEntry, document::Bucket, &MessageEntry::_bucket>>;
using PriorityQueue = bmi::multi_index_container<MessageEntry, bmi::indexed_by<bmi::sequenced<>, PriorityOrder, BucketOrder>>;
+ using PriorityIdx = bmi::nth_index<PriorityQueue, 1>::type;
+ using BucketIdx = bmi::nth_index<PriorityQueue, 2>::type;
- using PriorityIdx = bmi::nth_index<PriorityQueue, 1>::type;
- using BucketIdx = bmi::nth_index<PriorityQueue, 2>::type;
using Clock = std::chrono::steady_clock;
using monitor_guard = std::unique_lock<std::mutex>;
using atomic_size_t = vespalib::datastore::AtomicValueWrapper<size_t>;
@@ -114,7 +116,7 @@ public:
ActiveOperationsStats &stats() { return _stats; }
};
SafeActiveOperationsStats() : _lock(std::make_unique<std::mutex>()), _stats() {}
- Guard guard() { return Guard(*_lock, _stats, ctor_tag()); }
+ [[nodiscard]] Guard guard() { return Guard(*_lock, _stats, ctor_tag()); }
};
Stripe(const FileStorHandlerImpl & owner, MessageSender & messageSender);
@@ -123,8 +125,8 @@ public:
Stripe & operator =(const Stripe &) = delete;
~Stripe();
void flush();
- bool schedule(MessageEntry messageEntry);
- FileStorHandler::LockedMessage schedule_and_get_next_async_message(MessageEntry entry);
+ [[nodiscard]] bool schedule(MessageEntry messageEntry);
+ [[nodiscard]] FileStorHandler::LockedMessage schedule_and_get_next_async_message(MessageEntry entry);
void waitUntilNoLocks() const;
void abort(std::vector<std::shared_ptr<api::StorageReply>> & aborted, const AbortBucketOperationsCommand& cmd);
void waitInactive(const AbortBucketOperationsCommand& cmd) const;
@@ -151,18 +153,19 @@ public:
api::LockingRequirements lockReq, bool count_as_active_merge,
const LockEntry & lockEntry);
- std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket, api::LockingRequirements lockReq);
+ [[nodiscard]] std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket, api::LockingRequirements lockReq);
void failOperations(const document::Bucket & bucket, const api::ReturnCode & code);
- FileStorHandler::LockedMessage getNextMessage(vespalib::steady_time deadline);
+ [[nodiscard]] FileStorHandler::LockedMessage getNextMessage(vespalib::steady_time deadline);
+ [[nodiscard]] FileStorHandler::LockedMessageBatch next_message_batch(vespalib::steady_time now, vespalib::steady_time deadline);
void dumpQueue(std::ostream & os) const;
void dumpActiveHtml(std::ostream & os) const;
void dumpQueueHtml(std::ostream & os) const;
- std::mutex & exposeLock() { return *_lock; }
- PriorityQueue & exposeQueue() { return *_queue; }
- BucketIdx & exposeBucketIdx() { return bmi::get<2>(*_queue); }
+ [[nodiscard]] std::mutex & exposeLock() { return *_lock; }
+ [[nodiscard]] PriorityQueue & exposeQueue() { return *_queue; }
+ [[nodiscard]] BucketIdx & exposeBucketIdx() { return bmi::get<2>(*_queue); }
void setMetrics(FileStorStripeMetrics * metrics) { _metrics = metrics; }
- ActiveOperationsStats get_active_operations_stats(bool reset_min_max) const;
+ [[nodiscard]] ActiveOperationsStats get_active_operations_stats(bool reset_min_max) const;
private:
void update_cached_queue_size(const std::lock_guard<std::mutex> &) {
_cached_queue_size.store_relaxed(_queue->size());
@@ -170,15 +173,20 @@ public:
void update_cached_queue_size(const std::unique_lock<std::mutex> &) {
_cached_queue_size.store_relaxed(_queue->size());
}
- bool hasActive(monitor_guard & monitor, const AbortBucketOperationsCommand& cmd) const;
- FileStorHandler::LockedMessage get_next_async_message(monitor_guard& guard);
+ [[nodiscard]] bool hasActive(monitor_guard & monitor, const AbortBucketOperationsCommand& cmd) const;
+ [[nodiscard]] FileStorHandler::LockedMessage get_next_async_message(monitor_guard& guard);
[[nodiscard]] bool operation_type_should_be_throttled(api::MessageType::Id type_id) const noexcept;
+ [[nodiscard]] FileStorHandler::LockedMessage next_message_impl(monitor_guard& held_lock,
+ vespalib::steady_time deadline);
+ void fill_feed_op_batch(monitor_guard& held_lock, LockedMessageBatch& batch,
+ uint32_t max_batch_size, vespalib::steady_time now);
+
// Precondition: the bucket used by `iter`s operation is not locked in a way that conflicts
// with its locking requirements.
- FileStorHandler::LockedMessage getMessage(monitor_guard & guard, PriorityIdx & idx,
- PriorityIdx::iterator iter,
- ThrottleToken throttle_token);
+ [[nodiscard]] FileStorHandler::LockedMessage getMessage(monitor_guard & guard, PriorityIdx & idx,
+ PriorityIdx::iterator iter,
+ ThrottleToken throttle_token);
using LockedBuckets = vespalib::hash_map<document::Bucket, MultiLockEntry, document::Bucket::hash>;
const FileStorHandlerImpl &_owner;
MessageSender &_messageSender;
@@ -233,7 +241,8 @@ public:
bool schedule(const std::shared_ptr<api::StorageMessage>&) override;
ScheduleAsyncResult schedule_and_get_next_async_message(const std::shared_ptr<api::StorageMessage>& msg) override;
- FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId, vespalib::steady_time deadline) override;
+ LockedMessage getNextMessage(uint32_t stripeId, vespalib::steady_time deadline) override;
+ LockedMessageBatch next_message_batch(uint32_t stripe, vespalib::steady_time now, vespalib::steady_time deadline) override;
void remapQueueAfterJoin(const RemapInfo& source, RemapInfo& target) override;
void remapQueueAfterSplit(const RemapInfo& source, RemapInfo& target1, RemapInfo& target2) override;
@@ -292,6 +301,13 @@ public:
_throttle_apply_bucket_diff_ops.store(throttle_apply_bucket_diff, std::memory_order_relaxed);
}
+ void set_max_feed_op_batch_size(uint32_t max_batch) noexcept override {
+ _max_feed_op_batch_size.store(max_batch, std::memory_order_relaxed);
+ }
+ [[nodiscard]] uint32_t max_feed_op_batch_size() const noexcept {
+ return _max_feed_op_batch_size.load(std::memory_order_relaxed);
+ }
+
// Implements ResumeGuard::Callback
void resume() override;
@@ -316,6 +332,7 @@ private:
std::atomic<bool> _paused;
std::atomic<bool> _throttle_apply_bucket_diff_ops;
std::optional<ActiveOperationsStats> _last_active_operations_stats;
+ std::atomic<uint32_t> _max_feed_op_batch_size;
// Returns the index in the targets array we are sending to, or -1 if none of them match.
int calculateTargetBasedOnDocId(const api::StorageMessage& msg, std::vector<RemapInfo*>& targets);
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 0046bd96b65..23de39f7130 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -239,6 +239,7 @@ FileStorManager::on_configure(const StorFilestorConfig& config)
auto updated_dyn_throttle_params = dynamic_throttle_params_from_config(config, _threads.size());
_filestorHandler->reconfigure_dynamic_throttler(updated_dyn_throttle_params);
}
+ _filestorHandler->set_max_feed_op_batch_size(std::max(1, config.maxFeedOpBatchSize));
// TODO remove once desired throttling behavior is set in stone
{
_filestorHandler->use_dynamic_operation_throttling(use_dynamic_throttling);
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index 0612798b43a..4761937075d 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -134,7 +134,7 @@ PersistenceHandler::processMessage(api::StorageMessage& msg, MessageTracker::UP
_env._metrics.operations.inc();
if (msg.getType().isReply()) {
- try{
+ try {
LOG(debug, "Handling reply: %s", msg.toString().c_str());
LOG(spam, "Message content: %s", msg.toString(true).c_str());
return handleReply(static_cast<api::StorageReply&>(msg), std::move(tracker));
@@ -161,9 +161,7 @@ PersistenceHandler::processMessage(api::StorageMessage& msg, MessageTracker::UP
void
PersistenceHandler::processLockedMessage(FileStorHandler::LockedMessage lock) const {
- LOG(debug, "NodeIndex %d, ptr=%p", _env._nodeIndex, lock.msg.get());
api::StorageMessage & msg(*lock.msg);
-
// Important: we _copy_ the message shared_ptr instead of moving to ensure that `msg` remains
// valid even if the tracker is destroyed by an exception in processMessage().
auto tracker = std::make_unique<MessageTracker>(framework::MilliSecTimer(_clock), _env, _env._fileStorHandler,
@@ -174,4 +172,24 @@ PersistenceHandler::processLockedMessage(FileStorHandler::LockedMessage lock) co
}
}
+void
+PersistenceHandler::process_locked_message_batch(std::shared_ptr<FileStorHandler::BucketLockInterface> lock,
+ std::span<BatchedMessage> bucket_messages)
+{
+ const auto bucket = lock->getBucket();
+ auto batch = std::make_shared<AsyncMessageBatch>(std::move(lock), _env, _env._fileStorHandler);
+ for (auto& bm : bucket_messages) {
+ assert(bm.first->getBucket() == bucket);
+ // Important: we _copy_ the message shared_ptr instead of moving to ensure that `*bm.first` remains
+ // valid even if the tracker is destroyed by an exception in processMessage(). All std::exceptions
+ // are caught there, so we do not expect our loop to be interrupted.
+ auto tracker = std::make_unique<MessageTracker>(framework::MilliSecTimer(_clock), _env,
+ batch, bm.first, std::move(bm.second));
+ tracker = processMessage(*bm.first, std::move(tracker));
+ if (tracker) {
+ tracker->sendReply(); // Actually defers to batch reply queue
+ }
+ }
+}
+
}
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.h b/storage/src/vespa/storage/persistence/persistencehandler.h
index 0da518a1cfa..224bb70a16b 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.h
+++ b/storage/src/vespa/storage/persistence/persistencehandler.h
@@ -2,14 +2,16 @@
#pragma once
-#include "processallhandler.h"
-#include "mergehandler.h"
#include "asynchandler.h"
+#include "batched_message.h"
+#include "mergehandler.h"
#include "persistenceutil.h"
-#include "splitjoinhandler.h"
+#include "processallhandler.h"
#include "simplemessagehandler.h"
+#include "splitjoinhandler.h"
#include <vespa/storage/common/storagecomponent.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
+#include <span>
namespace storage {
@@ -29,6 +31,8 @@ public:
~PersistenceHandler();
void processLockedMessage(FileStorHandler::LockedMessage lock) const;
+ void process_locked_message_batch(std::shared_ptr<FileStorHandler::BucketLockInterface> lock,
+ std::span<BatchedMessage> bucket_messages);
//TODO Rewrite tests to avoid this api leak
const AsyncHandler & asyncHandler() const { return _asyncHandler; }
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index a98418281d2..5710c3fe26f 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -14,6 +14,7 @@ PersistenceThread::PersistenceThread(PersistenceHandler & persistenceHandler, Fi
uint32_t stripeId, framework::Component & component)
: _persistenceHandler(persistenceHandler),
_fileStorHandler(fileStorHandler),
+ _clock(component.getClock()),
_stripeId(stripeId),
_thread()
{
@@ -36,14 +37,19 @@ PersistenceThread::run(framework::ThreadHandle& thread)
vespalib::duration max_wait_time = vespalib::adjustTimeoutByDetectedHz(100ms);
while (!thread.interrupted()) {
- vespalib::steady_time now = vespalib::steady_clock::now();
+ vespalib::steady_time now = _clock.getMonotonicTime();
thread.registerTick(framework::UNKNOWN_CYCLE, now);
vespalib::steady_time deadline = now + max_wait_time;
- FileStorHandler::LockedMessage lock(_fileStorHandler.getNextMessage(_stripeId, deadline));
-
- if (lock.lock) {
- _persistenceHandler.processLockedMessage(std::move(lock));
+ auto batch = _fileStorHandler.next_message_batch(_stripeId, now, deadline);
+ if (!batch.empty()) {
+ // Special-case single message batches, as actually scheduling a full batch has more
+ // overhead due to extra bookkeeping state and deferred reply-sending.
+ if (batch.size() == 1) {
+ _persistenceHandler.processLockedMessage(batch.release_as_single_msg());
+ } else {
+ _persistenceHandler.process_locked_message_batch(std::move(batch.lock), batch.messages);
+ }
}
}
LOG(debug, "Closing down persistence thread");
diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h
index aacd1dd4830..2e9852ada73 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.h
+++ b/storage/src/vespa/storage/persistence/persistencethread.h
@@ -8,6 +8,7 @@
namespace storage {
namespace framework {
+ struct Clock;
class Component;
class Thread;
}
@@ -27,10 +28,11 @@ public:
framework::Thread& getThread() override { return *_thread; }
private:
- PersistenceHandler & _persistenceHandler;
- FileStorHandler & _fileStorHandler;
- uint32_t _stripeId;
- std::unique_ptr<framework::Thread> _thread;
+ PersistenceHandler& _persistenceHandler;
+ FileStorHandler& _fileStorHandler;
+ const framework::Clock& _clock;
+ uint32_t _stripeId;
+ std::unique_ptr<framework::Thread> _thread;
void run(framework::ThreadHandle&) override;
};
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp
index c975721c855..54bce72d7ff 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.cpp
+++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp
@@ -11,41 +11,82 @@ LOG_SETUP(".persistence.util");
namespace storage {
namespace {
- bool isBatchable(api::MessageType::Id id)
- {
- return (id == api::MessageType::PUT_ID ||
- id == api::MessageType::REMOVE_ID ||
- id == api::MessageType::UPDATE_ID);
- }
- bool hasBucketInfo(api::MessageType::Id id)
- {
- return (isBatchable(id) ||
- (id == api::MessageType::REMOVELOCATION_ID ||
- id == api::MessageType::JOINBUCKETS_ID));
+constexpr bool is_batchable(api::MessageType::Id id) noexcept {
+ return (id == api::MessageType::PUT_ID ||
+ id == api::MessageType::REMOVE_ID ||
+ id == api::MessageType::UPDATE_ID);
+}
+
+constexpr bool has_bucket_info(api::MessageType::Id id) noexcept {
+ return (is_batchable(id) ||
+ (id == api::MessageType::REMOVELOCATION_ID ||
+ id == api::MessageType::JOINBUCKETS_ID));
+}
+
+constexpr vespalib::duration WARN_ON_SLOW_OPERATIONS = 5s;
+
+} // anon ns
+
+DeferredReplySenderStub::DeferredReplySenderStub() = default;
+DeferredReplySenderStub::~DeferredReplySenderStub() = default;
+
+AsyncMessageBatch::AsyncMessageBatch(std::shared_ptr<FileStorHandler::BucketLockInterface> bucket_lock,
+ const PersistenceUtil& env,
+ MessageSender& reply_sender) noexcept
+ : _bucket_lock(std::move(bucket_lock)),
+ _env(env),
+ _reply_sender(reply_sender),
+ _deferred_sender_stub()
+{
+ assert(_bucket_lock);
+}
+
+AsyncMessageBatch::~AsyncMessageBatch() {
+ const auto bucket_info = _env.getBucketInfo(_bucket_lock->getBucket());
+ _env.updateBucketDatabase(_bucket_lock->getBucket(), bucket_info);
+
+ std::lock_guard lock(_deferred_sender_stub._mutex); // Ensure visibility of posted replies
+ for (auto& reply : _deferred_sender_stub._deferred_replies) {
+ if (reply->getResult().success()) {
+ dynamic_cast<api::BucketInfoReply&>(*reply).setBucketInfo(bucket_info);
+ }
+ _reply_sender.sendReplyDirectly(reply);
}
- const vespalib::duration WARN_ON_SLOW_OPERATIONS = 5s;
+ LOG(debug, "Processed async feed message batch of %zu ops for %s. New bucket info is %s",
+ _deferred_sender_stub._deferred_replies.size(),
+ _bucket_lock->getBucket().toString().c_str(), bucket_info.toString().c_str());
}
MessageTracker::MessageTracker(const framework::MilliSecTimer & timer,
const PersistenceUtil & env,
MessageSender & replySender,
FileStorHandler::BucketLockInterface::SP bucketLock,
- api::StorageMessage::SP msg,
+ std::shared_ptr<api::StorageMessage> msg,
ThrottleToken throttle_token)
- : MessageTracker(timer, env, replySender, true, std::move(bucketLock), std::move(msg), std::move(throttle_token))
+ : MessageTracker(timer, env, replySender, true, std::move(bucketLock), {}, std::move(msg), std::move(throttle_token))
+{}
+
+MessageTracker::MessageTracker(const framework::MilliSecTimer& timer,
+ const PersistenceUtil& env,
+ std::shared_ptr<AsyncMessageBatch> batch,
+ std::shared_ptr<api::StorageMessage> msg,
+ ThrottleToken throttle_token)
+ : MessageTracker(timer, env, batch->deferred_sender_stub(), false, {}, std::move(batch), std::move(msg), std::move(throttle_token))
{}
MessageTracker::MessageTracker(const framework::MilliSecTimer & timer,
const PersistenceUtil & env,
MessageSender & replySender,
- bool updateBucketInfo,
- FileStorHandler::BucketLockInterface::SP bucketLock,
- api::StorageMessage::SP msg,
+ bool update_bucket_info,
+ std::shared_ptr<FileStorHandler::BucketLockInterface> bucket_lock,
+ std::shared_ptr<AsyncMessageBatch> part_of_batch,
+ std::shared_ptr<api::StorageMessage> msg,
ThrottleToken throttle_token)
: _sendReply(true),
- _updateBucketInfo(updateBucketInfo && hasBucketInfo(msg->getType().getId())),
- _bucketLock(std::move(bucketLock)),
+ _updateBucketInfo(update_bucket_info && has_bucket_info(msg->getType().getId())),
+ _bucketLock(std::move(bucket_lock)),
+ _part_of_batch(std::move(part_of_batch)),
_msg(std::move(msg)),
_throttle_token(std::move(throttle_token)),
_context(_msg->getPriority(), _msg->getTrace().getLevel()),
@@ -61,7 +102,7 @@ MessageTracker::createForTesting(const framework::MilliSecTimer & timer, Persist
FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg)
{
return MessageTracker::UP(new MessageTracker(timer, env, replySender, false, std::move(bucketLock),
- std::move(msg), ThrottleToken()));
+ {}, std::move(msg), ThrottleToken()));
}
void
@@ -102,6 +143,7 @@ MessageTracker::sendReply() {
if (hasReply()) {
getReply().getTrace().addChild(_context.steal_trace());
if (_updateBucketInfo) {
+ assert(_bucketLock);
if (getReply().getResult().success()) {
_env.setBucketInfo(*this, _bucketLock->getBucket());
}
@@ -163,7 +205,7 @@ MessageTracker::generateReply(api::StorageCommand& cmd)
std::shared_ptr<FileStorHandler::OperationSyncPhaseDoneNotifier>
MessageTracker::sync_phase_done_notifier_or_nullptr() const
{
- if (_bucketLock->wants_sync_phase_done_notification()) {
+ if (_bucketLock && _bucketLock->wants_sync_phase_done_notification()) {
return _bucketLock;
}
return {};
@@ -236,7 +278,7 @@ PersistenceUtil::setBucketInfo(MessageTracker& tracker, const document::Bucket &
{
api::BucketInfo info = getBucketInfo(bucket);
- static_cast<api::BucketInfoReply&>(tracker.getReply()).setBucketInfo(info);
+ dynamic_cast<api::BucketInfoReply&>(tracker.getReply()).setBucketInfo(info);
updateBucketDatabase(bucket, info);
}
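
Note that nothing above counts how many batched operations remain in flight: every batched MessageTracker holds a shared_ptr to its AsyncMessageBatch, so the destructor runs exactly once, on whichever thread drops the last reference, regardless of the order in which the async operations complete. A self-contained sketch of that last-reference-out pattern (names are illustrative, not from this patch):

    #include <cstdio>
    #include <memory>
    #include <vector>

    struct BatchCompletion {
        // Fires once, when the final shared_ptr is released; the real
        // AsyncMessageBatch updates the bucket DB and flushes replies here.
        ~BatchCompletion() { std::puts("all ops done: update DB, send replies"); }
    };

    int main() {
        std::vector<std::shared_ptr<BatchCompletion>> in_flight;
        auto batch = std::make_shared<BatchCompletion>();
        for (int i = 0; i < 3; ++i) in_flight.push_back(batch); // one ref per op
        batch.reset();     // the dispatching thread drops its reference
        in_flight.clear(); // ops may finish in any order; last drop runs the dtor
    }
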
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h
index 900f301252e..71e5c2754e9 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.h
+++ b/storage/src/vespa/storage/persistence/persistenceutil.h
@@ -25,12 +25,50 @@ namespace storage {
class PersistenceUtil;
+struct DeferredReplySenderStub : MessageSender {
+ std::mutex _mutex;
+ std::vector<std::shared_ptr<api::StorageReply>> _deferred_replies;
+
+ DeferredReplySenderStub();
+ ~DeferredReplySenderStub() override;
+
+ void sendCommand(const std::shared_ptr<api::StorageCommand>&) override {
+ abort(); // Not supported
+ }
+ void sendReply(const std::shared_ptr<api::StorageReply>& reply) override {
+ std::lock_guard lock(_mutex);
+ _deferred_replies.emplace_back(reply);
+ }
+};
+
+class AsyncMessageBatch {
+ std::shared_ptr<FileStorHandler::BucketLockInterface> _bucket_lock;
+ const PersistenceUtil& _env;
+ MessageSender& _reply_sender;
+ DeferredReplySenderStub _deferred_sender_stub;
+public:
+ AsyncMessageBatch(std::shared_ptr<FileStorHandler::BucketLockInterface> bucket_lock,
+ const PersistenceUtil& env,
+ MessageSender& reply_sender) noexcept;
+ // Triggered when the last batched MessageTracker referencing this batch is destroyed.
+ // Fetches bucket info, updates DB and sends all deferred replies with the new bucket info.
+ ~AsyncMessageBatch();
+
+ [[nodiscard]] MessageSender& deferred_sender_stub() noexcept { return _deferred_sender_stub; }
+};
+
class MessageTracker {
public:
using UP = std::unique_ptr<MessageTracker>;
- MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender,
- FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg,
+ MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & reply_sender,
+ FileStorHandler::BucketLockInterface::SP bucket_lock, std::shared_ptr<api::StorageMessage> msg,
+ ThrottleToken throttle_token);
+
+ // For use with batching, where the bucket lock is held by the enclosing batch and
+ // bucket info is _not_ fetched or updated per message.
+ MessageTracker(const framework::MilliSecTimer& timer, const PersistenceUtil& env,
+ std::shared_ptr<AsyncMessageBatch> batch, std::shared_ptr<api::StorageMessage> msg,
ThrottleToken throttle_token);
~MessageTracker();
@@ -58,48 +96,53 @@ public:
* commands like merge. */
void dontReply() { _sendReply = false; }
- bool hasReply() const { return bool(_reply); }
- const api::StorageReply & getReply() const {
+ [[nodiscard]] bool hasReply() const { return bool(_reply); }
+ [[nodiscard]] const api::StorageReply & getReply() const {
return *_reply;
}
- api::StorageReply & getReply() {
+ [[nodiscard]] api::StorageReply & getReply() {
return *_reply;
}
- std::shared_ptr<api::StorageReply> && stealReplySP() && {
+ [[nodiscard]] std::shared_ptr<api::StorageReply> && stealReplySP() && {
return std::move(_reply);
}
void generateReply(api::StorageCommand& cmd);
- api::ReturnCode getResult() const { return _result; }
+ [[nodiscard]] api::ReturnCode getResult() const { return _result; }
- spi::Context & context() { return _context; }
- document::BucketId getBucketId() const {
+ [[nodiscard]] spi::Context & context() { return _context; }
+ [[nodiscard]] document::BucketId getBucketId() const {
return _bucketLock->getBucket().getBucketId();
}
void sendReply();
- bool checkForError(const spi::Result& response);
+ [[nodiscard]] bool checkForError(const spi::Result& response);
// Returns a non-nullptr notifier instance iff the underlying operation wants to be notified
- // when the sync phase is complete. Otherwise returns a nullptr shared_ptr.
- std::shared_ptr<FileStorHandler::OperationSyncPhaseDoneNotifier> sync_phase_done_notifier_or_nullptr() const;
+ // when the sync phase is complete. Otherwise, returns a nullptr shared_ptr.
+ [[nodiscard]] std::shared_ptr<FileStorHandler::OperationSyncPhaseDoneNotifier> sync_phase_done_notifier_or_nullptr() const;
static MessageTracker::UP
createForTesting(const framework::MilliSecTimer & timer, PersistenceUtil & env, MessageSender & replySender,
FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg);
private:
- MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, bool updateBucketInfo,
- FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg,
+ MessageTracker(const framework::MilliSecTimer& timer, const PersistenceUtil& env,
+ MessageSender& reply_sender, bool update_bucket_info,
+ std::shared_ptr<FileStorHandler::BucketLockInterface> bucket_lock,
+ std::shared_ptr<AsyncMessageBatch> part_of_batch,
+ std::shared_ptr<api::StorageMessage> msg,
ThrottleToken throttle_token);
[[nodiscard]] bool count_result_as_failure() const noexcept;
bool _sendReply;
bool _updateBucketInfo;
+ // Either _bucketLock or _part_of_batch must be set, never both at the same time
FileStorHandler::BucketLockInterface::SP _bucketLock;
+ std::shared_ptr<AsyncMessageBatch> _part_of_batch; // nullptr if not batched
std::shared_ptr<api::StorageMessage> _msg;
ThrottleToken _throttle_token;
spi::Context _context;
@@ -117,8 +160,6 @@ public:
struct LockResult {
std::shared_ptr<FileStorHandler::BucketLockInterface> lock;
LockResult() : lock() {}
-
- bool bucketExisted() const { return bool(lock); }
};
PersistenceUtil(const ServiceLayerComponent&, FileStorHandler& fileStorHandler,
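
The two public MessageTracker constructors above correspond to two reply paths: an unbatched tracker owns the bucket lock and stamps bucket info onto its own reply, while a batched tracker posts its reply into the batch's DeferredReplySenderStub, whose mutex makes posting safe from persistence-provider callbacks on arbitrary threads. A miniature buffer-then-flush analogue, independent of the real MessageSender API (all names illustrative):

    #include <memory>
    #include <mutex>
    #include <string>
    #include <vector>

    struct Reply { std::string text; };

    class DeferredSender {
        std::mutex _mutex;
        std::vector<std::shared_ptr<Reply>> _deferred;
    public:
        void send(std::shared_ptr<Reply> r) { // may be called from any thread
            std::lock_guard lock(_mutex);
            _deferred.emplace_back(std::move(r));
        }
        std::vector<std::shared_ptr<Reply>> drain() { // flushed once, by the owner
            std::lock_guard lock(_mutex);
            return std::move(_deferred);
        }
    };
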
diff --git a/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.h b/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.h
index 277e7b4fdfd..93b305370e0 100644
--- a/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.h
+++ b/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.h
@@ -13,7 +13,7 @@
namespace storage::framework::defaultimplementation {
-struct RealClock : public Clock {
+struct RealClock final : public Clock {
vespalib::steady_time getMonotonicTime() const override;
vespalib::system_time getSystemTime() const override;
};
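
Marking the concrete clock final is a small hardening: tests are expected to mock the framework::Clock interface directly rather than subclass RealClock, and the compiler may devirtualize getMonotonicTime()/getSystemTime() calls made through a reference whose static type is RealClock. Illustrative only:

    struct FrozenClock : RealClock {}; // now a compile error: RealClock is final
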