aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author: Tor Brede Vekterli <vekterli@yahooinc.com> 2022-01-06 11:49:57 +0000
committer: Tor Brede Vekterli <vekterli@yahooinc.com> 2022-01-10 14:35:01 +0000
commit: 3f15206f090ce36c08282b8d64d6ff9c15c8fd69 (patch)
tree: edc117087a4fac59dc6f507e5c88f305845839ca
parent: 6fc8a76d2062e6176510804be59faffe2e20662d (diff)
Support dynamic throttling of async persistence operations
Adds an operation throttler that is intended to provide global throttling of async operations across all persistence stripe threads. A throttler wraps a logical max pending window size of in-flight operations. Depending on the throttler implementation, the window size may expand and shrink dynamically. Exactly how and when this happens is unspecified. This commit adds two throttler implementations: * An unlimited throttler that is a no-op and never blocks. * A throttler that is built around the mbus `DynamicThrottlePolicy` and defers all window decisions to it. The current config default is to use the unlimited throttler. Config changes require a process restart. The throttler offers both polling and blocking (timed and non-timed) calls for acquiring a throttle token. If the returned token is valid, the caller may proceed to invoke the asynchronous operation. The window slot taken up by a valid throttle token is implicitly freed up when the token is destroyed.
-rw-r--r-- configdefinitions/src/vespa/stor-filestor.def 19
-rw-r--r-- storage/src/tests/persistence/CMakeLists.txt 1
-rw-r--r-- storage/src/tests/persistence/active_operations_stats_test.cpp 10
-rw-r--r-- storage/src/tests/persistence/filestorage/filestormanagertest.cpp 34
-rw-r--r-- storage/src/tests/persistence/persistencequeuetest.cpp 34
-rw-r--r-- storage/src/tests/persistence/shared_operation_throttler_test.cpp 116
-rw-r--r-- storage/src/vespa/storage/persistence/CMakeLists.txt 1
-rw-r--r-- storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp 9
-rw-r--r-- storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h 10
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt 1
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp 8
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/filestorhandler.h 29
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp 82
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h 25
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp 28
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp 1
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/filestormetrics.h 1
-rw-r--r-- storage/src/vespa/storage/persistence/mergehandler.cpp 11
-rw-r--r-- storage/src/vespa/storage/persistence/mergehandler.h 4
-rw-r--r-- storage/src/vespa/storage/persistence/persistencehandler.cpp 7
-rw-r--r-- storage/src/vespa/storage/persistence/persistencethread.cpp 2
-rw-r--r-- storage/src/vespa/storage/persistence/persistenceutil.cpp 12
-rw-r--r-- storage/src/vespa/storage/persistence/persistenceutil.h 7
-rw-r--r-- storage/src/vespa/storage/persistence/shared_operation_throttler.cpp 191
-rw-r--r-- storage/src/vespa/storage/persistence/shared_operation_throttler.h 71
25 files changed, 633 insertions, 81 deletions
diff --git a/configdefinitions/src/vespa/stor-filestor.def b/configdefinitions/src/vespa/stor-filestor.def
index 66700eff3e6..c351e52b557 100644
--- a/configdefinitions/src/vespa/stor-filestor.def
+++ b/configdefinitions/src/vespa/stor-filestor.def
@@ -75,3 +75,22 @@ use_async_message_handling_on_schedule bool default=false restart
## the entire resource usage sample is immediately reported to the cluster controller (via host info).
## This config can be live updated (doesn't require restart).
resource_usage_reporter_noise_level double default=0.001
+
+## Specify throttling used for async persistence operations. This throttling takes place
+## before operations are dispatched to Proton and serves as a limiter for how many
+## operations may be in flight in Proton's internal queues.
+##
+## - UNLIMITED is, as it says on the tin, unlimited. Offers no actual throttling, but
+## has near zero overhead and never blocks.
+## - DYNAMIC uses DynamicThrottlePolicy under the hood and will block if the window
+## is full (if a blocking throttler API call is invoked).
+##
+async_operation_throttler_type enum { UNLIMITED, DYNAMIC } default=UNLIMITED restart
+
+## Specifies how much the throttling window is increased by when the async throttle
+## policy has decided that more concurrent operations are desirable. Also affects the
+## _minimum_ size of the throttling window; the minimum size is implicitly set to
+## max(this config value, number of threads).
+##
+## Only applies if async_operation_throttler_type == DYNAMIC.
+async_operation_dynamic_throttling_window_increment int default=20 restart
diff --git a/storage/src/tests/persistence/CMakeLists.txt b/storage/src/tests/persistence/CMakeLists.txt
index 7b165e11b66..fb8120210c1 100644
--- a/storage/src/tests/persistence/CMakeLists.txt
+++ b/storage/src/tests/persistence/CMakeLists.txt
@@ -12,6 +12,7 @@ vespa_add_executable(storage_persistence_gtest_runner_app TEST
persistencethread_splittest.cpp
processalltest.cpp
provider_error_wrapper_test.cpp
+ shared_operation_throttler_test.cpp
splitbitdetectortest.cpp
testandsettest.cpp
gtest_runner.cpp
diff --git a/storage/src/tests/persistence/active_operations_stats_test.cpp b/storage/src/tests/persistence/active_operations_stats_test.cpp
index a5dd3d929db..8caa84977ce 100644
--- a/storage/src/tests/persistence/active_operations_stats_test.cpp
+++ b/storage/src/tests/persistence/active_operations_stats_test.cpp
@@ -96,17 +96,17 @@ ActiveOperationsStatsTest::test_active_operations_stats()
auto lock0 = filestorHandler->getNextMessage(stripeId);
auto lock1 = filestorHandler->getNextMessage(stripeId);
auto lock2 = filestorHandler->getNextMessage(stripeId);
- ASSERT_TRUE(lock0.first);
- ASSERT_TRUE(lock1.first);
- ASSERT_FALSE(lock2.first);
+ ASSERT_TRUE(lock0.lock);
+ ASSERT_TRUE(lock1.lock);
+ ASSERT_FALSE(lock2.lock);
auto stats = filestorHandler->get_active_operations_stats(false);
{
SCOPED_TRACE("during");
assert_active_operations_stats(stats, 2, 2, 0);
}
EXPECT_EQ(3, stats.get_total_size());
- lock0.first.reset();
- lock1.first.reset();
+ lock0.lock.reset();
+ lock1.lock.reset();
stats = filestorHandler->get_active_operations_stats(false);
{
SCOPED_TRACE("after");
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index cd496605a6c..939e7ae7b6a 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -516,11 +516,11 @@ TEST_F(FileStorManagerTest, handler_priority) {
filestorHandler.schedule(cmd);
}
- ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).second->getPriority());
- ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).second->getPriority());
- ASSERT_EQ(45, filestorHandler.getNextMessage(stripeId).second->getPriority());
- ASSERT_EQ(60, filestorHandler.getNextMessage(stripeId).second->getPriority());
- ASSERT_EQ(75, filestorHandler.getNextMessage(stripeId).second->getPriority());
+ ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).msg->getPriority());
+ ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).msg->getPriority());
+ ASSERT_EQ(45, filestorHandler.getNextMessage(stripeId).msg->getPriority());
+ ASSERT_EQ(60, filestorHandler.getNextMessage(stripeId).msg->getPriority());
+ ASSERT_EQ(75, filestorHandler.getNextMessage(stripeId).msg->getPriority());
}
class MessagePusherThread : public document::Runnable {
@@ -570,7 +570,7 @@ public:
void run() override {
while (!_done) {
FileStorHandler::LockedMessage msg = _handler.getNextMessage(_threadId);
- if (msg.second.get()) {
+ if (msg.msg.get()) {
uint32_t originalConfig = _config.load();
_fetchedCount++;
std::this_thread::sleep_for(5ms);
@@ -641,15 +641,15 @@ TEST_F(FileStorManagerTest, handler_pause) {
filestorHandler.schedule(cmd);
}
- ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).second->getPriority());
+ ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).msg->getPriority());
{
ResumeGuard guard = filestorHandler.pause();
(void)guard;
- ASSERT_EQ(filestorHandler.getNextMessage(stripeId).second.get(), nullptr);
+ ASSERT_EQ(filestorHandler.getNextMessage(stripeId).msg.get(), nullptr);
}
- ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).second->getPriority());
+ ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).msg->getPriority());
}
TEST_F(FileStorManagerTest, remap_split) {
@@ -729,8 +729,8 @@ TEST_F(FileStorManagerTest, handler_timeout) {
std::this_thread::sleep_for(51ms);
for (;;) {
auto lock = filestorHandler.getNextMessage(stripeId);
- if (lock.first.get()) {
- ASSERT_EQ(200, lock.second->getPriority());
+ if (lock.lock.get()) {
+ ASSERT_EQ(200, lock.msg->getPriority());
break;
}
}
@@ -2013,7 +2013,7 @@ expect_async_message(StorageMessage::Priority exp_pri,
{
EXPECT_TRUE(result.was_scheduled());
ASSERT_TRUE(result.has_async_message());
- EXPECT_EQ(exp_pri, result.async_message().second->getPriority());
+ EXPECT_EQ(exp_pri, result.async_message().msg->getPriority());
}
void
@@ -2045,8 +2045,8 @@ TEST_F(FileStorHandlerTest, async_message_with_lowest_pri_returned_on_schedule)
auto result = handler->schedule_and_get_next_async_message(make_put_command(30));
expect_async_message(20, result);
}
- EXPECT_EQ(30, get_next_message().second->getPriority());
- EXPECT_EQ(40, get_next_message().second->getPriority());
+ EXPECT_EQ(30, get_next_message().msg->getPriority());
+ EXPECT_EQ(40, get_next_message().msg->getPriority());
}
TEST_F(FileStorHandlerTest, no_async_message_returned_if_lowest_pri_message_is_not_async)
@@ -2057,8 +2057,8 @@ TEST_F(FileStorHandlerTest, no_async_message_returned_if_lowest_pri_message_is_n
auto result = handler->schedule_and_get_next_async_message(make_put_command(30));
expect_empty_async_message(result);
- EXPECT_EQ(20, get_next_message().second->getPriority());
- EXPECT_EQ(30, get_next_message().second->getPriority());
+ EXPECT_EQ(20, get_next_message().msg->getPriority());
+ EXPECT_EQ(30, get_next_message().msg->getPriority());
}
TEST_F(FileStorHandlerTest, inhibited_operations_are_skipped)
@@ -2079,7 +2079,7 @@ TEST_F(FileStorHandlerTest, inhibited_operations_are_skipped)
expect_async_message(40, result);
}
}
- EXPECT_EQ(30, get_next_message().second->getPriority());
+ EXPECT_EQ(30, get_next_message().msg->getPriority());
}
} // storage
diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp
index 2557fa537f5..5f3836727dd 100644
--- a/storage/src/tests/persistence/persistencequeuetest.cpp
+++ b/storage/src/tests/persistence/persistencequeuetest.cpp
@@ -91,14 +91,14 @@ TEST_F(PersistenceQueueTest, fetch_next_unlocked_message_if_bucket_locked) {
f.filestorHandler->schedule(createPut(5432, 0));
auto lock0 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_TRUE(lock0.first.get());
+ ASSERT_TRUE(lock0.lock.get());
EXPECT_EQ(document::BucketId(16, 1234),
- dynamic_cast<api::PutCommand&>(*lock0.second).getBucketId());
+ dynamic_cast<api::PutCommand&>(*lock0.msg).getBucketId());
auto lock1 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_TRUE(lock1.first.get());
+ ASSERT_TRUE(lock1.lock.get());
EXPECT_EQ(document::BucketId(16, 5432),
- dynamic_cast<api::PutCommand&>(*lock1.second).getBucketId());
+ dynamic_cast<api::PutCommand&>(*lock1.msg).getBucketId());
}
TEST_F(PersistenceQueueTest, shared_locked_operations_allow_concurrent_bucket_access) {
@@ -108,14 +108,14 @@ TEST_F(PersistenceQueueTest, shared_locked_operations_allow_concurrent_bucket_ac
f.filestorHandler->schedule(createGet(1234));
auto lock0 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_TRUE(lock0.first.get());
- EXPECT_EQ(api::LockingRequirements::Shared, lock0.first->lockingRequirements());
+ ASSERT_TRUE(lock0.lock.get());
+ EXPECT_EQ(api::LockingRequirements::Shared, lock0.lock->lockingRequirements());
// Even though we already have a lock on the bucket, Gets allow shared locking and we
// should therefore be able to get another lock.
auto lock1 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_TRUE(lock1.first.get());
- EXPECT_EQ(api::LockingRequirements::Shared, lock1.first->lockingRequirements());
+ ASSERT_TRUE(lock1.lock.get());
+ EXPECT_EQ(api::LockingRequirements::Shared, lock1.lock->lockingRequirements());
}
TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_shared_op_active) {
@@ -125,12 +125,12 @@ TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_shared_op
f.filestorHandler->schedule(createPut(1234, 0));
auto lock0 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_TRUE(lock0.first.get());
- EXPECT_EQ(api::LockingRequirements::Shared, lock0.first->lockingRequirements());
+ ASSERT_TRUE(lock0.lock.get());
+ EXPECT_EQ(api::LockingRequirements::Shared, lock0.lock->lockingRequirements());
// Expected to time out
auto lock1 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_FALSE(lock1.first.get());
+ ASSERT_FALSE(lock1.lock.get());
}
TEST_F(PersistenceQueueTest, shared_locked_operation_not_started_if_exclusive_op_active) {
@@ -140,12 +140,12 @@ TEST_F(PersistenceQueueTest, shared_locked_operation_not_started_if_exclusive_op
f.filestorHandler->schedule(createGet(1234));
auto lock0 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_TRUE(lock0.first.get());
- EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements());
+ ASSERT_TRUE(lock0.lock.get());
+ EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.lock->lockingRequirements());
// Expected to time out
auto lock1 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_FALSE(lock1.first.get());
+ ASSERT_FALSE(lock1.lock.get());
}
TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_exclusive_op_active) {
@@ -155,12 +155,12 @@ TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_exclusive
f.filestorHandler->schedule(createPut(1234, 0));
auto lock0 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_TRUE(lock0.first.get());
- EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements());
+ ASSERT_TRUE(lock0.lock.get());
+ EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.lock->lockingRequirements());
// Expected to time out
auto lock1 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_FALSE(lock1.first.get());
+ ASSERT_FALSE(lock1.lock.get());
}
} // namespace storage
diff --git a/storage/src/tests/persistence/shared_operation_throttler_test.cpp b/storage/src/tests/persistence/shared_operation_throttler_test.cpp
new file mode 100644
index 00000000000..0ad380937c7
--- /dev/null
+++ b/storage/src/tests/persistence/shared_operation_throttler_test.cpp
@@ -0,0 +1,116 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/storage/persistence/shared_operation_throttler.h>
+#include <vespa/vespalib/gtest/gtest.h>
+#include <vespa/vespalib/util/barrier.h>
+#include <chrono>
+#include <thread>
+
+using namespace ::testing;
+
+namespace storage {
+
+using ThrottleToken = SharedOperationThrottler::Token;
+
+TEST(SharedOperationThrottlerTest, unlimited_throttler_does_not_throttle) {
+ // We technically can't test that the unlimited throttler _never_ throttles, but at
+ // least check that it doesn't throttle _twice_, and then induce from this ;)
+ auto throttler = SharedOperationThrottler::make_unlimited_throttler();
+ auto token1 = throttler->try_acquire_one();
+ EXPECT_TRUE(token1.valid());
+ auto token2 = throttler->blocking_acquire_one();
+ EXPECT_TRUE(token2.valid());
+ // Window size should be zero (i.e. unlimited) for unlimited throttler
+ EXPECT_EQ(throttler->current_window_size(), 0);
+}
+
+TEST(SharedOperationThrottlerTest, dynamic_throttler_respects_initial_window_size) {
+ auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
+ auto token1 = throttler->try_acquire_one();
+ EXPECT_TRUE(token1.valid());
+ auto token2 = throttler->try_acquire_one();
+ EXPECT_FALSE(token2.valid());
+
+ EXPECT_EQ(throttler->current_window_size(), 1);
+}
+
+TEST(SharedOperationThrottlerTest, blocking_acquire_returns_immediately_if_slot_available) {
+ auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
+ auto token = throttler->blocking_acquire_one();
+ EXPECT_TRUE(token.valid());
+ token.reset();
+ token = throttler->blocking_acquire_one(600s); // Should never block.
+ EXPECT_TRUE(token.valid());
+}
+
+TEST(SharedOperationThrottlerTest, blocking_call_woken_up_if_throttle_slot_available) {
+ auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
+ vespalib::Barrier barrier(2);
+ std::thread t([&] {
+ auto token = throttler->try_acquire_one();
+ assert(token.valid());
+ barrier.await();
+ while (throttler->waiting_threads() != 1) {
+ std::this_thread::sleep_for(100us);
+ }
+ // Implicit token release at thread scope exit
+ });
+ barrier.await();
+ auto token = throttler->blocking_acquire_one();
+ EXPECT_TRUE(token.valid());
+ t.join();
+}
+
+TEST(SharedOperationThrottlerTest, time_bounded_blocking_acquire_waits_for_timeout) {
+ auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
+ auto window_filling_token = throttler->try_acquire_one();
+ auto before = std::chrono::steady_clock::now();
+ // Will block for at least 1ms. Since no window slot will be available by that time,
+ // an invalid token should be returned.
+ auto token = throttler->blocking_acquire_one(1ms);
+ auto after = std::chrono::steady_clock::now();
+ EXPECT_TRUE((after - before) >= 1ms);
+ EXPECT_FALSE(token.valid());
+}
+
+TEST(SharedOperationThrottlerTest, default_constructed_token_is_invalid) {
+ ThrottleToken token;
+ EXPECT_FALSE(token.valid());
+ token.reset(); // no-op
+ EXPECT_FALSE(token.valid());
+}
+
+TEST(SharedOperationThrottlerTest, token_destruction_frees_up_throttle_window_slot) {
+ auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
+ {
+ auto token = throttler->try_acquire_one();
+ EXPECT_TRUE(token.valid());
+ }
+ auto token = throttler->try_acquire_one();
+ EXPECT_TRUE(token.valid());
+}
+
+TEST(SharedOperationThrottlerTest, token_can_be_moved_and_reset) {
+ auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
+ auto token1 = throttler->try_acquire_one();
+ auto token2 = std::move(token1); // move ctor
+ EXPECT_TRUE(token2.valid());
+ EXPECT_FALSE(token1.valid());
+ ThrottleToken token3;
+ token3 = std::move(token2); // move assignment op
+ EXPECT_TRUE(token3.valid());
+ EXPECT_FALSE(token2.valid());
+
+ // Trying to fetch new token should not succeed due to active token and win size of 1
+ token1 = throttler->try_acquire_one();
+ EXPECT_FALSE(token1.valid());
+ // Resetting the token should free up the slot in the window
+ token3.reset();
+ token1 = throttler->try_acquire_one();
+ EXPECT_TRUE(token1.valid());
+}
+
+// TODO ideally we'd test that the dynamic throttler has a window size that is actually
+// dynamic, but the backing DynamicThrottlePolicy implementation is a black box so
+// it's not trivial to know how to do this reliably.
+
+}
diff --git a/storage/src/vespa/storage/persistence/CMakeLists.txt b/storage/src/vespa/storage/persistence/CMakeLists.txt
index c737d2bed28..5e068236026 100644
--- a/storage/src/vespa/storage/persistence/CMakeLists.txt
+++ b/storage/src/vespa/storage/persistence/CMakeLists.txt
@@ -14,6 +14,7 @@ vespa_add_library(storage_spersistence OBJECT
persistenceutil.cpp
processallhandler.cpp
provider_error_wrapper.cpp
+ shared_operation_throttler.cpp
simplemessagehandler.cpp
splitbitdetector.cpp
splitjoinhandler.cpp
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp
index 2dc5989e857..73ccc7f6085 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp
@@ -7,10 +7,16 @@
namespace storage {
-ApplyBucketDiffEntryComplete::ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state, document::DocumentId doc_id, const char *op, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric)
+ApplyBucketDiffEntryComplete::ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state,
+ document::DocumentId doc_id,
+ SharedOperationThrottler::Token throttle_token,
+ const char *op,
+ const framework::Clock& clock,
+ metrics::DoubleAverageMetric& latency_metric)
: _result_handler(nullptr),
_state(std::move(state)),
_doc_id(std::move(doc_id)),
+ _throttle_token(std::move(throttle_token)),
_op(op),
_start_time(clock),
_latency_metric(latency_metric)
@@ -27,6 +33,7 @@ ApplyBucketDiffEntryComplete::onComplete(std::unique_ptr<spi::Result> result) no
}
double elapsed = _start_time.getElapsedTimeAsDouble();
_latency_metric.addValue(elapsed);
+ _throttle_token.reset();
_state->on_entry_complete(std::move(result), _doc_id, _op);
}
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h
index 1037318aec6..8478cab4c17 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h
@@ -2,6 +2,7 @@
#pragma once
+#include "shared_operation_throttler.h"
#include <vespa/document/base/documentid.h>
#include <vespa/metrics/valuemetric.h>
#include <vespa/persistence/spi/operationcomplete.h>
@@ -21,12 +22,17 @@ class ApplyBucketDiffEntryComplete : public spi::OperationComplete
const spi::ResultHandler* _result_handler;
std::shared_ptr<ApplyBucketDiffState> _state;
document::DocumentId _doc_id;
+ SharedOperationThrottler::Token _throttle_token;
const char* _op;
framework::MilliSecTimer _start_time;
metrics::DoubleAverageMetric& _latency_metric;
public:
- ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state, document::DocumentId doc_id, const char *op, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric);
- ~ApplyBucketDiffEntryComplete();
+ ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state,
+ document::DocumentId doc_id,
+ SharedOperationThrottler::Token throttle_token,
+ const char *op, const framework::Clock& clock,
+ metrics::DoubleAverageMetric& latency_metric);
+ ~ApplyBucketDiffEntryComplete() override;
void onComplete(std::unique_ptr<spi::Result> result) noexcept override;
void addResultHandler(const spi::ResultHandler* resultHandler) override;
};
diff --git a/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt b/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt
index 62d1a80501a..2d137f87118 100644
--- a/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt
+++ b/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt
@@ -3,6 +3,7 @@ vespa_add_library(storage_filestorpersistence OBJECT
SOURCES
active_operations_metrics.cpp
active_operations_stats.cpp
+ filestorhandler.cpp
filestorhandlerimpl.cpp
filestormanager.cpp
filestormetrics.cpp
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
new file mode 100644
index 00000000000..c066277ec71
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
@@ -0,0 +1,8 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "filestorhandler.h"
+
+namespace storage {
+
+FileStorHandler::LockedMessage::~LockedMessage() = default;
+
+}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index a980b5aa2e1..6f740ce2c28 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -16,6 +16,7 @@
#include <vespa/document/bucket/bucket.h>
#include <vespa/storage/storageutil/resumeguard.h>
#include <vespa/storage/common/messagesender.h>
+#include <vespa/storage/persistence/shared_operation_throttler.h>
#include <vespa/storageapi/messageapi/storagemessage.h>
namespace storage {
@@ -74,7 +75,29 @@ public:
[[nodiscard]] virtual api::LockingRequirements lockingRequirements() const noexcept = 0;
};
- using LockedMessage = std::pair<BucketLockInterface::SP, api::StorageMessage::SP>;
+ struct LockedMessage {
+ std::shared_ptr<BucketLockInterface> lock;
+ std::shared_ptr<api::StorageMessage> msg;
+ SharedOperationThrottler::Token throttle_token;
+
+ LockedMessage() noexcept = default;
+ LockedMessage(std::shared_ptr<BucketLockInterface> lock_,
+ std::shared_ptr<api::StorageMessage> msg_) noexcept
+ : lock(std::move(lock_)),
+ msg(std::move(msg_)),
+ throttle_token()
+ {}
+ LockedMessage(std::shared_ptr<BucketLockInterface> lock_,
+ std::shared_ptr<api::StorageMessage> msg_,
+ SharedOperationThrottler::Token token) noexcept
+ : lock(std::move(lock_)),
+ msg(std::move(msg_)),
+ throttle_token(std::move(token))
+ {}
+ LockedMessage(LockedMessage&&) noexcept = default;
+ ~LockedMessage();
+ };
+
class ScheduleAsyncResult {
private:
bool _was_scheduled;
@@ -90,7 +113,7 @@ public:
return _was_scheduled;
}
bool has_async_message() const {
- return _async_message.first.get() != nullptr;
+ return _async_message.lock.get() != nullptr;
}
const LockedMessage& async_message() const {
return _async_message;
@@ -250,6 +273,8 @@ public:
virtual std::string dumpQueue() const = 0;
virtual ActiveOperationsStats get_active_operations_stats(bool reset_min_max) const = 0;
+
+ virtual SharedOperationThrottler& operation_throttler() const noexcept = 0;
};
} // storage
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index c6991803b4d..5e0ea0359dc 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -40,16 +40,18 @@ uint32_t per_stripe_merge_limit(uint32_t num_threads, uint32_t num_stripes) noex
FileStorHandlerImpl::FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics,
ServiceLayerComponentRegister& compReg)
- : FileStorHandlerImpl(1, 1, sender, metrics, compReg)
+ : FileStorHandlerImpl(1, 1, sender, metrics, compReg, SharedOperationThrottler::make_unlimited_throttler())
{
}
FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender& sender,
FileStorMetrics& metrics,
- ServiceLayerComponentRegister& compReg)
+ ServiceLayerComponentRegister& compReg,
+ std::unique_ptr<SharedOperationThrottler> operation_throttler)
: _component(compReg, "filestorhandlerimpl"),
_state(FileStorHandler::AVAILABLE),
_metrics(nullptr),
+ _operation_throttler(std::move(operation_throttler)),
_stripes(),
_messageSender(sender),
_bucketIdFactory(_component.getBucketIdFactory()),
@@ -330,6 +332,7 @@ FileStorHandlerImpl::updateMetrics(const MetricLockGuard &)
std::lock_guard lockGuard(_mergeStatesLock);
_metrics->pendingMerges.addValue(_mergeStates.size());
_metrics->queueSize.addValue(getQueueSize());
+ _metrics->throttle_window_size.addValue(_operation_throttler->current_window_size());
for (const auto & stripe : _metrics->stripes) {
const auto & m = stripe->averageQueueWaitingTime;
@@ -885,10 +888,39 @@ FileStorHandlerImpl::Stripe::Stripe(const FileStorHandlerImpl & owner, MessageSe
_active_operations_stats()
{}
+namespace {
+
+bool
+operation_type_should_be_throttled(api::MessageType::Id type_id) noexcept
+{
+ // Note: SetBucketState is intentionally _not_ included in this set, even though it's
+ // dispatched async. The rationale behind this is that SetBucketState is very cheap
+ // to execute, usually comes in large waves (up to #buckets count) and processing all
+ // requests should complete as quickly as possible. We also don't want such waves to
+ // artificially boost the dynamic throttle window size due to a sudden throughput spike.
+ //
+ // Merge-related operations are transitively throttled by using the operation throttler
+ // directly for all async ops within the MergeHandler.
+ switch (type_id) {
+ case api::MessageType::PUT_ID:
+ case api::MessageType::REMOVE_ID:
+ case api::MessageType::UPDATE_ID:
+ case api::MessageType::REMOVELOCATION_ID:
+ case api::MessageType::CREATEBUCKET_ID:
+ case api::MessageType::DELETEBUCKET_ID:
+ return true;
+ default:
+ return false;
+ }
+}
+
+}
+
FileStorHandler::LockedMessage
FileStorHandlerImpl::Stripe::getNextMessage(vespalib::duration timeout)
{
std::unique_lock guard(*_lock);
+ SharedOperationThrottler::Token throttle_token;
// Try to grab a message+lock, immediately retrying once after a wait
// if none can be found and then exiting if the same is the case on the
// second attempt. This is key to allowing the run loop to register
@@ -896,15 +928,39 @@ FileStorHandlerImpl::Stripe::getNextMessage(vespalib::duration timeout)
for (int attempt = 0; (attempt < 2) && !_owner.isPaused(); ++attempt) {
PriorityIdx& idx(bmi::get<1>(*_queue));
PriorityIdx::iterator iter(idx.begin()), end(idx.end());
+ bool was_throttled = false;
- while (iter != end && operationIsInhibited(guard, iter->_bucket, *iter->_command)) {
+ while ((iter != end) && operationIsInhibited(guard, iter->_bucket, *iter->_command)) {
iter++;
}
if (iter != end) {
- return getMessage(guard, idx, iter);
+ const bool should_throttle_op = operation_type_should_be_throttled(iter->_command->getType().getId());
+ if (!should_throttle_op && throttle_token.valid()) {
+ throttle_token.reset(); // Let someone else play with it.
+ } else if (should_throttle_op && !throttle_token.valid()) {
+ // Important: _non-blocking_ attempt at getting a throttle token.
+ throttle_token = _owner.operation_throttler().try_acquire_one();
+ was_throttled = !throttle_token.valid();
+ }
+ if (!should_throttle_op || throttle_token.valid()) {
+ return getMessage(guard, idx, iter, std::move(throttle_token));
+ }
}
if (attempt == 0) {
- _cond->wait_for(guard, timeout);
+ // Depending on whether we were blocked due to no usable ops in queue or throttling,
+ // wait for either the queue or throttler to (hopefully) have some fresh stuff for us.
+ if (!was_throttled) {
+ _cond->wait_for(guard, timeout);
+ } else {
+ // Have to release lock before doing a blocking throttle token fetch, since it
+ // prevents RPC threads from pushing onto the queue.
+ guard.unlock();
+ throttle_token = _owner.operation_throttler().blocking_acquire_one(timeout);
+ if (!throttle_token.valid()) {
+ return {}; // Already exhausted our timeout window.
+ }
+ guard.lock();
+ }
}
}
return {}; // No message fetched.
@@ -923,14 +979,20 @@ FileStorHandlerImpl::Stripe::get_next_async_message(monitor_guard& guard)
++iter;
}
if ((iter != end) && AsyncHandler::is_async_message(iter->_command->getType().getId())) {
- return getMessage(guard, idx, iter);
+ // This is executed in the context of an RPC thread, so only do a _non-blocking_
+ // poll of the throttle policy.
+ auto throttle_token = _owner.operation_throttler().try_acquire_one();
+ if (throttle_token.valid()) {
+ return getMessage(guard, idx, iter, std::move(throttle_token));
+ }
}
return {};
}
FileStorHandler::LockedMessage
-FileStorHandlerImpl::Stripe::getMessage(monitor_guard & guard, PriorityIdx & idx, PriorityIdx::iterator iter) {
-
+FileStorHandlerImpl::Stripe::getMessage(monitor_guard & guard, PriorityIdx & idx, PriorityIdx::iterator iter,
+ SharedOperationThrottler::Token throttle_token)
+{
std::chrono::milliseconds waitTime(uint64_t(iter->_timer.stop(_metrics->averageQueueWaitingTime)));
std::shared_ptr<api::StorageMessage> msg = std::move(iter->_command);
@@ -942,7 +1004,7 @@ FileStorHandlerImpl::Stripe::getMessage(monitor_guard & guard, PriorityIdx & idx
msg->getType().getId(), msg->getMsgId(),
msg->lockingRequirements());
guard.unlock();
- return FileStorHandler::LockedMessage(std::move(locker), std::move(msg));
+ return {std::move(locker), std::move(msg), std::move(throttle_token)};
} else {
std::shared_ptr<api::StorageReply> msgReply(makeQueueTimeoutReply(*msg));
guard.unlock();
@@ -1014,7 +1076,7 @@ FileStorHandlerImpl::Stripe::schedule_and_get_next_async_message(MessageEntry en
std::unique_lock guard(*_lock);
_queue->emplace_back(std::move(entry));
auto lockedMessage = get_next_async_message(guard);
- if ( ! lockedMessage.second) {
+ if ( ! lockedMessage.msg) {
if (guard.owns_lock()) {
guard.unlock();
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index 5d68be8a800..c4b85ac596c 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -42,11 +42,12 @@ class AbortBucketOperationsCommand;
namespace bmi = boost::multi_index;
-class FileStorHandlerImpl : private framework::MetricUpdateHook,
- private ResumeGuard::Callback,
- public FileStorHandler {
+class FileStorHandlerImpl final
+ : private framework::MetricUpdateHook,
+ private ResumeGuard::Callback,
+ public FileStorHandler
+{
public:
-
struct MessageEntry {
std::shared_ptr<api::StorageMessage> _command;
metrics::MetricTimer _timer;
@@ -147,7 +148,8 @@ public:
// Precondition: the bucket used by `iter`s operation is not locked in a way that conflicts
// with its locking requirements.
FileStorHandler::LockedMessage getMessage(monitor_guard & guard, PriorityIdx & idx,
- PriorityIdx::iterator iter);
+ PriorityIdx::iterator iter,
+ SharedOperationThrottler::Token throttle_token);
using LockedBuckets = vespalib::hash_map<document::Bucket, MultiLockEntry, document::Bucket::hash>;
const FileStorHandlerImpl &_owner;
MessageSender &_messageSender;
@@ -163,7 +165,9 @@ public:
class BucketLock : public FileStorHandler::BucketLockInterface {
public:
// TODO refactor, too many params
- BucketLock(const monitor_guard & guard, Stripe& disk, const document::Bucket &bucket,
+ BucketLock(const monitor_guard & guard,
+ Stripe& disk,
+ const document::Bucket &bucket,
uint8_t priority, api::MessageType::Id msgType, api::StorageMessage::Id,
api::LockingRequirements lockReq);
~BucketLock() override;
@@ -187,9 +191,9 @@ public:
FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics,
ServiceLayerComponentRegister& compReg);
FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender&, FileStorMetrics&,
- ServiceLayerComponentRegister&);
+ ServiceLayerComponentRegister&, std::unique_ptr<SharedOperationThrottler>);
- ~FileStorHandlerImpl();
+ ~FileStorHandlerImpl() override;
void setGetNextMessageTimeout(vespalib::duration timeout) override { _getNextMessageTimeout = timeout; }
void flush(bool killPendingMerges) override;
@@ -239,6 +243,10 @@ public:
ResumeGuard pause() override;
void abortQueuedOperations(const AbortBucketOperationsCommand& cmd) override;
+ SharedOperationThrottler& operation_throttler() const noexcept override {
+ return *_operation_throttler;
+ }
+
// Implements ResumeGuard::Callback
void resume() override;
@@ -249,6 +257,7 @@ private:
ServiceLayerComponent _component;
std::atomic<DiskState> _state;
FileStorDiskMetrics * _metrics;
+ std::unique_ptr<SharedOperationThrottler> _operation_throttler;
std::vector<Stripe> _stripes;
MessageSender& _messageSender;
const document::BucketIdFactory& _bucketIdFactory;
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 2cfb3a2cffe..f5b9da0e1f5 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -32,6 +32,7 @@ LOG_SETUP(".persistence.filestor.manager");
using std::shared_ptr;
using document::BucketSpace;
using vespalib::make_string_short::fmt;
+using vespa::config::content::StorFilestorConfig;
namespace {
@@ -130,18 +131,31 @@ uint32_t computeNumResponseThreads(int configured) {
}
vespalib::Executor::OptimizeFor
-selectSequencer(vespa::config::content::StorFilestorConfig::ResponseSequencerType sequencerType) {
+selectSequencer(StorFilestorConfig::ResponseSequencerType sequencerType) {
switch (sequencerType) {
- case vespa::config::content::StorFilestorConfig::ResponseSequencerType::THROUGHPUT:
+ case StorFilestorConfig::ResponseSequencerType::THROUGHPUT:
return vespalib::Executor::OptimizeFor::THROUGHPUT;
- case vespa::config::content::StorFilestorConfig::ResponseSequencerType::LATENCY:
+ case StorFilestorConfig::ResponseSequencerType::LATENCY:
return vespalib::Executor::OptimizeFor::LATENCY;
- case vespa::config::content::StorFilestorConfig::ResponseSequencerType::ADAPTIVE:
+ case StorFilestorConfig::ResponseSequencerType::ADAPTIVE:
default:
return vespalib::Executor::OptimizeFor::ADAPTIVE;
}
}
+// Creates the async operation throttler selected by the filestor config.
+// For the DYNAMIC type, the window increment handed to the throttler is the
+// configured increment, raised to at least 1 and at least `num_threads`.
+// Any other type yields the unlimited (no-op) throttler.
+std::unique_ptr<SharedOperationThrottler>
+make_operation_throttler_from_config(const StorFilestorConfig& config, size_t num_threads)
+{
+    using ThrottlerType = StorFilestorConfig::AsyncOperationThrottlerType;
+    if (config.asyncOperationThrottlerType != ThrottlerType::DYNAMIC) {
+        return SharedOperationThrottler::make_unlimited_throttler();
+    }
+    const auto configured_increment = std::max(config.asyncOperationDynamicThrottlingWindowIncrement, 1);
+    const auto effective_increment = std::max(static_cast<size_t>(configured_increment), num_threads);
+    return SharedOperationThrottler::make_dynamic_throttler(effective_increment);
+}
+
#ifdef __PIC__
#define TLS_LINKAGE __attribute__((visibility("hidden"), tls_model("initial-exec")))
#else
@@ -185,7 +199,7 @@ FileStorManager::getThreadLocalHandler() {
* incoming during reconfiguration
*/
void
-FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorConfig> config)
+FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config)
{
// If true, this is not the first configure.
bool liveUpdate = ! _threads.empty();
@@ -198,8 +212,10 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
size_t numThreads = _config->numThreads;
size_t numStripes = std::max(size_t(1u), numThreads / 2);
_metrics->initDiskMetrics(numStripes, computeAllPossibleHandlerThreads(*_config));
+ auto operation_throttler = make_operation_throttler_from_config(*_config, numThreads);
- _filestorHandler = std::make_unique<FileStorHandlerImpl>(numThreads, numStripes, *this, *_metrics, _compReg);
+ _filestorHandler = std::make_unique<FileStorHandlerImpl>(numThreads, numStripes, *this, *_metrics,
+ _compReg, std::move(operation_throttler));
uint32_t numResponseThreads = computeNumResponseThreads(_config->numResponseThreads);
_sequencedExecutor = vespalib::SequencedTaskExecutor::create(response_executor, numResponseThreads, 10000,
selectSequencer(_config->responseSequencerType));
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp
index c119fdc4f69..a98077da57a 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp
@@ -203,6 +203,7 @@ FileStorDiskMetrics::FileStorDiskMetrics(const std::string& name, const std::str
averageQueueWaitingTime("averagequeuewait.sum", {}, "Average time an operation spends in input queue.", this),
queueSize("queuesize", {}, "Size of input message queue.", this),
pendingMerges("pendingmerge", {}, "Number of buckets currently being merged.", this),
+ throttle_window_size("throttlewindowsize", {}, "Current size of async operation throttler window size", this),
waitingForLockHitRate("waitingforlockrate", {},
"Amount of times a filestor thread has needed to wait for "
"lock to take next message in queue.", this),
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h
index 7543e6e0771..d8135c9aeca 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h
@@ -147,6 +147,7 @@ public:
metrics::DoubleAverageMetric averageQueueWaitingTime;
metrics::LongAverageMetric queueSize;
metrics::LongAverageMetric pendingMerges;
+ metrics::LongAverageMetric throttle_window_size;
metrics::DoubleAverageMetric waitingForLockHitRate;
metrics::DoubleAverageMetric lockWaitTime; // unused
ActiveOperationsMetrics active_operations;
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index b9739fcf734..7dcf4bcbee2 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -2,6 +2,7 @@
#include "mergehandler.h"
#include "persistenceutil.h"
+#include "shared_operation_throttler.h"
#include "apply_bucket_diff_entry_complete.h"
#include "apply_bucket_diff_state.h"
#include <vespa/storage/persistence/filestorage/mergestatus.h>
@@ -32,6 +33,7 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
_cluster_context(cluster_context),
_env(env),
_spi(spi),
+ _operation_throttler(_env._fileStorHandler.operation_throttler()),
_monitored_ref_count(std::make_unique<MonitoredRefCount>()),
_maxChunkSize(maxChunkSize),
_commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize),
@@ -514,17 +516,22 @@ MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results
spi::Context& context,
const document::DocumentTypeRepo& repo) const
{
+ auto throttle_token = _operation_throttler.blocking_acquire_one();
spi::Timestamp timestamp(e._entry._timestamp);
if (!(e._entry._flags & (DELETED | DELETED_IN_PLACE))) {
// Regular put entry
Document::SP doc(deserializeDiffDocument(e, repo));
DocumentId docId = doc->getId();
- auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), std::move(docId), "put", _clock, _env._metrics.merge_handler_metrics.put_latency);
+ auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), std::move(docId),
+ std::move(throttle_token), "put",
+ _clock, _env._metrics.merge_handler_metrics.put_latency);
_spi.putAsync(bucket, timestamp, std::move(doc), context, std::move(complete));
} else {
std::vector<spi::PersistenceProvider::TimeStampAndDocumentId> ids;
ids.emplace_back(timestamp, e._docName);
- auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), ids[0].second, "remove", _clock, _env._metrics.merge_handler_metrics.remove_latency);
+ auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), ids[0].second,
+ std::move(throttle_token), "remove",
+ _clock, _env._metrics.merge_handler_metrics.remove_latency);
_spi.removeAsync(bucket, std::move(ids), context, std::move(complete));
}
}
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index f52fe63bc2b..1007f35c241 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -34,6 +34,7 @@ namespace spi {
class PersistenceUtil;
class ApplyBucketDiffState;
class MergeStatus;
+class SharedOperationThrottler;
class MergeHandler : public Types,
public MergeBucketInfoSyncer {
@@ -52,7 +53,7 @@ public:
uint32_t commonMergeChainOptimalizationMinimumSize = 64,
bool async_apply_bucket_diff = false);
- ~MergeHandler();
+ ~MergeHandler() override;
bool buildBucketInfoList(
const spi::Bucket& bucket,
@@ -86,6 +87,7 @@ private:
const ClusterContext &_cluster_context;
PersistenceUtil &_env;
spi::PersistenceProvider &_spi;
+ SharedOperationThrottler& _operation_throttler;
std::unique_ptr<vespalib::MonitoredRefCount> _monitored_ref_count;
const uint32_t _maxChunkSize;
const uint32_t _commonMergeChainOptimalizationMinimumSize;
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index 8b546771b71..c0c95ffd7af 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -158,12 +158,13 @@ PersistenceHandler::processMessage(api::StorageMessage& msg, MessageTracker::UP
void
PersistenceHandler::processLockedMessage(FileStorHandler::LockedMessage lock) const {
- LOG(debug, "NodeIndex %d, ptr=%p", _env._nodeIndex, lock.second.get());
- api::StorageMessage & msg(*lock.second);
+ LOG(debug, "NodeIndex %d, ptr=%p", _env._nodeIndex, lock.msg.get());
+ api::StorageMessage & msg(*lock.msg);
// Important: we _copy_ the message shared_ptr instead of moving to ensure that `msg` remains
// valid even if the tracker is destroyed by an exception in processMessage().
- auto tracker = std::make_unique<MessageTracker>(framework::MilliSecTimer(_clock), _env, _env._fileStorHandler, std::move(lock.first), lock.second);
+ auto tracker = std::make_unique<MessageTracker>(framework::MilliSecTimer(_clock), _env, _env._fileStorHandler,
+ std::move(lock.lock), lock.msg, std::move(lock.throttle_token));
tracker = processMessage(msg, std::move(tracker));
if (tracker) {
tracker->sendReply();
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index f9da4d63d7f..499e9807cbf 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -38,7 +38,7 @@ PersistenceThread::run(framework::ThreadHandle& thread)
FileStorHandler::LockedMessage lock(_fileStorHandler.getNextMessage(_stripeId));
- if (lock.first) {
+ if (lock.lock) {
_persistenceHandler.processLockedMessage(std::move(lock));
}
}
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp
index cbfc9463a8c..65eab99b8fb 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.cpp
+++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp
@@ -31,19 +31,22 @@ MessageTracker::MessageTracker(const framework::MilliSecTimer & timer,
const PersistenceUtil & env,
MessageSender & replySender,
FileStorHandler::BucketLockInterface::SP bucketLock,
- api::StorageMessage::SP msg)
- : MessageTracker(timer, env, replySender, true, std::move(bucketLock), std::move(msg))
+ api::StorageMessage::SP msg,
+ SharedOperationThrottler::Token throttle_token)
+ : MessageTracker(timer, env, replySender, true, std::move(bucketLock), std::move(msg), std::move(throttle_token))
{}
MessageTracker::MessageTracker(const framework::MilliSecTimer & timer,
const PersistenceUtil & env,
MessageSender & replySender,
bool updateBucketInfo,
FileStorHandler::BucketLockInterface::SP bucketLock,
- api::StorageMessage::SP msg)
+ api::StorageMessage::SP msg,
+ SharedOperationThrottler::Token throttle_token)
: _sendReply(true),
_updateBucketInfo(updateBucketInfo && hasBucketInfo(msg->getType().getId())),
_bucketLock(std::move(bucketLock)),
_msg(std::move(msg)),
+ _throttle_token(std::move(throttle_token)),
_context(_msg->getPriority(), _msg->getTrace().getLevel()),
_env(env),
_replySender(replySender),
@@ -56,7 +59,8 @@ MessageTracker::UP
MessageTracker::createForTesting(const framework::MilliSecTimer & timer, PersistenceUtil &env, MessageSender &replySender,
FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg)
{
- return MessageTracker::UP(new MessageTracker(timer, env, replySender, false, std::move(bucketLock), std::move(msg)));
+ return MessageTracker::UP(new MessageTracker(timer, env, replySender, false, std::move(bucketLock),
+ std::move(msg), SharedOperationThrottler::Token()));
}
void
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h
index 4fd0e60c730..588cbef2170 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.h
+++ b/storage/src/vespa/storage/persistence/persistenceutil.h
@@ -30,7 +30,8 @@ public:
using UP = std::unique_ptr<MessageTracker>;
MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender,
- FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg);
+ FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg,
+ SharedOperationThrottler::Token throttle_token);
~MessageTracker();
@@ -91,7 +92,8 @@ public:
private:
MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, bool updateBucketInfo,
- FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg);
+ FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg,
+ SharedOperationThrottler::Token throttle_token);
[[nodiscard]] bool count_result_as_failure() const noexcept;
@@ -99,6 +101,7 @@ private:
bool _updateBucketInfo;
FileStorHandler::BucketLockInterface::SP _bucketLock;
std::shared_ptr<api::StorageMessage> _msg;
+ SharedOperationThrottler::Token _throttle_token;
spi::Context _context;
const PersistenceUtil &_env;
MessageSender &_replySender;
diff --git a/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp b/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp
new file mode 100644
index 00000000000..b72b1a8ba28
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp
@@ -0,0 +1,191 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "shared_operation_throttler.h"
+#include <vespa/messagebus/dynamicthrottlepolicy.h>
+#include <vespa/messagebus/message.h>
+#include <condition_variable>
+#include <cassert>
+#include <mutex>
+
+namespace storage {
+
+namespace {
+
+// Throttler without any limit: every acquire call immediately returns a valid
+// token, and releasing a token does nothing.
+class NoLimitsOperationThrottler final : public SharedOperationThrottler {
+public:
+    ~NoLimitsOperationThrottler() override = default;
+
+    Token blocking_acquire_one() noexcept override { return always_valid_token(); }
+    Token blocking_acquire_one(vespalib::duration) noexcept override { return always_valid_token(); }
+    Token try_acquire_one() noexcept override { return always_valid_token(); }
+
+    uint32_t current_window_size() const noexcept override { return 0; }
+    uint32_t waiting_threads() const noexcept override { return 0; }
+private:
+    // Single place where the (always valid) token is built.
+    Token always_valid_token() noexcept { return Token(this, TokenCtorTag{}); }
+    void release_one() noexcept override { /* no-op */ }
+};
+
+// Class used to sneakily get around IThrottlePolicy only accepting MBus objects
+//
+// `Base` is the MBus type to derive from; this file instantiates the template
+// with mbus::Message (when acquiring) and mbus::Reply (when releasing).
+// All overridden accessors return fixed placeholder values.
+// NOTE(review): presumably the throttle policy never inspects these values - confirm.
+template <typename Base>
+class DummyMbusMessage final : public Base {
+ // Placeholder protocol name shared by all instances of one instantiation.
+ static const mbus::string NAME;
+public:
+ const mbus::string& getProtocol() const override { return NAME; }
+ uint32_t getType() const override { return 0x1badb007; }
+ uint8_t priority() const override { return 255; }
+};
+
+// Out-of-class definition of the per-instantiation placeholder protocol name.
+template <typename Base>
+const mbus::string DummyMbusMessage<Base>::NAME = "FooBar";
+
+// Throttler that forwards every admission decision to an
+// mbus::DynamicThrottlePolicy. All member functions lock _mutex before
+// touching the policy or the counters below.
+class DynamicOperationThrottler final : public SharedOperationThrottler {
+ // Mutable so that the const accessors can lock it.
+ mutable std::mutex _mutex;
+ // Waited on by the blocking acquire calls; notified by release_one().
+ std::condition_variable _cond;
+ mbus::DynamicThrottlePolicy _throttle_policy;
+ // Number of tokens handed out that have not yet been released.
+ uint32_t _pending_ops;
+ // Number of threads currently blocked inside a blocking acquire call.
+ uint32_t _waiting_threads;
+public:
+ explicit DynamicOperationThrottler(uint32_t min_size_and_window_increment);
+ ~DynamicOperationThrottler() override;
+
+ Token blocking_acquire_one() noexcept override;
+ Token blocking_acquire_one(vespalib::duration timeout) noexcept override;
+ Token try_acquire_one() noexcept override;
+ uint32_t current_window_size() const noexcept override;
+ uint32_t waiting_threads() const noexcept override;
+private:
+ // Called by Token when a valid token is destroyed or reset.
+ void release_one() noexcept override;
+};
+
+// Starts out with no pending operations and no waiting threads. The single
+// argument is passed (as a double) to the DynamicThrottlePolicy constructor.
+// The mutex and condition variable are default-constructed implicitly.
+DynamicOperationThrottler::DynamicOperationThrottler(uint32_t min_size_and_window_increment)
+    : _throttle_policy(static_cast<double>(min_size_and_window_increment)),
+      _pending_ops(0),
+      _waiting_threads(0)
+{
+}
+
+DynamicOperationThrottler::~DynamicOperationThrottler() = default;
+
+// Blocks, without any deadline, until the throttle policy accepts one more
+// pending operation. Then registers the operation with the policy, bumps the
+// pending count and returns a valid token.
+DynamicOperationThrottler::Token
+DynamicOperationThrottler::blocking_acquire_one() noexcept
+{
+ std::unique_lock lock(_mutex);
+ DummyMbusMessage<mbus::Message> dummy_msg;
+ // Only register as a waiter if the policy does not accept us right away.
+ if (!_throttle_policy.canSend(dummy_msg, _pending_ops)) {
+ ++_waiting_threads;
+ // The predicate is evaluated with _mutex held, on entry and after each wakeup.
+ _cond.wait(lock, [&] {
+ return _throttle_policy.canSend(dummy_msg, _pending_ops);
+ });
+ --_waiting_threads;
+ }
+ _throttle_policy.processMessage(dummy_msg);
+ ++_pending_ops;
+ return Token(this, TokenCtorTag{});
+}
+
+// Timed variant: waits at most `timeout` for the throttle policy to accept one
+// more pending operation. Returns a valid token on success, or a
+// default-constructed (invalid) token if the policy still refuses when the
+// wait times out.
+DynamicOperationThrottler::Token
+DynamicOperationThrottler::blocking_acquire_one(vespalib::duration timeout) noexcept
+{
+ std::unique_lock lock(_mutex);
+ DummyMbusMessage<mbus::Message> dummy_msg;
+ // Only register as a waiter if the policy does not accept us right away.
+ if (!_throttle_policy.canSend(dummy_msg, _pending_ops)) {
+ ++_waiting_threads;
+ // wait_for() returns the final value of the predicate, i.e. false on timeout.
+ const bool accepted = _cond.wait_for(lock, timeout, [&] {
+ return _throttle_policy.canSend(dummy_msg, _pending_ops);
+ });
+ --_waiting_threads;
+ if (!accepted) {
+ return Token();
+ }
+ }
+ _throttle_policy.processMessage(dummy_msg);
+ ++_pending_ops;
+ return Token(this, TokenCtorTag{});
+}
+
+// Non-blocking poll: returns a valid token if the throttle policy accepts one
+// more pending operation right now, otherwise an invalid token. Never waits
+// on the condition variable.
+DynamicOperationThrottler::Token
+DynamicOperationThrottler::try_acquire_one() noexcept
+{
+    std::lock_guard guard(_mutex);
+    DummyMbusMessage<mbus::Message> probe_msg;
+    if (_throttle_policy.canSend(probe_msg, _pending_ops)) {
+        _throttle_policy.processMessage(probe_msg);
+        ++_pending_ops;
+        return Token(this, TokenCtorTag{});
+    }
+    return Token();
+}
+
+// Gives one window slot back: reports a reply to the throttle policy,
+// decrements the pending count and wakes a single waiting thread, if any.
+// Called from Token's destructor and Token::reset() for valid tokens.
+void
+DynamicOperationThrottler::release_one() noexcept
+{
+ std::unique_lock lock(_mutex);
+ DummyMbusMessage<mbus::Reply> dummy_reply;
+ _throttle_policy.processReply(dummy_reply);
+ // A release without a matching acquire would underflow the counter.
+ assert(_pending_ops > 0);
+ --_pending_ops;
+ if (_waiting_threads > 0) {
+ // Drop the lock before notifying; the woken waiter has to take _mutex
+ // itself in order to re-check its predicate.
+ lock.unlock();
+ _cond.notify_one();
+ }
+}
+
+// Reports the throttle policy's current window size, read under the mutex.
+uint32_t
+DynamicOperationThrottler::current_window_size() const noexcept
+{
+    std::lock_guard guard(_mutex);
+    // Despite its name, getMaxPendingCount() actually returns the current window size.
+    return _throttle_policy.getMaxPendingCount();
+}
+
+// Reports how many threads are currently blocked in a blocking acquire call,
+// read under the mutex.
+uint32_t
+DynamicOperationThrottler::waiting_threads() const noexcept
+{
+    std::lock_guard guard(_mutex);
+    return _waiting_threads;
+}
+
+}
+
+// Factory for the no-op throttler (NoLimitsOperationThrottler); the concrete
+// type stays private to this translation unit.
+std::unique_ptr<SharedOperationThrottler>
+SharedOperationThrottler::make_unlimited_throttler()
+{
+ return std::make_unique<NoLimitsOperationThrottler>();
+}
+
+// Factory for the DynamicThrottlePolicy-backed throttler
+// (DynamicOperationThrottler); `min_size_and_window_increment` is forwarded
+// unchanged to its constructor.
+std::unique_ptr<SharedOperationThrottler>
+SharedOperationThrottler::make_dynamic_throttler(uint32_t min_size_and_window_increment)
+{
+ return std::make_unique<DynamicOperationThrottler>(min_size_and_window_increment);
+}
+
+// Releases the held window slot, if any, back to the throttler that issued it.
+// Qualified with SharedOperationThrottler, the class that declares Token,
+// rather than with a derived class that merely inherits the nested name.
+SharedOperationThrottler::Token::~Token()
+{
+    if (_throttler) {
+        _throttler->release_one();
+    }
+}
+
+// Releases the held window slot early, if any, and leaves the token invalid.
+// Calling reset() on an already invalid token does nothing.
+// Qualified with SharedOperationThrottler, the class that declares Token,
+// rather than with a derived class that merely inherits the nested name.
+void
+SharedOperationThrottler::Token::reset() noexcept
+{
+    if (_throttler) {
+        _throttler->release_one();
+        _throttler = nullptr;
+    }
+}
+
+// Move assignment: releases any slot currently held by this token, then takes
+// over the slot held by `rhs`, leaving `rhs` invalid.
+// Self-move-assignment is a no-op; without the guard it would release the
+// slot and invalidate the token.
+// Qualified with SharedOperationThrottler, the class that declares Token,
+// rather than with a derived class that merely inherits the nested name.
+SharedOperationThrottler::Token&
+SharedOperationThrottler::Token::operator=(Token&& rhs) noexcept
+{
+    if (this != &rhs) {
+        reset();
+        _throttler = rhs._throttler;
+        rhs._throttler = nullptr;
+    }
+    return *this;
+}
+
+}
diff --git a/storage/src/vespa/storage/persistence/shared_operation_throttler.h b/storage/src/vespa/storage/persistence/shared_operation_throttler.h
new file mode 100644
index 00000000000..2e1de86c4b8
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/shared_operation_throttler.h
@@ -0,0 +1,71 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/vespalib/util/time.h>
+#include <memory>
+#include <optional>
+
+namespace storage {
+
+/**
+ * Operation throttler that is intended to provide global throttling of
+ * async operations across all persistence stripe threads. A throttler
+ * wraps a logical max pending window size of in-flight operations. Depending
+ * on the throttler implementation, the window size may expand and shrink
+ * dynamically. Exactly how and when this happens is unspecified.
+ *
+ * Offers both polling and (timed, non-timed) blocking calls for acquiring
+ * a throttle token. If the returned token is valid, the caller may proceed
+ * to invoke the asynchronous operation.
+ *
+ * The window slot taken up by a valid throttle token is implicitly freed up
+ * when the token is destroyed.
+ *
+ * All operations on the throttler are thread safe.
+ */
+class SharedOperationThrottler {
+protected:
+ struct TokenCtorTag {}; // Make available to subclasses for token construction.
+public:
+ // Move-only handle for one slot in the throttler window. A token is valid
+ // while it refers to a throttler; destroying or resetting a valid token
+ // calls release_one() on that throttler.
+ class Token {
+ // Issuing throttler, or nullptr when the token is invalid.
+ SharedOperationThrottler* _throttler;
+ public:
+ // Creates a token bound to `throttler`; requires the protected tag type.
+ constexpr Token(SharedOperationThrottler* throttler, TokenCtorTag) noexcept : _throttler(throttler) {}
+ // Creates an invalid token.
+ constexpr Token() noexcept : _throttler(nullptr) {}
+ // Takes over the slot held by `rhs`, leaving `rhs` invalid.
+ constexpr Token(Token&& rhs) noexcept
+ : _throttler(rhs._throttler)
+ {
+ rhs._throttler = nullptr;
+ }
+ Token& operator=(Token&& rhs) noexcept;
+ ~Token();
+
+ Token(const Token&) = delete;
+ Token& operator=(const Token&) = delete;
+
+ // True iff the token currently refers to a throttler.
+ [[nodiscard]] constexpr bool valid() const noexcept { return (_throttler != nullptr); }
+ // Releases the slot early (if the token is valid) and makes the token invalid.
+ void reset() noexcept;
+ };
+
+ virtual ~SharedOperationThrottler() = default;
+
+ // All methods are thread safe
+ // Blocking acquire without a timeout.
+ [[nodiscard]] virtual Token blocking_acquire_one() noexcept = 0;
+ // Blocking acquire bounded by `timeout`; the returned token may be invalid.
+ [[nodiscard]] virtual Token blocking_acquire_one(vespalib::duration timeout) noexcept = 0;
+ // Polling (non-blocking) acquire; the returned token may be invalid.
+ [[nodiscard]] virtual Token try_acquire_one() noexcept = 0;
+
+ // May return 0, in which case the window size is unlimited.
+ [[nodiscard]] virtual uint32_t current_window_size() const noexcept = 0;
+
+ // Exposed for unit testing only.
+ [[nodiscard]] virtual uint32_t waiting_threads() const noexcept = 0;
+
+ // Creates a throttler that never limits the number of pending operations.
+ static std::unique_ptr<SharedOperationThrottler> make_unlimited_throttler();
+
+ // Creates a dynamically sized throttler configured with `min_size_and_window_increment`.
+ static std::unique_ptr<SharedOperationThrottler> make_dynamic_throttler(uint32_t min_size_and_window_increment);
+private:
+ // Exclusively called from a valid Token. Thread safe.
+ virtual void release_one() noexcept = 0;
+};
+
+}