author     Tor Brede Vekterli <vekterli@vespa.ai>   2024-04-10 13:21:54 +0200
committer  GitHub <noreply@github.com>              2024-04-10 13:21:54 +0200
commit     29b9803e6932ab9be36e97463219b7d09494857f (patch)
tree       c073c4649693a088d19d27654db5a55ae74a65f2
parent     de9a1561fa9dcc9126baa161c5ec40bd6ccbde95 (diff)
parent     496309291cb7171bf25b0f73b1465e64c978fdfc (diff)
Merge pull request #30860 from vespa-engine/vekterli/persistence-locking-feed-op-batching
Support pipelining (batching) of mutating ops to same bucket
-rw-r--r--  configdefinitions/src/vespa/stor-filestor.def                                9
-rw-r--r--  metrics/src/vespa/metrics/metrictimer.cpp                                    9
-rw-r--r--  metrics/src/vespa/metrics/metrictimer.h                                     24
-rw-r--r--  storage/src/tests/persistence/active_operations_stats_test.cpp              20
-rw-r--r--  storage/src/tests/persistence/filestorage/CMakeLists.txt                     2
-rw-r--r--  storage/src/tests/persistence/filestorage/feed_operation_batching_test.cpp 318
-rw-r--r--  storage/src/tests/persistence/filestorage/filestormanagertest.cpp           35
-rw-r--r--  storage/src/tests/persistence/filestorage/gtest_runner.cpp                  11
-rw-r--r--  storage/src/vespa/storage/persistence/asynchandler.cpp                       6
-rw-r--r--  storage/src/vespa/storage/persistence/batched_message.h                     14
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp       11
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestorhandler.h         65
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp  128
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h     59
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp        1
-rw-r--r--  storage/src/vespa/storage/persistence/persistencehandler.cpp                24
-rw-r--r--  storage/src/vespa/storage/persistence/persistencehandler.h                  10
-rw-r--r--  storage/src/vespa/storage/persistence/persistencethread.cpp                 16
-rw-r--r--  storage/src/vespa/storage/persistence/persistencethread.h                   10
-rw-r--r--  storage/src/vespa/storage/persistence/persistenceutil.cpp                   87
-rw-r--r--  storage/src/vespa/storage/persistence/persistenceutil.h                     75
-rw-r--r--  storage/src/vespa/storageframework/defaultimplementation/clock/realclock.h   2
22 files changed, 800 insertions, 136 deletions
diff --git a/configdefinitions/src/vespa/stor-filestor.def b/configdefinitions/src/vespa/stor-filestor.def
index de67d4336e9..a5d86cc91ba 100644
--- a/configdefinitions/src/vespa/stor-filestor.def
+++ b/configdefinitions/src/vespa/stor-filestor.def
@@ -29,7 +29,7 @@ response_sequencer_type enum {LATENCY, THROUGHPUT, ADAPTIVE} default=ADAPTIVE re
## Should follow stor-distributormanager:splitsize (16MB).
bucket_merge_chunk_size int default=16772216 restart
-## Whether or not to use async message handling when scheduling storage messages from FileStorManager.
+## Whether to use async message handling when scheduling storage messages from FileStorManager.
##
## When turned on, the calling thread (e.g. FNET network thread when using Storage API RPC)
## gets the next async message to handle (if any) as part of scheduling a storage message.
@@ -61,3 +61,10 @@ async_operation_throttler.window_size_backoff double default=0.95
async_operation_throttler.min_window_size int default=20
async_operation_throttler.max_window_size int default=-1 # < 0 implies INT_MAX
async_operation_throttler.resize_rate double default=3.0
+
+## Maximum number of enqueued put/remove/update operations towards a given bucket
+## that can be dispatched asynchronously as a batch under the same write lock.
+## This prevents pipeline stalls when many write operations are in-flight to the
+## same bucket, as each operation would otherwise have to wait for the completion
+## of all prior writes to the bucket.
+max_feed_op_batch_size int default=1
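
To make the semantics of the new max_feed_op_batch_size option concrete, here is a small self-contained sketch (illustrative only, not code from this patch) of how a backlog of feed operations against one bucket is carved into batches, and how that reduces the number of bucket write-lock acquisitions:

#include <algorithm>
#include <cstdio>

// Illustrative sketch, not Vespa code: 5 queued puts to the same bucket with
// max_feed_op_batch_size = 3 are dispatched as two batches (3 + 2 ops), i.e.
// two write-lock acquisitions instead of the five needed with the default of 1.
int main() {
    const int queued_ops = 5;
    const int max_feed_op_batch_size = 3;
    int locks_taken = 0;
    for (int consumed = 0; consumed < queued_ops; ++locks_taken) {
        const int batch = std::min(max_feed_op_batch_size, queued_ops - consumed);
        std::printf("batch %d: %d op(s) under one bucket write lock\n", locks_taken + 1, batch);
        consumed += batch;
    }
    std::printf("total lock acquisitions: %d\n", locks_taken);
    return 0;
}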
diff --git a/metrics/src/vespa/metrics/metrictimer.cpp b/metrics/src/vespa/metrics/metrictimer.cpp
index 84d4844104d..a3b0f215d58 100644
--- a/metrics/src/vespa/metrics/metrictimer.cpp
+++ b/metrics/src/vespa/metrics/metrictimer.cpp
@@ -3,13 +3,18 @@
namespace metrics {
-MetricTimer::MetricTimer()
+MetricTimer::MetricTimer() noexcept
+ : _startTime(std::chrono::steady_clock::now())
{
// Amusingly enough, steady_clock was not actually steady by default on
// GCC < 4.8.1, so add a bit of compile-time paranoia just to make sure.
static_assert(std::chrono::steady_clock::is_steady,
"Old/broken STL implementation; steady_clock not steady");
- _startTime = std::chrono::steady_clock::now();
+}
+
+MetricTimer::MetricTimer(std::chrono::steady_clock::time_point start_time) noexcept
+ : _startTime(start_time)
+{
}
} // metrics
diff --git a/metrics/src/vespa/metrics/metrictimer.h b/metrics/src/vespa/metrics/metrictimer.h
index 8a338432362..133cd819489 100644
--- a/metrics/src/vespa/metrics/metrictimer.h
+++ b/metrics/src/vespa/metrics/metrictimer.h
@@ -15,7 +15,19 @@ namespace metrics {
class MetricTimer {
public:
- MetricTimer();
+ // Start time point set by system steady clock
+ MetricTimer() noexcept;
+ // Start time point explicitly given
+ explicit MetricTimer(std::chrono::steady_clock::time_point start_time) noexcept;
+
+ template<typename AvgVal, typename TotVal, bool SumOnAdd>
+ AvgVal stop(std::chrono::steady_clock::time_point now, ValueMetric<AvgVal, TotVal, SumOnAdd>& metric) const {
+ const auto delta = now - _startTime;
+ using ToDuration = std::chrono::duration<AvgVal, std::milli>;
+ const auto deltaMs(std::chrono::duration_cast<ToDuration>(delta).count());
+ metric.addValue(deltaMs);
+ return deltaMs;
+ }
/**
* Adds ms passed since this timer was constructed to given value metric.
@@ -26,11 +38,11 @@ public:
*/
template<typename AvgVal, typename TotVal, bool SumOnAdd>
AvgVal stop(ValueMetric<AvgVal, TotVal, SumOnAdd>& metric) const {
- const auto delta = std::chrono::steady_clock::now() - _startTime;
- using ToDuration = std::chrono::duration<AvgVal, std::milli>;
- const auto deltaMs(std::chrono::duration_cast<ToDuration>(delta).count());
- metric.addValue(deltaMs);
- return deltaMs;
+ return stop(std::chrono::steady_clock::now(), metric);
+ }
+
+ [[nodiscard]] std::chrono::steady_clock::time_point start_time() const noexcept {
+ return _startTime;
}
private:
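
The two-argument stop() overload above lets hot paths read the steady clock once and settle many timers against the same time point. A minimal self-contained sketch of that contract (names are illustrative, not from the patch):

#include <chrono>
#include <cstdio>

// Self-contained sketch of the two-argument stop() contract: the caller
// supplies `now`, so one steady_clock read can settle many queued timers.
int main() {
    using clock = std::chrono::steady_clock;
    const auto enqueued_at = clock::now();   // analogous to MetricTimer{start_time}
    // ... message sits in the queue ...
    const auto now = clock::now();           // single clock read at dequeue time
    using to_ms = std::chrono::duration<double, std::milli>;
    const double waited_ms = std::chrono::duration_cast<to_ms>(now - enqueued_at).count();
    std::printf("queue wait: %.3f ms\n", waited_ms); // the value metric.addValue() would record
    return 0;
}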
diff --git a/storage/src/tests/persistence/active_operations_stats_test.cpp b/storage/src/tests/persistence/active_operations_stats_test.cpp
index bf91a84235a..71be34e3f54 100644
--- a/storage/src/tests/persistence/active_operations_stats_test.cpp
+++ b/storage/src/tests/persistence/active_operations_stats_test.cpp
@@ -28,6 +28,8 @@ public:
std::shared_ptr<api::StorageMessage> createPut(uint64_t bucket, uint64_t docIdx);
std::shared_ptr<api::StorageMessage> createGet(uint64_t bucket) const;
+ void SetUp() override;
+ void TearDown() override;
void assert_active_operations_stats(const ActiveOperationsStats &stats, uint32_t exp_active_size, uint32_t exp_size_samples, uint32_t exp_latency_samples);
void update_metrics();
void test_active_operations_stats();
@@ -41,16 +43,30 @@ ActiveOperationsStatsTest::ActiveOperationsStatsTest()
metrics(),
stripeId(0)
{
+ // Initialization of members must happen in SetUp() since this test transitively
+ // depends on components modified by the superclass' SetUp() method.
+}
+
+void
+ActiveOperationsStatsTest::SetUp()
+{
+ FileStorTestFixture::SetUp();
setupPersistenceThreads(1);
_node->setPersistenceProvider(std::make_unique<spi::dummy::DummyPersistence>(_node->getTypeRepo()));
top.push_back(std::move(dummyManager));
top.open();
metrics.initDiskMetrics(1, 1);
- filestorHandler = std::make_unique<FileStorHandlerImpl>(messageSender, metrics,
- _node->getComponentRegister());
+ filestorHandler = std::make_unique<FileStorHandlerImpl>(messageSender, metrics, _node->getComponentRegister());
filestorHandler->setGetNextMessageTimeout(20ms);
}
+void
+ActiveOperationsStatsTest::TearDown()
+{
+ filestorHandler.reset();
+ FileStorTestFixture::TearDown();
+}
+
ActiveOperationsStatsTest::~ActiveOperationsStatsTest() = default;
std::shared_ptr<api::StorageMessage>
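
The constructor comment above reflects standard GoogleTest fixture semantics: a base fixture's SetUp() only runs when the derived override calls it, and always after all constructors have finished. A minimal sketch with hypothetical fixture names (link against gtest_main to run):

#include <gtest/gtest.h>

// Minimal sketch of why dependent initialization belongs in SetUp() rather
// than the constructor: BaseFixture::SetUp() runs only when the derived
// override invokes it, and always after every constructor has completed.
struct BaseFixture : ::testing::Test {
    int resource = 0;
    void SetUp() override { resource = 42; } // state the subclass depends on
};

struct DerivedFixture : BaseFixture {
    int dependent = 0;
    void SetUp() override {
        BaseFixture::SetUp(); // must run first...
        dependent = resource; // ...so this observes the initialized resource
    }
};

TEST_F(DerivedFixture, sees_base_setup) {
    EXPECT_EQ(dependent, 42);
}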
diff --git a/storage/src/tests/persistence/filestorage/CMakeLists.txt b/storage/src/tests/persistence/filestorage/CMakeLists.txt
index f1a8a286bbd..aa7c9fe995c 100644
--- a/storage/src/tests/persistence/filestorage/CMakeLists.txt
+++ b/storage/src/tests/persistence/filestorage/CMakeLists.txt
@@ -4,6 +4,7 @@ vespa_add_executable(storage_filestorage_gtest_runner_app TEST
SOURCES
deactivatebucketstest.cpp
deletebuckettest.cpp
+ feed_operation_batching_test.cpp
filestormanagertest.cpp
filestormodifiedbucketstest.cpp
mergeblockingtest.cpp
@@ -18,6 +19,7 @@ vespa_add_executable(storage_filestorage_gtest_runner_app TEST
storage_testhostreporter
storage_testpersistence_common
GTest::GTest
+ absl::failure_signal_handler
)
vespa_add_test( NAME storage_filestorage_gtest_runner_app COMMAND storage_filestorage_gtest_runner_app COST 50)
diff --git a/storage/src/tests/persistence/filestorage/feed_operation_batching_test.cpp b/storage/src/tests/persistence/filestorage/feed_operation_batching_test.cpp
new file mode 100644
index 00000000000..cf16123933b
--- /dev/null
+++ b/storage/src/tests/persistence/filestorage/feed_operation_batching_test.cpp
@@ -0,0 +1,318 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <tests/common/dummystoragelink.h>
+#include <tests/common/testhelper.h>
+#include <tests/persistence/common/filestortestfixture.h>
+#include <tests/persistence/filestorage/forwardingmessagesender.h>
+#include <vespa/document/test/make_document_bucket.h>
+#include <vespa/document/update/documentupdate.h>
+#include <vespa/storage/persistence/filestorage/filestorhandlerimpl.h>
+#include <vespa/storage/persistence/filestorage/filestormanager.h>
+#include <vespa/storage/persistence/filestorage/filestormetrics.h>
+#include <vespa/vespalib/util/stringfmt.h>
+#include <gtest/gtest.h>
+
+using document::test::makeDocumentBucket;
+using document::BucketId;
+using document::DocumentId;
+using namespace ::testing;
+
+namespace storage {
+
+struct FeedOperationBatchingTest : FileStorTestFixture {
+ DummyStorageLink _top;
+ std::unique_ptr<ForwardingMessageSender> _message_sender;
+ FileStorMetrics _metrics;
+ std::unique_ptr<FileStorHandler> _handler;
+ api::Timestamp _next_timestamp;
+
+ FeedOperationBatchingTest();
+ ~FeedOperationBatchingTest() override;
+
+ void SetUp() override {
+ FileStorTestFixture::SetUp();
+ // This silly little indirection is a work-around for the top-level link needing something
+ // below it to send _up_ into it, rather than directly receiving the messages itself.
+ auto message_receiver = std::make_unique<DummyStorageLink>();
+ _message_sender = std::make_unique<ForwardingMessageSender>(*message_receiver);
+ _top.push_back(std::move(message_receiver));
+ _top.open();
+ _metrics.initDiskMetrics(1, 1);
+ // By default, sets up 1 thread with 1 stripe
+ _handler = std::make_unique<FileStorHandlerImpl>(*_message_sender, _metrics, _node->getComponentRegister());
+ _handler->set_max_feed_op_batch_size(3);
+ }
+
+ void TearDown() override {
+ _handler.reset();
+ FileStorTestFixture::TearDown();
+ }
+
+ [[nodiscard]] static vespalib::string id_str_of(uint32_t bucket_idx, uint32_t doc_idx) {
+ return vespalib::make_string("id:foo:testdoctype1:n=%u:%u", bucket_idx, doc_idx);
+ }
+
+ [[nodiscard]] static DocumentId id_of(uint32_t bucket_idx, uint32_t doc_idx) {
+ return DocumentId(id_str_of(bucket_idx, doc_idx));
+ }
+
+ void schedule_msg(const std::shared_ptr<api::StorageMessage>& msg) {
+ msg->setAddress(makeSelfAddress());
+ _handler->schedule(msg); // takes shared_ptr by const ref, no point in moving
+ }
+
+ void send_put(uint32_t bucket_idx, uint32_t doc_idx, uint32_t timestamp, vespalib::duration timeout) {
+ auto id = id_str_of(bucket_idx, doc_idx);
+ auto doc = _node->getTestDocMan().createDocument("foobar", id);
+ auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket({16, bucket_idx}), std::move(doc), timestamp);
+ cmd->setTimeout(timeout);
+ schedule_msg(cmd);
+ }
+
+ void send_put(uint32_t bucket_idx, uint32_t doc_idx) {
+ send_put(bucket_idx, doc_idx, next_timestamp(), 60s);
+ }
+
+ void send_puts(std::initializer_list<std::pair<uint32_t, uint32_t>> bucket_docs) {
+ for (const auto& bd : bucket_docs) {
+ send_put(bd.first, bd.second);
+ }
+ }
+
+ void send_get(uint32_t bucket_idx, uint32_t doc_idx) {
+ auto id = id_of(bucket_idx, doc_idx);
+ auto cmd = std::make_shared<api::GetCommand>(makeDocumentBucket({16, bucket_idx}), id, document::AllFields::NAME);
+ schedule_msg(cmd);
+ }
+
+ void send_remove(uint32_t bucket_idx, uint32_t doc_idx, uint32_t timestamp) {
+ auto id = id_of(bucket_idx, doc_idx);
+ auto cmd = std::make_shared<api::RemoveCommand>(makeDocumentBucket({16, bucket_idx}), id, timestamp);
+ schedule_msg(cmd);
+ }
+
+ void send_remove(uint32_t bucket_idx, uint32_t doc_idx) {
+ send_remove(bucket_idx, doc_idx, next_timestamp());
+ }
+
+ void send_update(uint32_t bucket_idx, uint32_t doc_idx, uint32_t timestamp) {
+ auto id = id_of(bucket_idx, doc_idx);
+ auto update = std::make_shared<document::DocumentUpdate>(
+ _node->getTestDocMan().getTypeRepo(),
+ _node->getTestDocMan().createRandomDocument()->getType(), id);
+ auto cmd = std::make_shared<api::UpdateCommand>(makeDocumentBucket({16, bucket_idx}), std::move(update), timestamp);
+ schedule_msg(cmd);
+ }
+
+ void send_update(uint32_t bucket_idx, uint32_t doc_idx) {
+ send_update(bucket_idx, doc_idx, next_timestamp());
+ }
+
+ [[nodiscard]] api::Timestamp next_timestamp() {
+ auto ret = _next_timestamp;
+ ++_next_timestamp;
+ return ret;
+ }
+
+ [[nodiscard]] vespalib::steady_time fake_now() const {
+ return _node->getClock().getMonotonicTime();
+ }
+
+ [[nodiscard]] vespalib::steady_time fake_deadline() const {
+ return _node->getClock().getMonotonicTime() + 60s;
+ }
+
+ [[nodiscard]] FileStorHandler::LockedMessageBatch next_batch() {
+ return _handler->next_message_batch(0, fake_now(), fake_deadline());
+ }
+
+ template <typename CmdType>
+ static void assert_batch_msg_is(const FileStorHandler::LockedMessageBatch& batch, uint32_t msg_idx,
+ uint32_t expected_bucket_idx, uint32_t expected_doc_idx)
+ {
+ ASSERT_LT(msg_idx, batch.size());
+ auto msg = batch.messages[msg_idx].first;
+ auto* as_cmd = dynamic_cast<const CmdType*>(msg.get());
+ ASSERT_TRUE(as_cmd) << msg->toString() << " does not have the expected type";
+ EXPECT_EQ(as_cmd->getBucketId(), BucketId(16, expected_bucket_idx));
+
+ auto id = as_cmd->getDocumentId();
+ ASSERT_TRUE(id.getScheme().hasNumber());
+ EXPECT_EQ(id.getScheme().getNumber(), expected_bucket_idx) << id;
+ std::string actual_id_part = id.getScheme().getNamespaceSpecific();
+ std::string expected_id_part = std::to_string(expected_doc_idx);
+ EXPECT_EQ(actual_id_part, expected_id_part) << id;
+ }
+
+ static void assert_batch_msg_is_put(const FileStorHandler::LockedMessageBatch& batch, uint32_t msg_idx,
+ uint32_t expected_bucket_idx, uint32_t expected_doc_idx)
+ {
+ assert_batch_msg_is<api::PutCommand>(batch, msg_idx, expected_bucket_idx, expected_doc_idx);
+ }
+
+ static void assert_batch_msg_is_remove(const FileStorHandler::LockedMessageBatch& batch, uint32_t msg_idx,
+ uint32_t expected_bucket_idx, uint32_t expected_doc_idx)
+ {
+ assert_batch_msg_is<api::RemoveCommand>(batch, msg_idx, expected_bucket_idx, expected_doc_idx);
+ }
+
+ static void assert_batch_msg_is_update(const FileStorHandler::LockedMessageBatch& batch, uint32_t msg_idx,
+ uint32_t expected_bucket_idx, uint32_t expected_doc_idx)
+ {
+ assert_batch_msg_is<api::UpdateCommand>(batch, msg_idx, expected_bucket_idx, expected_doc_idx);
+ }
+
+ static void assert_batch_msg_is_get(const FileStorHandler::LockedMessageBatch& batch, uint32_t msg_idx,
+ uint32_t expected_bucket_idx, uint32_t expected_doc_idx)
+ {
+ assert_batch_msg_is<api::GetCommand>(batch, msg_idx, expected_bucket_idx, expected_doc_idx);
+ }
+
+ enum Type {
+ Put,
+ Update,
+ Remove,
+ Get
+ };
+
+ static void assert_empty_batch(const FileStorHandler::LockedMessageBatch& batch) {
+ ASSERT_TRUE(batch.empty());
+ ASSERT_FALSE(batch.lock);
+ }
+
+ static void assert_batch(const FileStorHandler::LockedMessageBatch& batch,
+ uint32_t expected_bucket_idx,
+ std::initializer_list<std::pair<Type, uint32_t>> expected_msgs)
+ {
+ ASSERT_TRUE(batch.lock);
+ ASSERT_EQ(batch.lock->getBucket().getBucketId(), BucketId(16, expected_bucket_idx));
+ ASSERT_EQ(batch.size(), expected_msgs.size());
+
+ uint32_t idx = 0;
+ for (const auto& msg : expected_msgs) {
+ switch (msg.first) {
+ case Type::Put: assert_batch_msg_is_put(batch, idx, expected_bucket_idx, msg.second); break;
+ case Type::Update: assert_batch_msg_is_update(batch, idx, expected_bucket_idx, msg.second); break;
+ case Type::Remove: assert_batch_msg_is_remove(batch, idx, expected_bucket_idx, msg.second); break;
+ case Type::Get: assert_batch_msg_is_get(batch, idx, expected_bucket_idx, msg.second); break;
+ default: FAIL();
+ }
+ ++idx;
+ }
+ }
+};
+
+FeedOperationBatchingTest::FeedOperationBatchingTest()
+ : FileStorTestFixture(),
+ _top(),
+ _message_sender(),
+ _metrics(),
+ _handler(),
+ _next_timestamp(1000)
+{
+}
+
+FeedOperationBatchingTest::~FeedOperationBatchingTest() = default;
+
+// Note: unless explicitly set by the testcase, max batch size is 3
+
+TEST_F(FeedOperationBatchingTest, batching_is_disabled_with_1_max_batch_size) {
+ _handler->set_max_feed_op_batch_size(1);
+ send_puts({{1, 1}, {1, 2}, {2, 3}, {2, 4}});
+ // No batching; has the same behavior as current FIFO
+ assert_batch(next_batch(), 1, {{Put, 1}});
+ assert_batch(next_batch(), 1, {{Put, 2}});
+ assert_batch(next_batch(), 2, {{Put, 3}});
+ assert_batch(next_batch(), 2, {{Put, 4}});
+ assert_empty_batch(next_batch());
+}
+
+TEST_F(FeedOperationBatchingTest, batching_is_limited_to_configured_max_size) {
+ send_puts({{1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}});
+ assert_batch(next_batch(), 1, {{Put, 1}, {Put, 2}, {Put, 3}});
+ assert_batch(next_batch(), 1, {{Put, 4}, {Put, 5}});
+ assert_empty_batch(next_batch());
+}
+
+TEST_F(FeedOperationBatchingTest, batching_can_consume_entire_queue) {
+ send_puts({{1, 1}, {1, 2}, {1, 3}});
+ assert_batch(next_batch(), 1, {{Put, 1}, {Put, 2}, {Put, 3}});
+ assert_empty_batch(next_batch());
+}
+
+TEST_F(FeedOperationBatchingTest, batching_is_only_done_for_single_bucket) {
+ send_puts({{1, 1}, {2, 2}, {2, 3}, {2, 4}, {3, 5}});
+ assert_batch(next_batch(), 1, {{Put, 1}});
+ assert_batch(next_batch(), 2, {{Put, 2}, {Put, 3}, {Put, 4}});
+ assert_batch(next_batch(), 3, {{Put, 5}});
+}
+
+TEST_F(FeedOperationBatchingTest, batch_can_include_all_supported_feed_op_types) {
+ send_put(1, 1);
+ send_remove(1, 2);
+ send_update(1, 3);
+ assert_batch(next_batch(), 1, {{Put, 1}, {Remove, 2}, {Update, 3}});
+}
+
+TEST_F(FeedOperationBatchingTest, timed_out_requests_are_ignored_by_batch) {
+ send_puts({{1, 1}});
+ send_put(1, 2, next_timestamp(), 1s);
+ send_puts({{1, 3}});
+ _node->getClock().addSecondsToTime(2);
+ // Put #2 with 1s timeout has expired in the queue and should not be returned as part of the batch
+ assert_batch(next_batch(), 1, {{Put, 1}, {Put, 3}});
+ ASSERT_EQ(_top.getNumReplies(), 0);
+ // The actual timeout is handled by the next message fetch invocation
+ assert_empty_batch(next_batch());
+ ASSERT_EQ(_top.getNumReplies(), 1);
+ EXPECT_EQ(dynamic_cast<api::StorageReply&>(*_top.getReply(0)).getResult().getResult(), api::ReturnCode::TIMEOUT);
+}
+
+TEST_F(FeedOperationBatchingTest, non_feed_ops_are_not_batched) {
+ send_get(1, 2);
+ send_get(1, 3);
+ assert_batch(next_batch(), 1, {{Get, 2}});
+ assert_batch(next_batch(), 1, {{Get, 3}});
+}
+
+TEST_F(FeedOperationBatchingTest, pipeline_stalled_by_non_feed_op) {
+ // It can reasonably be argued that we could batch _around_ a Get operation and still
+ // have correct behavior, but the Get here is just a stand-in for an arbitrary operation such
+ // as a Split (which changes the bucket set), which is rather more tricky to reason about.
+ // For simplicity and understandability, just stall the batch pipeline (at least for now).
+ send_get(1, 2);
+ send_puts({{1, 3}, {1, 4}});
+ send_get(1, 5);
+ send_puts({{1, 6}, {1, 7}});
+
+ assert_batch(next_batch(), 1, {{Get, 2}}); // If first op is non-feed, only it should be returned
+ assert_batch(next_batch(), 1, {{Put, 3}, {Put, 4}});
+ assert_batch(next_batch(), 1, {{Get, 5}});
+ assert_batch(next_batch(), 1, {{Put, 6}, {Put, 7}});
+}
+
+TEST_F(FeedOperationBatchingTest, pipeline_stalled_by_concurrent_ops_to_same_document) {
+ // 2 ops to doc #2. Since this is expected to be a very rare edge case, just
+ // stop batching at that point and defer the concurrent op to the next batch.
+ send_puts({{1, 1}, {1, 2}, {1, 3}, {1, 2}, {1, 4}});
+ assert_batch(next_batch(), 1, {{Put, 1}, {Put, 2}, {Put, 3}});
+ assert_batch(next_batch(), 1, {{Put, 2}, {Put, 4}});
+}
+
+TEST_F(FeedOperationBatchingTest, batch_respects_persistence_throttling) {
+ vespalib::SharedOperationThrottler::DynamicThrottleParams params;
+ params.min_window_size = 3;
+ params.max_window_size = 3;
+ params.window_size_increment = 1;
+ _handler->use_dynamic_operation_throttling(true);
+ _handler->reconfigure_dynamic_throttler(params);
+ _handler->set_max_feed_op_batch_size(10); // > win size to make sure we test the right thing
+
+ send_puts({{1, 1}, {1, 2}, {1, 3}, {1, 4}, {1, 5}});
+ auto batch = next_batch(); // holds 3 throttle tokens
+ assert_batch(batch, 1, {{Put, 1}, {Put, 2}, {Put, 3}});
+ // No more throttle tokens available
+ assert_empty_batch(next_batch());
+}
+
+} // storage
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index 1ccd51d3f06..f12b85eb2ea 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -43,6 +43,7 @@ LOG_SETUP(".filestormanagertest");
using std::unique_ptr;
using document::Document;
+using document::BucketId;
using namespace storage::api;
using storage::spi::test::makeSpiBucket;
using document::test::makeDocumentBucket;
@@ -405,6 +406,38 @@ TEST_F(FileStorManagerTest, put) {
}
}
+TEST_F(FileStorManagerTest, feed_op_batch_updates_bucket_db_and_reply_bucket_info) {
+ PersistenceHandlerComponents c(*this);
+ c.filestorHandler->set_max_feed_op_batch_size(10);
+ BucketId bucket_id(16, 1);
+ createBucket(bucket_id);
+ constexpr uint32_t n = 10;
+ // No persistence thread started yet, so no chance of racing
+ for (uint32_t i = 0; i < n; ++i) {
+ auto put = make_put_command(120, vespalib::make_string("id:foo:testdoctype1:n=1:%u", i), Timestamp(1000) + i);
+ put->setAddress(_storage3);
+ c.filestorHandler->schedule(put);
+ }
+ auto pt = c.make_disk_thread();
+ c.filestorHandler->flush(true);
+ c.top.waitForMessages(n, _waitTime);
+ c.executor.sync_all(); // Ensure all async reply processing tasks have completed.
+ api::BucketInfo expected_bucket_info;
+ {
+ StorBucketDatabase::WrappedEntry entry(_node->getStorageBucketDatabase().get(bucket_id, "foo"));
+ ASSERT_TRUE(entry.exists());
+ EXPECT_EQ(entry->getBucketInfo().getDocumentCount(), n);
+ expected_bucket_info = entry->getBucketInfo();
+ }
+ // All replies should have the _same_ bucket info due to being processed in the same batch.
+ auto replies = c.top.getRepliesOnce();
+ for (auto& reply : replies) {
+ auto actual_bucket_info = dynamic_cast<api::PutReply&>(*reply).getBucketInfo();
+ EXPECT_EQ(actual_bucket_info, expected_bucket_info);
+ }
+ c.filestorHandler->close(); // Ensure persistence thread is no longer in message fetch code
+}
+
TEST_F(FileStorManagerTest, running_task_against_unknown_bucket_fails) {
TestFileStorComponents c(*this);
@@ -726,7 +759,7 @@ TEST_F(FileStorManagerTest, handler_timeout) {
filestorHandler.schedule(cmd);
}
- std::this_thread::sleep_for(51ms);
+ _node->getClock().addMilliSecondsToTime(51);
for (;;) {
auto lock = filestorHandler.getNextMessage(stripeId);
if (lock.lock.get()) {
diff --git a/storage/src/tests/persistence/filestorage/gtest_runner.cpp b/storage/src/tests/persistence/filestorage/gtest_runner.cpp
index 5d1fde4130c..1ed7bc91843 100644
--- a/storage/src/tests/persistence/filestorage/gtest_runner.cpp
+++ b/storage/src/tests/persistence/filestorage/gtest_runner.cpp
@@ -1,8 +1,17 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/gtest/gtest.h>
+#include <absl/debugging/failure_signal_handler.h>
#include <vespa/log/log.h>
LOG_SETUP("storage_filestorage_gtest_runner");
-GTEST_MAIN_RUN_ALL_TESTS()
+int main(int argc, char* argv[]) {
+ absl::FailureSignalHandlerOptions opts;
+ opts.call_previous_handler = true;
+ opts.use_alternate_stack = false; // Suboptimal, but needed to get proper backtracing (for some reason...)
+ absl::InstallFailureSignalHandler(opts);
+
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index 59e0853cc21..ab176ebb9cb 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -143,7 +143,7 @@ AsyncHandler::AsyncHandler(const PersistenceUtil & env, spi::PersistenceProvider
MessageTracker::UP
AsyncHandler::handleRunTask(RunTaskCommand& cmd, MessageTracker::UP tracker) const {
auto task = makeResultTask([tracker = std::move(tracker)](spi::Result::UP response) {
- tracker->checkForError(*response);
+ (void)tracker->checkForError(*response);
tracker->sendReply();
});
spi::Bucket bucket(cmd.getBucket());
@@ -169,7 +169,7 @@ AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) cons
spi::Bucket bucket = _env.getBucket(cmd.getDocumentId(), cmd.getBucket());
auto task = makeResultTask([tracker = std::move(trackerUP)](spi::Result::UP response) {
- tracker->checkForError(*response);
+ (void)tracker->checkForError(*response);
tracker->sendReply();
});
_spi.putAsync(bucket, spi::Timestamp(cmd.getTimestamp()), cmd.getDocument(),
@@ -517,7 +517,7 @@ AsyncHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTrack
}
auto task = makeResultTask([&cmd, tracker = std::move(tracker), removed = to_remove.size()](spi::Result::UP response) {
- tracker->checkForError(*response);
+ (void)tracker->checkForError(*response);
tracker->setReply(std::make_shared<api::RemoveLocationReply>(cmd, removed));
tracker->sendReply();
});
diff --git a/storage/src/vespa/storage/persistence/batched_message.h b/storage/src/vespa/storage/persistence/batched_message.h
new file mode 100644
index 00000000000..c23383edfde
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/batched_message.h
@@ -0,0 +1,14 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "shared_operation_throttler.h"
+#include <memory>
+#include <utility>
+
+namespace storage {
+
+namespace api { class StorageMessage; }
+
+using BatchedMessage = std::pair<std::shared_ptr<api::StorageMessage>, ThrottleToken>;
+
+}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
index a89b705de1b..274d59899d9 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
@@ -5,4 +5,15 @@ namespace storage {
FileStorHandler::LockedMessage::~LockedMessage() = default;
+FileStorHandler::LockedMessageBatch::LockedMessageBatch(LockedMessage&& initial_msg)
+ : lock(std::move(initial_msg.lock)),
+ messages()
+{
+ if (lock) {
+ messages.emplace_back(std::move(initial_msg.msg), std::move(initial_msg.throttle_token));
+ }
+}
+
+FileStorHandler::LockedMessageBatch::~LockedMessageBatch() = default;
+
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index 68b8411c762..6a8b74baf1d 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -1,23 +1,13 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-/**
- * \class storage::FileStorHandler
- * \ingroup storage
- *
- * \brief Common resource for filestor threads
- *
- * Takes care of the interface between file stor threads and the file stor
- * manager to avoid circular dependencies, and confine the implementation that
- * needs to worry about locking between these components.
- */
-
#pragma once
#include <vespa/document/bucket/bucket.h>
#include <vespa/storage/storageutil/resumeguard.h>
#include <vespa/storage/common/messagesender.h>
+#include <vespa/storage/persistence/batched_message.h>
#include <vespa/storage/persistence/shared_operation_throttler.h>
#include <vespa/storageapi/messageapi/storagemessage.h>
+#include <vespa/vespalib/util/small_vector.h>
namespace storage {
namespace api {
@@ -80,13 +70,7 @@ public:
std::shared_ptr<api::StorageMessage> msg;
ThrottleToken throttle_token;
- LockedMessage() noexcept = default;
- LockedMessage(std::shared_ptr<BucketLockInterface> lock_,
- std::shared_ptr<api::StorageMessage> msg_) noexcept
- : lock(std::move(lock_)),
- msg(std::move(msg_)),
- throttle_token()
- {}
+ constexpr LockedMessage() noexcept = default;
LockedMessage(std::shared_ptr<BucketLockInterface> lock_,
std::shared_ptr<api::StorageMessage> msg_,
ThrottleToken token) noexcept
@@ -98,27 +82,40 @@ public:
~LockedMessage();
};
+ struct LockedMessageBatch {
+ std::shared_ptr<BucketLockInterface> lock;
+ vespalib::SmallVector<BatchedMessage, 1> messages;
+
+ LockedMessageBatch() = default;
+ explicit LockedMessageBatch(LockedMessage&& initial_msg);
+ LockedMessageBatch(LockedMessageBatch&&) noexcept = default;
+ ~LockedMessageBatch();
+
+ [[nodiscard]] bool empty() const noexcept { return messages.empty(); }
+ [[nodiscard]] size_t size() const noexcept { return messages.size(); }
+ // Precondition: messages.size() == 1
+ [[nodiscard]] LockedMessage release_as_single_msg() noexcept;
+ };
+
class ScheduleAsyncResult {
- private:
- bool _was_scheduled;
+ bool _was_scheduled;
LockedMessage _async_message;
-
public:
- ScheduleAsyncResult() : _was_scheduled(false), _async_message() {}
- explicit ScheduleAsyncResult(LockedMessage&& async_message_in)
+ constexpr ScheduleAsyncResult() noexcept : _was_scheduled(false), _async_message() {}
+ explicit ScheduleAsyncResult(LockedMessage&& async_message_in) noexcept
: _was_scheduled(true),
_async_message(std::move(async_message_in))
{}
- bool was_scheduled() const {
+ [[nodiscard]] bool was_scheduled() const noexcept {
return _was_scheduled;
}
- bool has_async_message() const {
- return _async_message.lock.get() != nullptr;
+ [[nodiscard]] bool has_async_message() const noexcept {
+ return static_cast<bool>(_async_message.lock);
}
- const LockedMessage& async_message() const {
+ [[nodiscard]] const LockedMessage& async_message() const noexcept {
return _async_message;
}
- LockedMessage&& release_async_message() {
+ [[nodiscard]] LockedMessage&& release_async_message() noexcept {
return std::move(_async_message);
}
};
@@ -129,7 +126,7 @@ public:
};
FileStorHandler() : _getNextMessageTimout(100ms) { }
- virtual ~FileStorHandler() = default;
+ ~FileStorHandler() override = default;
/**
@@ -171,7 +168,9 @@ public:
*
* @param stripe The stripe to get messages for
*/
- virtual LockedMessage getNextMessage(uint32_t stripeId, vespalib::steady_time deadline) = 0;
+ [[nodiscard]] virtual LockedMessage getNextMessage(uint32_t stripeId, vespalib::steady_time deadline) = 0;
+
+ [[nodiscard]] virtual LockedMessageBatch next_message_batch(uint32_t stripe, vespalib::steady_time now, vespalib::steady_time deadline) = 0;
/** Only used for testing, should be removed */
LockedMessage getNextMessage(uint32_t stripeId) {
@@ -189,8 +188,6 @@ public:
* NB: As current operation can be a split or join operation, make sure that
* you always wait for current to finish, if is a super or sub bucket of
* the bucket we're locking.
- *
- *
*/
virtual BucketLockInterface::SP lock(const document::Bucket&, api::LockingRequirements lockReq) = 0;
@@ -287,6 +284,8 @@ public:
virtual void use_dynamic_operation_throttling(bool use_dynamic) noexcept = 0;
virtual void set_throttle_apply_bucket_diff_ops(bool throttle_apply_bucket_diff) noexcept = 0;
+
+ virtual void set_max_feed_op_batch_size(uint32_t max_batch) noexcept = 0;
private:
vespalib::duration _getNextMessageTimout;
};
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index 1984d44652a..7589fb3cdb3 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -12,6 +12,7 @@
#include <vespa/storage/persistence/messages.h>
#include <vespa/storageapi/message/stat.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <vespa/vespalib/stllike/hash_set.hpp>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/string_escape.h>
@@ -60,7 +61,8 @@ FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripe
_max_active_merges_per_stripe(per_stripe_merge_limit(numThreads, numStripes)),
_paused(false),
_throttle_apply_bucket_diff_ops(false),
- _last_active_operations_stats()
+ _last_active_operations_stats(),
+ _max_feed_op_batch_size(1)
{
assert(numStripes > 0);
_stripes.reserve(numStripes);
@@ -241,8 +243,7 @@ FileStorHandlerImpl::schedule(const std::shared_ptr<api::StorageMessage>& msg)
{
if (getState() == FileStorHandler::AVAILABLE) {
document::Bucket bucket = getStorageMessageBucket(*msg);
- stripe(bucket).schedule(MessageEntry(msg, bucket));
- return true;
+ return stripe(bucket).schedule(MessageEntry(msg, bucket, _component.getClock().getMonotonicTime()));
}
return false;
}
@@ -252,7 +253,7 @@ FileStorHandlerImpl::schedule_and_get_next_async_message(const std::shared_ptr<a
{
if (getState() == FileStorHandler::AVAILABLE) {
document::Bucket bucket = getStorageMessageBucket(*msg);
- return ScheduleAsyncResult(stripe(bucket).schedule_and_get_next_async_message(MessageEntry(msg, bucket)));
+ return ScheduleAsyncResult(stripe(bucket).schedule_and_get_next_async_message(MessageEntry(msg, bucket, _component.getClock().getMonotonicTime())));
}
return {};
}
@@ -403,10 +404,25 @@ FileStorHandlerImpl::getNextMessage(uint32_t stripeId, vespalib::steady_time dea
if (!tryHandlePause()) {
return {}; // Still paused, return to allow tick.
}
-
return _stripes[stripeId].getNextMessage(deadline);
}
+FileStorHandler::LockedMessageBatch
+FileStorHandlerImpl::next_message_batch(uint32_t stripe_id, vespalib::steady_time now, vespalib::steady_time deadline)
+{
+ if (!tryHandlePause()) {
+ return {};
+ }
+ return _stripes[stripe_id].next_message_batch(now, deadline);
+}
+
+FileStorHandler::LockedMessage
+FileStorHandlerImpl::LockedMessageBatch::release_as_single_msg() noexcept
+{
+ assert(lock && messages.size() == 1);
+ return {std::move(lock), std::move(messages[0].first), std::move(messages[0].second)};
+}
+
std::shared_ptr<FileStorHandler::BucketLockInterface>
FileStorHandlerImpl::Stripe::lock(const document::Bucket &bucket, api::LockingRequirements lockReq) {
std::unique_lock guard(*_lock);
@@ -858,9 +874,10 @@ FileStorHandlerImpl::sendReplyDirectly(const std::shared_ptr<api::StorageReply>&
}
FileStorHandlerImpl::MessageEntry::MessageEntry(const std::shared_ptr<api::StorageMessage>& cmd,
- const document::Bucket &bucket)
+ const document::Bucket& bucket,
+ vespalib::steady_time scheduled_at_time)
: _command(cmd),
- _timer(),
+ _timer(scheduled_at_time),
_bucket(bucket),
_priority(cmd->getPriority())
{ }
@@ -939,9 +956,8 @@ FileStorHandlerImpl::Stripe::operation_type_should_be_throttled(api::MessageType
}
FileStorHandler::LockedMessage
-FileStorHandlerImpl::Stripe::getNextMessage(vespalib::steady_time deadline)
+FileStorHandlerImpl::Stripe::next_message_impl(monitor_guard& guard, vespalib::steady_time deadline)
{
- std::unique_lock guard(*_lock);
ThrottleToken throttle_token;
// Try to grab a message+lock, immediately retrying once after a wait
// if none can be found and then exiting if the same is the case on the
@@ -993,6 +1009,93 @@ FileStorHandlerImpl::Stripe::getNextMessage(vespalib::steady_time deadline)
}
FileStorHandler::LockedMessage
+FileStorHandlerImpl::Stripe::getNextMessage(vespalib::steady_time deadline)
+{
+ std::unique_lock guard(*_lock);
+ return next_message_impl(guard, deadline);
+}
+
+namespace {
+
+constexpr bool is_batchable_feed_op(api::MessageType::Id id) noexcept {
+ return (id == api::MessageType::PUT_ID ||
+ id == api::MessageType::REMOVE_ID ||
+ id == api::MessageType::UPDATE_ID);
+}
+
+// Precondition: msg must be a feed operation request (put, remove, update)
+document::GlobalId gid_from_feed_op(const api::StorageMessage& msg) {
+ switch (msg.getType().getId()) {
+ case api::MessageType::PUT_ID:
+ return static_cast<const api::PutCommand&>(msg).getDocumentId().getGlobalId();
+ case api::MessageType::REMOVE_ID:
+ return static_cast<const api::RemoveCommand&>(msg).getDocumentId().getGlobalId();
+ case api::MessageType::UPDATE_ID:
+ return static_cast<const api::UpdateCommand&>(msg).getDocumentId().getGlobalId();
+ default: abort();
+ }
+}
+
+} // anon ns
+
+FileStorHandler::LockedMessageBatch
+FileStorHandlerImpl::Stripe::next_message_batch(vespalib::steady_time now, vespalib::steady_time deadline)
+{
+ const auto max_batch_size = _owner.max_feed_op_batch_size();
+
+ std::unique_lock guard(*_lock);
+ auto initial_locked = next_message_impl(guard, deadline);
+ if (!initial_locked.lock || !is_batchable_feed_op(initial_locked.msg->getType().getId()) || (max_batch_size == 1)) {
+ return LockedMessageBatch(std::move(initial_locked));
+ }
+ LockedMessageBatch batch(std::move(initial_locked));
+ fill_feed_op_batch(guard, batch, max_batch_size, now);
+ return batch;
+}
+
+void
+FileStorHandlerImpl::Stripe::fill_feed_op_batch(monitor_guard& guard, LockedMessageBatch& batch,
+ uint32_t max_batch_size, vespalib::steady_time now)
+{
+ assert(batch.size() == 1);
+ assert(guard.owns_lock());
+ BucketIdx& idx = bmi::get<2>(*_queue);
+ auto bucket_msgs = idx.equal_range(batch.lock->getBucket());
+ // Process in FIFO order (_not_ priority order) until we hit the end, a non-batchable operation
+ // (implicit pipeline stall since bucket set might change) or can't get another throttle token.
+ // We also stall the pipeline if we get a concurrent modification to the same document (not expected,
+ // as the distributors should prevent this, but _technically_ it is possible).
+ const auto expected_max_size = std::min(ssize_t(max_batch_size), std::distance(bucket_msgs.first, bucket_msgs.second) + 1);
+ vespalib::hash_set<document::GlobalId, document::GlobalId::hash> gids_in_batch(expected_max_size);
+ gids_in_batch.insert(gid_from_feed_op(*batch.messages[0].first));
+ for (auto it = bucket_msgs.first; (it != bucket_msgs.second) && (batch.messages.size() < max_batch_size);) {
+ if (!is_batchable_feed_op(it->_command->getType().getId())) {
+ break;
+ }
+ auto [existing_iter, inserted] = gids_in_batch.insert(gid_from_feed_op(*it->_command));
+ if (!inserted) {
+ break; // Already present in batch
+ }
+ if (messageTimedOutInQueue(*it->_command, now - it->_timer.start_time())) {
+ // We just ignore timed out ops here; actually generating a timeout reply will be done by
+ // next_message_impl() during a subsequent invocation. This avoids having to deal with any
+ // potential issues caused by sending a reply up while holding the queue lock, since we
+ // can't release it here.
+ ++it;
+ continue;
+ }
+ auto throttle_token = _owner.operation_throttler().try_acquire_one();
+ if (!throttle_token.valid()) {
+ break;
+ }
+ // Note: iterator is const; can't std::move(it->_command)
+ batch.messages.emplace_back(it->_command, std::move(throttle_token));
+ it = idx.erase(it);
+ }
+ update_cached_queue_size(guard);
+}
+
+FileStorHandler::LockedMessage
FileStorHandlerImpl::Stripe::get_next_async_message(monitor_guard& guard)
{
if (_owner.isPaused()) {
@@ -1021,9 +1124,11 @@ FileStorHandler::LockedMessage
FileStorHandlerImpl::Stripe::getMessage(monitor_guard & guard, PriorityIdx & idx, PriorityIdx::iterator iter,
ThrottleToken throttle_token)
{
- std::chrono::milliseconds waitTime(uint64_t(iter->_timer.stop(_metrics->averageQueueWaitingTime)));
+ std::chrono::milliseconds waitTime(uint64_t(iter->_timer.stop(
+ _owner._component.getClock().getMonotonicTime(),
+ _metrics->averageQueueWaitingTime)));
- std::shared_ptr<api::StorageMessage> msg = std::move(iter->_command);
+ std::shared_ptr<api::StorageMessage> msg = iter->_command; // iter is const; can't std::move()
document::Bucket bucket(iter->_bucket);
idx.erase(iter); // iter not used after this point.
update_cached_queue_size(guard);
@@ -1032,7 +1137,6 @@ FileStorHandlerImpl::Stripe::getMessage(monitor_guard & guard, PriorityIdx & idx
auto locker = std::make_unique<BucketLock>(guard, *this, bucket, msg->getPriority(),
msg->getType().getId(), msg->getMsgId(),
msg->lockingRequirements());
- guard.unlock();
return {std::move(locker), std::move(msg), std::move(throttle_token)};
} else {
std::shared_ptr<api::StorageReply> msgReply(makeQueueTimeoutReply(*msg));
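
The gids_in_batch check in fill_feed_op_batch() above is what stalls the pipeline on concurrent ops to the same document. A self-contained sketch of that dedup-and-stop behavior (std::unordered_set stands in for vespalib::hash_set, and document-id strings stand in for GIDs):

#include <cstdio>
#include <string>
#include <unordered_set>
#include <vector>

// Illustrative sketch of the gids_in_batch logic: batching stops at the first
// repeated document id, deferring the duplicate op to a later batch.
int main() {
    const std::vector<std::string> queued = {"doc1", "doc2", "doc3", "doc2", "doc4"};
    std::unordered_set<std::string> gids_in_batch;
    std::vector<std::string> batch;
    for (const auto& id : queued) {
        if (!gids_in_batch.insert(id).second) {
            break; // concurrent op to the same document: stall the pipeline here
        }
        batch.push_back(id);
    }
    for (const auto& id : batch) {
        std::printf("batched: %s\n", id.c_str());
    }
    // Prints doc1, doc2, doc3; the second doc2 (and doc4 queued behind it) are
    // left for the next batch, matching the expectations in the
    // pipeline_stalled_by_concurrent_ops_to_same_document test.
    return 0;
}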
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index f7c9b218779..ac8e5c52cbf 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -55,7 +55,9 @@ public:
document::Bucket _bucket;
uint8_t _priority;
- MessageEntry(const std::shared_ptr<api::StorageMessage>& cmd, const document::Bucket &bId);
+ MessageEntry(const std::shared_ptr<api::StorageMessage>& cmd,
+ const document::Bucket& bucket,
+ vespalib::steady_time scheduled_at_time);
MessageEntry(MessageEntry &&) noexcept ;
MessageEntry(const MessageEntry &) noexcept;
MessageEntry & operator = (const MessageEntry &) = delete;
@@ -66,13 +68,13 @@ public:
}
};
- using PriorityOrder = bmi::ordered_non_unique<bmi::identity<MessageEntry> >;
- using BucketOrder = bmi::ordered_non_unique<bmi::member<MessageEntry, document::Bucket, &MessageEntry::_bucket>>;
-
+ // ordered_non_unique shall preserve insertion order as iteration order of equal keys, but this is rather magical...
+ using PriorityOrder = bmi::ordered_non_unique<bmi::identity<MessageEntry>>;
+ using BucketOrder = bmi::ordered_non_unique<bmi::member<MessageEntry, document::Bucket, &MessageEntry::_bucket>>;
using PriorityQueue = bmi::multi_index_container<MessageEntry, bmi::indexed_by<bmi::sequenced<>, PriorityOrder, BucketOrder>>;
+ using PriorityIdx = bmi::nth_index<PriorityQueue, 1>::type;
+ using BucketIdx = bmi::nth_index<PriorityQueue, 2>::type;
- using PriorityIdx = bmi::nth_index<PriorityQueue, 1>::type;
- using BucketIdx = bmi::nth_index<PriorityQueue, 2>::type;
using Clock = std::chrono::steady_clock;
using monitor_guard = std::unique_lock<std::mutex>;
using atomic_size_t = vespalib::datastore::AtomicValueWrapper<size_t>;
@@ -114,7 +116,7 @@ public:
ActiveOperationsStats &stats() { return _stats; }
};
SafeActiveOperationsStats() : _lock(std::make_unique<std::mutex>()), _stats() {}
- Guard guard() { return Guard(*_lock, _stats, ctor_tag()); }
+ [[nodiscard]] Guard guard() { return Guard(*_lock, _stats, ctor_tag()); }
};
Stripe(const FileStorHandlerImpl & owner, MessageSender & messageSender);
@@ -123,8 +125,8 @@ public:
Stripe & operator =(const Stripe &) = delete;
~Stripe();
void flush();
- bool schedule(MessageEntry messageEntry);
- FileStorHandler::LockedMessage schedule_and_get_next_async_message(MessageEntry entry);
+ [[nodiscard]] bool schedule(MessageEntry messageEntry);
+ [[nodiscard]] FileStorHandler::LockedMessage schedule_and_get_next_async_message(MessageEntry entry);
void waitUntilNoLocks() const;
void abort(std::vector<std::shared_ptr<api::StorageReply>> & aborted, const AbortBucketOperationsCommand& cmd);
void waitInactive(const AbortBucketOperationsCommand& cmd) const;
@@ -151,18 +153,19 @@ public:
api::LockingRequirements lockReq, bool count_as_active_merge,
const LockEntry & lockEntry);
- std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket, api::LockingRequirements lockReq);
+ [[nodiscard]] std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket, api::LockingRequirements lockReq);
void failOperations(const document::Bucket & bucket, const api::ReturnCode & code);
- FileStorHandler::LockedMessage getNextMessage(vespalib::steady_time deadline);
+ [[nodiscard]] FileStorHandler::LockedMessage getNextMessage(vespalib::steady_time deadline);
+ [[nodiscard]] FileStorHandler::LockedMessageBatch next_message_batch(vespalib::steady_time now, vespalib::steady_time deadline);
void dumpQueue(std::ostream & os) const;
void dumpActiveHtml(std::ostream & os) const;
void dumpQueueHtml(std::ostream & os) const;
- std::mutex & exposeLock() { return *_lock; }
- PriorityQueue & exposeQueue() { return *_queue; }
- BucketIdx & exposeBucketIdx() { return bmi::get<2>(*_queue); }
+ [[nodiscard]] std::mutex & exposeLock() { return *_lock; }
+ [[nodiscard]] PriorityQueue & exposeQueue() { return *_queue; }
+ [[nodiscard]] BucketIdx & exposeBucketIdx() { return bmi::get<2>(*_queue); }
void setMetrics(FileStorStripeMetrics * metrics) { _metrics = metrics; }
- ActiveOperationsStats get_active_operations_stats(bool reset_min_max) const;
+ [[nodiscard]] ActiveOperationsStats get_active_operations_stats(bool reset_min_max) const;
private:
void update_cached_queue_size(const std::lock_guard<std::mutex> &) {
_cached_queue_size.store_relaxed(_queue->size());
@@ -170,15 +173,20 @@ public:
void update_cached_queue_size(const std::unique_lock<std::mutex> &) {
_cached_queue_size.store_relaxed(_queue->size());
}
- bool hasActive(monitor_guard & monitor, const AbortBucketOperationsCommand& cmd) const;
- FileStorHandler::LockedMessage get_next_async_message(monitor_guard& guard);
+ [[nodiscard]] bool hasActive(monitor_guard & monitor, const AbortBucketOperationsCommand& cmd) const;
+ [[nodiscard]] FileStorHandler::LockedMessage get_next_async_message(monitor_guard& guard);
[[nodiscard]] bool operation_type_should_be_throttled(api::MessageType::Id type_id) const noexcept;
+ [[nodiscard]] FileStorHandler::LockedMessage next_message_impl(monitor_guard& held_lock,
+ vespalib::steady_time deadline);
+ void fill_feed_op_batch(monitor_guard& held_lock, LockedMessageBatch& batch,
+ uint32_t max_batch_size, vespalib::steady_time now);
+
// Precondition: the bucket used by `iter`s operation is not locked in a way that conflicts
// with its locking requirements.
- FileStorHandler::LockedMessage getMessage(monitor_guard & guard, PriorityIdx & idx,
- PriorityIdx::iterator iter,
- ThrottleToken throttle_token);
+ [[nodiscard]] FileStorHandler::LockedMessage getMessage(monitor_guard & guard, PriorityIdx & idx,
+ PriorityIdx::iterator iter,
+ ThrottleToken throttle_token);
using LockedBuckets = vespalib::hash_map<document::Bucket, MultiLockEntry, document::Bucket::hash>;
const FileStorHandlerImpl &_owner;
MessageSender &_messageSender;
@@ -233,7 +241,8 @@ public:
bool schedule(const std::shared_ptr<api::StorageMessage>&) override;
ScheduleAsyncResult schedule_and_get_next_async_message(const std::shared_ptr<api::StorageMessage>& msg) override;
- FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId, vespalib::steady_time deadline) override;
+ LockedMessage getNextMessage(uint32_t stripeId, vespalib::steady_time deadline) override;
+ LockedMessageBatch next_message_batch(uint32_t stripe, vespalib::steady_time now, vespalib::steady_time deadline) override;
void remapQueueAfterJoin(const RemapInfo& source, RemapInfo& target) override;
void remapQueueAfterSplit(const RemapInfo& source, RemapInfo& target1, RemapInfo& target2) override;
@@ -292,6 +301,13 @@ public:
_throttle_apply_bucket_diff_ops.store(throttle_apply_bucket_diff, std::memory_order_relaxed);
}
+ void set_max_feed_op_batch_size(uint32_t max_batch) noexcept override {
+ _max_feed_op_batch_size.store(max_batch, std::memory_order_relaxed);
+ }
+ [[nodiscard]] uint32_t max_feed_op_batch_size() const noexcept {
+ return _max_feed_op_batch_size.load(std::memory_order_relaxed);
+ }
+
// Implements ResumeGuard::Callback
void resume() override;
@@ -316,6 +332,7 @@ private:
std::atomic<bool> _paused;
std::atomic<bool> _throttle_apply_bucket_diff_ops;
std::optional<ActiveOperationsStats> _last_active_operations_stats;
+ std::atomic<uint32_t> _max_feed_op_batch_size;
// Returns the index in the targets array we are sending to, or -1 if none of them match.
int calculateTargetBasedOnDocId(const api::StorageMessage& msg, std::vector<RemapInfo*>& targets);
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 0046bd96b65..23de39f7130 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -239,6 +239,7 @@ FileStorManager::on_configure(const StorFilestorConfig& config)
auto updated_dyn_throttle_params = dynamic_throttle_params_from_config(config, _threads.size());
_filestorHandler->reconfigure_dynamic_throttler(updated_dyn_throttle_params);
}
+ _filestorHandler->set_max_feed_op_batch_size(std::max(1, config.maxFeedOpBatchSize));
// TODO remove once desired throttling behavior is set in stone
{
_filestorHandler->use_dynamic_operation_throttling(use_dynamic_throttling);
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index 0612798b43a..a01a4656d01 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -134,7 +134,7 @@ PersistenceHandler::processMessage(api::StorageMessage& msg, MessageTracker::UP
_env._metrics.operations.inc();
if (msg.getType().isReply()) {
- try{
+ try {
LOG(debug, "Handling reply: %s", msg.toString().c_str());
LOG(spam, "Message content: %s", msg.toString(true).c_str());
return handleReply(static_cast<api::StorageReply&>(msg), std::move(tracker));
@@ -161,9 +161,7 @@ PersistenceHandler::processMessage(api::StorageMessage& msg, MessageTracker::UP
void
PersistenceHandler::processLockedMessage(FileStorHandler::LockedMessage lock) const {
- LOG(debug, "NodeIndex %d, ptr=%p", _env._nodeIndex, lock.msg.get());
api::StorageMessage & msg(*lock.msg);
-
// Important: we _copy_ the message shared_ptr instead of moving to ensure that `msg` remains
// valid even if the tracker is destroyed by an exception in processMessage().
auto tracker = std::make_unique<MessageTracker>(framework::MilliSecTimer(_clock), _env, _env._fileStorHandler,
@@ -174,4 +172,24 @@ PersistenceHandler::processLockedMessage(FileStorHandler::LockedMessage lock) co
}
}
+void
+PersistenceHandler::process_locked_message_batch(std::shared_ptr<FileStorHandler::BucketLockInterface> lock,
+ std::span<BatchedMessage> bucket_messages)
+{
+ const auto bucket = lock->getBucket();
+ auto batch = std::make_shared<AsyncMessageBatch>(std::move(lock), _env, _env._fileStorHandler);
+ for (auto& bm : bucket_messages) {
+ assert(bm.first->getBucket() == bucket);
+ // Important: we _copy_ the message shared_ptr instead of moving to ensure that `*bm.first` remains
+ // valid even if the tracker is destroyed by an exception in processMessage(). All std::exceptions
+ // are caught there, so we do not expect our loop to be interrupted.
+ auto tracker = std::make_unique<MessageTracker>(framework::MilliSecTimer(_clock), _env, batch,
+ batch->deferred_sender_stub(), bm.first, std::move(bm.second));
+ tracker = processMessage(*bm.first, std::move(tracker));
+ if (tracker) {
+ tracker->sendReply(); // Actually defers to batch reply queue
+ }
+ }
+}
+
}
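
process_locked_message_batch() hands every tracker a shared pointer to the same AsyncMessageBatch, whose destructor (see the persistenceutil.cpp hunk below) refreshes the bucket info once and flushes all deferred replies. A self-contained sketch of that shared-ownership flush pattern (types are illustrative, not from the patch):

#include <cstdio>
#include <memory>
#include <string>
#include <vector>

// Illustrative sketch of the deferred-reply pattern: replies are queued while
// the batch is alive and flushed together, with a single bucket info refresh,
// when the last shared reference to the batch object is released.
struct BatchFlusher {
    std::vector<std::string> deferred_replies;
    ~BatchFlusher() {
        std::printf("refresh bucket info once, then send %zu replies\n", deferred_replies.size());
        for (const auto& r : deferred_replies) {
            std::printf("  reply: %s\n", r.c_str());
        }
    }
};

int main() {
    auto batch = std::make_shared<BatchFlusher>();
    batch->deferred_replies.push_back("put #1 OK");
    batch->deferred_replies.push_back("put #2 OK");
    batch.reset(); // last reference gone: destructor flushes the whole batch
    return 0;
}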
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.h b/storage/src/vespa/storage/persistence/persistencehandler.h
index 0da518a1cfa..224bb70a16b 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.h
+++ b/storage/src/vespa/storage/persistence/persistencehandler.h
@@ -2,14 +2,16 @@
#pragma once
-#include "processallhandler.h"
-#include "mergehandler.h"
#include "asynchandler.h"
+#include "batched_message.h"
+#include "mergehandler.h"
#include "persistenceutil.h"
-#include "splitjoinhandler.h"
+#include "processallhandler.h"
#include "simplemessagehandler.h"
+#include "splitjoinhandler.h"
#include <vespa/storage/common/storagecomponent.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
+#include <span>
namespace storage {
@@ -29,6 +31,8 @@ public:
~PersistenceHandler();
void processLockedMessage(FileStorHandler::LockedMessage lock) const;
+ void process_locked_message_batch(std::shared_ptr<FileStorHandler::BucketLockInterface> lock,
+ std::span<BatchedMessage> bucket_messages);
//TODO Rewrite tests to avoid this api leak
const AsyncHandler & asyncHandler() const { return _asyncHandler; }
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index a98418281d2..5710c3fe26f 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -14,6 +14,7 @@ PersistenceThread::PersistenceThread(PersistenceHandler & persistenceHandler, Fi
uint32_t stripeId, framework::Component & component)
: _persistenceHandler(persistenceHandler),
_fileStorHandler(fileStorHandler),
+ _clock(component.getClock()),
_stripeId(stripeId),
_thread()
{
@@ -36,14 +37,19 @@ PersistenceThread::run(framework::ThreadHandle& thread)
vespalib::duration max_wait_time = vespalib::adjustTimeoutByDetectedHz(100ms);
while (!thread.interrupted()) {
- vespalib::steady_time now = vespalib::steady_clock::now();
+ vespalib::steady_time now = _clock.getMonotonicTime();
thread.registerTick(framework::UNKNOWN_CYCLE, now);
vespalib::steady_time deadline = now + max_wait_time;
- FileStorHandler::LockedMessage lock(_fileStorHandler.getNextMessage(_stripeId, deadline));
-
- if (lock.lock) {
- _persistenceHandler.processLockedMessage(std::move(lock));
+ auto batch = _fileStorHandler.next_message_batch(_stripeId, now, deadline);
+ if (!batch.empty()) {
+ // Special-case single message batches, as actually scheduling a full batch has more
+ // overhead due to extra bookkeeping state and deferred reply-sending.
+ if (batch.size() == 1) {
+ _persistenceHandler.processLockedMessage(batch.release_as_single_msg());
+ } else {
+ _persistenceHandler.process_locked_message_batch(std::move(batch.lock), batch.messages);
+ }
}
}
LOG(debug, "Closing down persistence thread");
diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h
index aacd1dd4830..2e9852ada73 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.h
+++ b/storage/src/vespa/storage/persistence/persistencethread.h
@@ -8,6 +8,7 @@
namespace storage {
namespace framework {
+ struct Clock;
class Component;
class Thread;
}
@@ -27,10 +28,11 @@ public:
framework::Thread& getThread() override { return *_thread; }
private:
- PersistenceHandler & _persistenceHandler;
- FileStorHandler & _fileStorHandler;
- uint32_t _stripeId;
- std::unique_ptr<framework::Thread> _thread;
+ PersistenceHandler& _persistenceHandler;
+ FileStorHandler& _fileStorHandler;
+ const framework::Clock& _clock;
+ uint32_t _stripeId;
+ std::unique_ptr<framework::Thread> _thread;
void run(framework::ThreadHandle&) override;
};
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp
index c975721c855..febc494aff1 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.cpp
+++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp
@@ -11,41 +11,83 @@ LOG_SETUP(".persistence.util");
namespace storage {
namespace {
- bool isBatchable(api::MessageType::Id id)
- {
- return (id == api::MessageType::PUT_ID ||
- id == api::MessageType::REMOVE_ID ||
- id == api::MessageType::UPDATE_ID);
- }
- bool hasBucketInfo(api::MessageType::Id id)
- {
- return (isBatchable(id) ||
- (id == api::MessageType::REMOVELOCATION_ID ||
- id == api::MessageType::JOINBUCKETS_ID));
+constexpr bool is_batchable(api::MessageType::Id id) noexcept {
+ return (id == api::MessageType::PUT_ID ||
+ id == api::MessageType::REMOVE_ID ||
+ id == api::MessageType::UPDATE_ID);
+}
+
+constexpr bool has_bucket_info(api::MessageType::Id id) noexcept {
+ return (is_batchable(id) ||
+ (id == api::MessageType::REMOVELOCATION_ID ||
+ id == api::MessageType::JOINBUCKETS_ID));
+}
+
+constexpr vespalib::duration WARN_ON_SLOW_OPERATIONS = 5s;
+
+} // anon ns
+
+DeferredReplySenderStub::DeferredReplySenderStub() = default;
+DeferredReplySenderStub::~DeferredReplySenderStub() = default;
+
+AsyncMessageBatch::AsyncMessageBatch(std::shared_ptr<FileStorHandler::BucketLockInterface> bucket_lock,
+ const PersistenceUtil& env,
+ MessageSender& reply_sender) noexcept
+ : _bucket_lock(std::move(bucket_lock)),
+ _env(env),
+ _reply_sender(reply_sender),
+ _deferred_sender_stub()
+{
+ assert(_bucket_lock);
+}
+
+AsyncMessageBatch::~AsyncMessageBatch() {
+ const auto bucket_info = _env.getBucketInfo(_bucket_lock->getBucket());
+ _env.updateBucketDatabase(_bucket_lock->getBucket(), bucket_info);
+
+ std::lock_guard lock(_deferred_sender_stub._mutex); // Ensure visibility of posted replies
+ for (auto& reply : _deferred_sender_stub._deferred_replies) {
+ if (reply->getResult().success()) {
+ dynamic_cast<api::BucketInfoReply&>(*reply).setBucketInfo(bucket_info);
+ }
+ _reply_sender.sendReplyDirectly(reply);
}
- const vespalib::duration WARN_ON_SLOW_OPERATIONS = 5s;
+ LOG(debug, "Processed async feed message batch of %zu ops for %s. New bucket info is %s",
+ _deferred_sender_stub._deferred_replies.size(),
+ _bucket_lock->getBucket().toString().c_str(), bucket_info.toString().c_str());
}
MessageTracker::MessageTracker(const framework::MilliSecTimer & timer,
const PersistenceUtil & env,
MessageSender & replySender,
FileStorHandler::BucketLockInterface::SP bucketLock,
- api::StorageMessage::SP msg,
+ std::shared_ptr<api::StorageMessage> msg,
ThrottleToken throttle_token)
- : MessageTracker(timer, env, replySender, true, std::move(bucketLock), std::move(msg), std::move(throttle_token))
+ : MessageTracker(timer, env, replySender, true, std::move(bucketLock), {}, std::move(msg), std::move(throttle_token))
+{}
+
+MessageTracker::MessageTracker(const framework::MilliSecTimer& timer,
+ const PersistenceUtil& env,
+ std::shared_ptr<AsyncMessageBatch> batch,
+ MessageSender& deferred_reply_sender,
+ std::shared_ptr<api::StorageMessage> msg,
+ ThrottleToken throttle_token)
+ : MessageTracker(timer, env, deferred_reply_sender, false, {}, std::move(batch), std::move(msg), std::move(throttle_token))
{}
MessageTracker::MessageTracker(const framework::MilliSecTimer & timer,
const PersistenceUtil & env,
MessageSender & replySender,
- bool updateBucketInfo,
- FileStorHandler::BucketLockInterface::SP bucketLock,
- api::StorageMessage::SP msg,
+ bool update_bucket_info,
+ std::shared_ptr<FileStorHandler::BucketLockInterface> bucket_lock,
+ std::shared_ptr<AsyncMessageBatch> part_of_batch,
+ std::shared_ptr<api::StorageMessage> msg,
ThrottleToken throttle_token)
: _sendReply(true),
- _updateBucketInfo(updateBucketInfo && hasBucketInfo(msg->getType().getId())),
- _bucketLock(std::move(bucketLock)),
+ _updateBucketInfo(update_bucket_info && has_bucket_info(msg->getType().getId())),
+ _bucketLock(std::move(bucket_lock)),
+ _part_of_batch(std::move(part_of_batch)),
_msg(std::move(msg)),
_throttle_token(std::move(throttle_token)),
_context(_msg->getPriority(), _msg->getTrace().getLevel()),
@@ -61,7 +103,7 @@ MessageTracker::createForTesting(const framework::MilliSecTimer & timer, Persist
FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg)
{
return MessageTracker::UP(new MessageTracker(timer, env, replySender, false, std::move(bucketLock),
- std::move(msg), ThrottleToken()));
+ {}, std::move(msg), ThrottleToken()));
}
void
@@ -102,6 +144,7 @@ MessageTracker::sendReply() {
if (hasReply()) {
getReply().getTrace().addChild(_context.steal_trace());
if (_updateBucketInfo) {
+ assert(_bucketLock);
if (getReply().getResult().success()) {
_env.setBucketInfo(*this, _bucketLock->getBucket());
}
@@ -163,7 +206,7 @@ MessageTracker::generateReply(api::StorageCommand& cmd)
std::shared_ptr<FileStorHandler::OperationSyncPhaseDoneNotifier>
MessageTracker::sync_phase_done_notifier_or_nullptr() const
{
- if (_bucketLock->wants_sync_phase_done_notification()) {
+ if (_bucketLock && _bucketLock->wants_sync_phase_done_notification()) {
return _bucketLock;
}
return {};
@@ -236,7 +279,7 @@ PersistenceUtil::setBucketInfo(MessageTracker& tracker, const document::Bucket &
{
api::BucketInfo info = getBucketInfo(bucket);
- static_cast<api::BucketInfoReply&>(tracker.getReply()).setBucketInfo(info);
+ dynamic_cast<api::BucketInfoReply&>(tracker.getReply()).setBucketInfo(info);
updateBucketDatabase(bucket, info);
}
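
The destructor-driven flush above relies on shared ownership: every batched MessageTracker holds a shared_ptr to the AsyncMessageBatch, so whichever tracker is destroyed last triggers the single bucket-info refresh and the reply flush. A reduced sketch of that lifetime pattern (BatchFlusher and Tracker are illustrative stand-ins, not the patched classes):

    #include <cstdio>
    #include <memory>
    #include <mutex>
    #include <vector>

    struct BatchFlusher {
        std::mutex _mutex;
        std::vector<int> _deferred; // stand-in for the deferred StorageReply objects
        ~BatchFlusher() {
            std::lock_guard guard(_mutex); // ensure visibility of replies posted by other threads
            std::printf("flushing %zu deferred replies after one bucket info fetch\n", _deferred.size());
        }
        void post(int reply) {
            std::lock_guard guard(_mutex);
            _deferred.push_back(reply);
        }
    };

    struct Tracker {
        std::shared_ptr<BatchFlusher> _batch;
        int _reply;
        ~Tracker() { _batch->post(_reply); } // each op defers its reply instead of sending it
    };

    int main() {
        auto batch = std::make_shared<BatchFlusher>();
        {
            Tracker a{batch, 1};
            Tracker b{batch, 2}; // trackers may complete in any order
        }
        batch.reset(); // last reference dropped -> destructor flushes both replies at once
    }
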
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h
index 900f301252e..67e96befe00 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.h
+++ b/storage/src/vespa/storage/persistence/persistenceutil.h
@@ -25,12 +25,52 @@ namespace storage {
class PersistenceUtil;
+struct DeferredReplySenderStub : MessageSender {
+ std::mutex _mutex;
+ std::vector<std::shared_ptr<api::StorageReply>> _deferred_replies;
+
+ DeferredReplySenderStub();
+ ~DeferredReplySenderStub() override;
+
+ void sendCommand(const std::shared_ptr<api::StorageCommand>&) override {
+ abort(); // Not supported
+ }
+ void sendReply(const std::shared_ptr<api::StorageReply>& reply) override {
+ std::lock_guard lock(_mutex);
+ _deferred_replies.emplace_back(reply);
+ }
+};
+
+class AsyncMessageBatch {
+ std::shared_ptr<FileStorHandler::BucketLockInterface> _bucket_lock;
+ const PersistenceUtil& _env;
+ MessageSender& _reply_sender;
+ DeferredReplySenderStub _deferred_sender_stub;
+public:
+ AsyncMessageBatch(std::shared_ptr<FileStorHandler::BucketLockInterface> bucket_lock,
+ const PersistenceUtil& env,
+ MessageSender& reply_sender) noexcept;
+ // Triggered when the last referencing batched MessageTracker is destroyed.
+ // Fetches bucket info, updates the bucket DB and sends all deferred replies with the new bucket info.
+ ~AsyncMessageBatch();
+
+ [[nodiscard]] MessageSender& deferred_sender_stub() noexcept { return _deferred_sender_stub; }
+};
+
class MessageTracker {
public:
using UP = std::unique_ptr<MessageTracker>;
- MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender,
- FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg,
+ MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & reply_sender,
+ FileStorHandler::BucketLockInterface::SP bucket_lock, std::shared_ptr<api::StorageMessage> msg,
+ ThrottleToken throttle_token);
+
+ // For use with batching, where the bucket lock is held separately and bucket info
+ // is _not_ fetched or updated per message.
+ MessageTracker(const framework::MilliSecTimer& timer, const PersistenceUtil& env,
+ std::shared_ptr<AsyncMessageBatch> batch,
+ MessageSender& deferred_reply_sender,
+ std::shared_ptr<api::StorageMessage> msg,
ThrottleToken throttle_token);
~MessageTracker();
@@ -58,48 +98,53 @@ public:
* commands like merge. */
void dontReply() { _sendReply = false; }
- bool hasReply() const { return bool(_reply); }
- const api::StorageReply & getReply() const {
+ [[nodiscard]] bool hasReply() const { return bool(_reply); }
+ [[nodiscard]] const api::StorageReply & getReply() const {
return *_reply;
}
- api::StorageReply & getReply() {
+ [[nodiscard]] api::StorageReply & getReply() {
return *_reply;
}
- std::shared_ptr<api::StorageReply> && stealReplySP() && {
+ [[nodiscard]] std::shared_ptr<api::StorageReply> && stealReplySP() && {
return std::move(_reply);
}
void generateReply(api::StorageCommand& cmd);
- api::ReturnCode getResult() const { return _result; }
+ [[nodiscard]] api::ReturnCode getResult() const { return _result; }
- spi::Context & context() { return _context; }
- document::BucketId getBucketId() const {
+ [[nodiscard]] spi::Context & context() { return _context; }
+ [[nodiscard]] document::BucketId getBucketId() const {
return _bucketLock->getBucket().getBucketId();
}
void sendReply();
- bool checkForError(const spi::Result& response);
+ [[nodiscard]] bool checkForError(const spi::Result& response);
// Returns a non-nullptr notifier instance iff the underlying operation wants to be notified
- // when the sync phase is complete. Otherwise returns a nullptr shared_ptr.
- std::shared_ptr<FileStorHandler::OperationSyncPhaseDoneNotifier> sync_phase_done_notifier_or_nullptr() const;
+ // when the sync phase is complete. Otherwise, returns a nullptr shared_ptr.
+ [[nodiscard]] std::shared_ptr<FileStorHandler::OperationSyncPhaseDoneNotifier> sync_phase_done_notifier_or_nullptr() const;
static MessageTracker::UP
createForTesting(const framework::MilliSecTimer & timer, PersistenceUtil & env, MessageSender & replySender,
FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg);
private:
- MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, bool updateBucketInfo,
- FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg,
+ MessageTracker(const framework::MilliSecTimer& timer, const PersistenceUtil& env,
+ MessageSender& reply_sender, bool update_bucket_info,
+ std::shared_ptr<FileStorHandler::BucketLockInterface> bucket_lock,
+ std::shared_ptr<AsyncMessageBatch> part_of_batch,
+ std::shared_ptr<api::StorageMessage> msg,
ThrottleToken throttle_token);
[[nodiscard]] bool count_result_as_failure() const noexcept;
bool _sendReply;
bool _updateBucketInfo;
+ // Either _bucketLock or _part_of_batch must be set, never both at the same time
FileStorHandler::BucketLockInterface::SP _bucketLock;
+ std::shared_ptr<AsyncMessageBatch> _part_of_batch; // nullptr if not batched
std::shared_ptr<api::StorageMessage> _msg;
ThrottleToken _throttle_token;
spi::Context _context;
@@ -117,8 +162,6 @@ public:
struct LockResult {
std::shared_ptr<FileStorHandler::BucketLockInterface> lock;
LockResult() : lock() {}
-
- bool bucketExisted() const { return bool(lock); }
};
PersistenceUtil(const ServiceLayerComponent&, FileStorHandler& fileStorHandler,
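
To see how the pieces in this header fit together: the batch hands each tracker its deferred_sender_stub(), so sendReply() calls land in a buffer rather than on the wire; the batch owner later stamps the shared bucket info onto each successful reply and forwards it. A minimal sketch of that buffering-sender shape, with Sender and Reply as simplified stand-ins for MessageSender and StorageReply:

    #include <memory>
    #include <mutex>
    #include <utility>
    #include <vector>

    struct Reply { bool success = true; };

    struct Sender {
        virtual ~Sender() = default;
        virtual void sendReply(std::shared_ptr<Reply> reply) = 0;
    };

    // Buffers replies under a mutex instead of sending them; the batch owner
    // drains the buffer once, after refreshing bucket info.
    struct BufferingSender final : Sender {
        std::mutex _mutex;
        std::vector<std::shared_ptr<Reply>> _buffered;
        void sendReply(std::shared_ptr<Reply> reply) override {
            std::lock_guard guard(_mutex); // trackers may complete on different threads
            _buffered.push_back(std::move(reply));
        }
    };
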
diff --git a/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.h b/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.h
index 277e7b4fdfd..93b305370e0 100644
--- a/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.h
+++ b/storage/src/vespa/storageframework/defaultimplementation/clock/realclock.h
@@ -13,7 +13,7 @@
namespace storage::framework::defaultimplementation {
-struct RealClock : public Clock {
+struct RealClock final : public Clock {
vespalib::steady_time getMonotonicTime() const override;
vespalib::system_time getSystemTime() const override;
};
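
Marking RealClock `final` is a small hardening: it documents that the concrete clock is not meant to be subclassed and lets the compiler devirtualize calls made through a RealClock-typed reference. Illustration with a simplified interface (not the real framework::Clock):

    struct Clock {
        virtual ~Clock() = default;
        virtual long ticks() const = 0;
    };

    struct RealClock final : Clock {
        long ticks() const override { return 0; } // calls via RealClock& need no vtable lookup
    };

    // struct TestClock : RealClock {}; // error: base 'RealClock' is marked 'final'
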