aboutsummaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
author: Tor Brede Vekterli <vekterli@yahooinc.com> 2022-01-06 11:49:57 +0000
committer: Tor Brede Vekterli <vekterli@yahooinc.com> 2022-01-10 14:35:01 +0000
commit: 3f15206f090ce36c08282b8d64d6ff9c15c8fd69 (patch)
tree: edc117087a4fac59dc6f507e5c88f305845839ca /storage
parent: 6fc8a76d2062e6176510804be59faffe2e20662d (diff)
Support dynamic throttling of async persistence operations
Adds an operation throttler that is intended to provide global throttling of async operations across all persistence stripe threads.

A throttler wraps a logical max pending window size of in-flight operations. Depending on the throttler implementation, the window size may expand and shrink dynamically. Exactly how and when this happens is unspecified.

This commit adds two throttler implementations:
* An unlimited throttler that is a no-op and never blocks.
* A throttler that is built around the mbus `DynamicThrottlePolicy` and defers all window decisions to it.

The current config default is to use the unlimited throttler. Config changes require a process restart.

The throttler offers both polling and blocking (timed and non-timed) calls for acquiring a throttle token. If the returned token is valid, the caller may proceed to invoke the asynchronous operation. The window slot taken up by a valid throttle token is implicitly freed up when the token is destroyed.
Diffstat (limited to 'storage')
-rw-r--r-- storage/src/tests/persistence/CMakeLists.txt 1
-rw-r--r-- storage/src/tests/persistence/active_operations_stats_test.cpp 10
-rw-r--r-- storage/src/tests/persistence/filestorage/filestormanagertest.cpp 34
-rw-r--r-- storage/src/tests/persistence/persistencequeuetest.cpp 34
-rw-r--r-- storage/src/tests/persistence/shared_operation_throttler_test.cpp 116
-rw-r--r-- storage/src/vespa/storage/persistence/CMakeLists.txt 1
-rw-r--r-- storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp 9
-rw-r--r-- storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h 10
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt 1
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp 8
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/filestorhandler.h 29
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp 82
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h 25
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp 28
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp 1
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/filestormetrics.h 1
-rw-r--r-- storage/src/vespa/storage/persistence/mergehandler.cpp 11
-rw-r--r-- storage/src/vespa/storage/persistence/mergehandler.h 4
-rw-r--r-- storage/src/vespa/storage/persistence/persistencehandler.cpp 7
-rw-r--r-- storage/src/vespa/storage/persistence/persistencethread.cpp 2
-rw-r--r-- storage/src/vespa/storage/persistence/persistenceutil.cpp 12
-rw-r--r-- storage/src/vespa/storage/persistence/persistenceutil.h 7
-rw-r--r-- storage/src/vespa/storage/persistence/shared_operation_throttler.cpp 191
-rw-r--r-- storage/src/vespa/storage/persistence/shared_operation_throttler.h 71
24 files changed, 614 insertions, 81 deletions
diff --git a/storage/src/tests/persistence/CMakeLists.txt b/storage/src/tests/persistence/CMakeLists.txt
index 7b165e11b66..fb8120210c1 100644
--- a/storage/src/tests/persistence/CMakeLists.txt
+++ b/storage/src/tests/persistence/CMakeLists.txt
@@ -12,6 +12,7 @@ vespa_add_executable(storage_persistence_gtest_runner_app TEST
persistencethread_splittest.cpp
processalltest.cpp
provider_error_wrapper_test.cpp
+ shared_operation_throttler_test.cpp
splitbitdetectortest.cpp
testandsettest.cpp
gtest_runner.cpp
diff --git a/storage/src/tests/persistence/active_operations_stats_test.cpp b/storage/src/tests/persistence/active_operations_stats_test.cpp
index a5dd3d929db..8caa84977ce 100644
--- a/storage/src/tests/persistence/active_operations_stats_test.cpp
+++ b/storage/src/tests/persistence/active_operations_stats_test.cpp
@@ -96,17 +96,17 @@ ActiveOperationsStatsTest::test_active_operations_stats()
auto lock0 = filestorHandler->getNextMessage(stripeId);
auto lock1 = filestorHandler->getNextMessage(stripeId);
auto lock2 = filestorHandler->getNextMessage(stripeId);
- ASSERT_TRUE(lock0.first);
- ASSERT_TRUE(lock1.first);
- ASSERT_FALSE(lock2.first);
+ ASSERT_TRUE(lock0.lock);
+ ASSERT_TRUE(lock1.lock);
+ ASSERT_FALSE(lock2.lock);
auto stats = filestorHandler->get_active_operations_stats(false);
{
SCOPED_TRACE("during");
assert_active_operations_stats(stats, 2, 2, 0);
}
EXPECT_EQ(3, stats.get_total_size());
- lock0.first.reset();
- lock1.first.reset();
+ lock0.lock.reset();
+ lock1.lock.reset();
stats = filestorHandler->get_active_operations_stats(false);
{
SCOPED_TRACE("after");
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index cd496605a6c..939e7ae7b6a 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -516,11 +516,11 @@ TEST_F(FileStorManagerTest, handler_priority) {
filestorHandler.schedule(cmd);
}
- ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).second->getPriority());
- ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).second->getPriority());
- ASSERT_EQ(45, filestorHandler.getNextMessage(stripeId).second->getPriority());
- ASSERT_EQ(60, filestorHandler.getNextMessage(stripeId).second->getPriority());
- ASSERT_EQ(75, filestorHandler.getNextMessage(stripeId).second->getPriority());
+ ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).msg->getPriority());
+ ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).msg->getPriority());
+ ASSERT_EQ(45, filestorHandler.getNextMessage(stripeId).msg->getPriority());
+ ASSERT_EQ(60, filestorHandler.getNextMessage(stripeId).msg->getPriority());
+ ASSERT_EQ(75, filestorHandler.getNextMessage(stripeId).msg->getPriority());
}
class MessagePusherThread : public document::Runnable {
@@ -570,7 +570,7 @@ public:
void run() override {
while (!_done) {
FileStorHandler::LockedMessage msg = _handler.getNextMessage(_threadId);
- if (msg.second.get()) {
+ if (msg.msg.get()) {
uint32_t originalConfig = _config.load();
_fetchedCount++;
std::this_thread::sleep_for(5ms);
@@ -641,15 +641,15 @@ TEST_F(FileStorManagerTest, handler_pause) {
filestorHandler.schedule(cmd);
}
- ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).second->getPriority());
+ ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).msg->getPriority());
{
ResumeGuard guard = filestorHandler.pause();
(void)guard;
- ASSERT_EQ(filestorHandler.getNextMessage(stripeId).second.get(), nullptr);
+ ASSERT_EQ(filestorHandler.getNextMessage(stripeId).msg.get(), nullptr);
}
- ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).second->getPriority());
+ ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).msg->getPriority());
}
TEST_F(FileStorManagerTest, remap_split) {
@@ -729,8 +729,8 @@ TEST_F(FileStorManagerTest, handler_timeout) {
std::this_thread::sleep_for(51ms);
for (;;) {
auto lock = filestorHandler.getNextMessage(stripeId);
- if (lock.first.get()) {
- ASSERT_EQ(200, lock.second->getPriority());
+ if (lock.lock.get()) {
+ ASSERT_EQ(200, lock.msg->getPriority());
break;
}
}
@@ -2013,7 +2013,7 @@ expect_async_message(StorageMessage::Priority exp_pri,
{
EXPECT_TRUE(result.was_scheduled());
ASSERT_TRUE(result.has_async_message());
- EXPECT_EQ(exp_pri, result.async_message().second->getPriority());
+ EXPECT_EQ(exp_pri, result.async_message().msg->getPriority());
}
void
@@ -2045,8 +2045,8 @@ TEST_F(FileStorHandlerTest, async_message_with_lowest_pri_returned_on_schedule)
auto result = handler->schedule_and_get_next_async_message(make_put_command(30));
expect_async_message(20, result);
}
- EXPECT_EQ(30, get_next_message().second->getPriority());
- EXPECT_EQ(40, get_next_message().second->getPriority());
+ EXPECT_EQ(30, get_next_message().msg->getPriority());
+ EXPECT_EQ(40, get_next_message().msg->getPriority());
}
TEST_F(FileStorHandlerTest, no_async_message_returned_if_lowest_pri_message_is_not_async)
@@ -2057,8 +2057,8 @@ TEST_F(FileStorHandlerTest, no_async_message_returned_if_lowest_pri_message_is_n
auto result = handler->schedule_and_get_next_async_message(make_put_command(30));
expect_empty_async_message(result);
- EXPECT_EQ(20, get_next_message().second->getPriority());
- EXPECT_EQ(30, get_next_message().second->getPriority());
+ EXPECT_EQ(20, get_next_message().msg->getPriority());
+ EXPECT_EQ(30, get_next_message().msg->getPriority());
}
TEST_F(FileStorHandlerTest, inhibited_operations_are_skipped)
@@ -2079,7 +2079,7 @@ TEST_F(FileStorHandlerTest, inhibited_operations_are_skipped)
expect_async_message(40, result);
}
}
- EXPECT_EQ(30, get_next_message().second->getPriority());
+ EXPECT_EQ(30, get_next_message().msg->getPriority());
}
} // storage
diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp
index 2557fa537f5..5f3836727dd 100644
--- a/storage/src/tests/persistence/persistencequeuetest.cpp
+++ b/storage/src/tests/persistence/persistencequeuetest.cpp
@@ -91,14 +91,14 @@ TEST_F(PersistenceQueueTest, fetch_next_unlocked_message_if_bucket_locked) {
f.filestorHandler->schedule(createPut(5432, 0));
auto lock0 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_TRUE(lock0.first.get());
+ ASSERT_TRUE(lock0.lock.get());
EXPECT_EQ(document::BucketId(16, 1234),
- dynamic_cast<api::PutCommand&>(*lock0.second).getBucketId());
+ dynamic_cast<api::PutCommand&>(*lock0.msg).getBucketId());
auto lock1 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_TRUE(lock1.first.get());
+ ASSERT_TRUE(lock1.lock.get());
EXPECT_EQ(document::BucketId(16, 5432),
- dynamic_cast<api::PutCommand&>(*lock1.second).getBucketId());
+ dynamic_cast<api::PutCommand&>(*lock1.msg).getBucketId());
}
TEST_F(PersistenceQueueTest, shared_locked_operations_allow_concurrent_bucket_access) {
@@ -108,14 +108,14 @@ TEST_F(PersistenceQueueTest, shared_locked_operations_allow_concurrent_bucket_ac
f.filestorHandler->schedule(createGet(1234));
auto lock0 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_TRUE(lock0.first.get());
- EXPECT_EQ(api::LockingRequirements::Shared, lock0.first->lockingRequirements());
+ ASSERT_TRUE(lock0.lock.get());
+ EXPECT_EQ(api::LockingRequirements::Shared, lock0.lock->lockingRequirements());
// Even though we already have a lock on the bucket, Gets allow shared locking and we
// should therefore be able to get another lock.
auto lock1 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_TRUE(lock1.first.get());
- EXPECT_EQ(api::LockingRequirements::Shared, lock1.first->lockingRequirements());
+ ASSERT_TRUE(lock1.lock.get());
+ EXPECT_EQ(api::LockingRequirements::Shared, lock1.lock->lockingRequirements());
}
TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_shared_op_active) {
@@ -125,12 +125,12 @@ TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_shared_op
f.filestorHandler->schedule(createPut(1234, 0));
auto lock0 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_TRUE(lock0.first.get());
- EXPECT_EQ(api::LockingRequirements::Shared, lock0.first->lockingRequirements());
+ ASSERT_TRUE(lock0.lock.get());
+ EXPECT_EQ(api::LockingRequirements::Shared, lock0.lock->lockingRequirements());
// Expected to time out
auto lock1 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_FALSE(lock1.first.get());
+ ASSERT_FALSE(lock1.lock.get());
}
TEST_F(PersistenceQueueTest, shared_locked_operation_not_started_if_exclusive_op_active) {
@@ -140,12 +140,12 @@ TEST_F(PersistenceQueueTest, shared_locked_operation_not_started_if_exclusive_op
f.filestorHandler->schedule(createGet(1234));
auto lock0 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_TRUE(lock0.first.get());
- EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements());
+ ASSERT_TRUE(lock0.lock.get());
+ EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.lock->lockingRequirements());
// Expected to time out
auto lock1 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_FALSE(lock1.first.get());
+ ASSERT_FALSE(lock1.lock.get());
}
TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_exclusive_op_active) {
@@ -155,12 +155,12 @@ TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_exclusive
f.filestorHandler->schedule(createPut(1234, 0));
auto lock0 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_TRUE(lock0.first.get());
- EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements());
+ ASSERT_TRUE(lock0.lock.get());
+ EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.lock->lockingRequirements());
// Expected to time out
auto lock1 = f.filestorHandler->getNextMessage(f.stripeId);
- ASSERT_FALSE(lock1.first.get());
+ ASSERT_FALSE(lock1.lock.get());
}
} // namespace storage
diff --git a/storage/src/tests/persistence/shared_operation_throttler_test.cpp b/storage/src/tests/persistence/shared_operation_throttler_test.cpp
new file mode 100644
index 00000000000..0ad380937c7
--- /dev/null
+++ b/storage/src/tests/persistence/shared_operation_throttler_test.cpp
@@ -0,0 +1,116 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/storage/persistence/shared_operation_throttler.h>
+#include <vespa/vespalib/gtest/gtest.h>
+#include <vespa/vespalib/util/barrier.h>
+#include <chrono>
+#include <thread>
+
+using namespace ::testing;
+
+namespace storage {
+
+using ThrottleToken = SharedOperationThrottler::Token;
+
+TEST(SharedOperationThrottlerTest, unlimited_throttler_does_not_throttle) {
+ // We technically can't test that the unlimited throttler _never_ throttles, but at
+ // least check that it doesn't throttle _twice_, and then induce from this ;)
+ auto throttler = SharedOperationThrottler::make_unlimited_throttler();
+ auto token1 = throttler->try_acquire_one();
+ EXPECT_TRUE(token1.valid());
+ auto token2 = throttler->blocking_acquire_one();
+ EXPECT_TRUE(token2.valid());
+ // Window size should be zero (i.e. unlimited) for unlimited throttler
+ EXPECT_EQ(throttler->current_window_size(), 0);
+}
+
+TEST(SharedOperationThrottlerTest, dynamic_throttler_respects_initial_window_size) {
+ auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
+ auto token1 = throttler->try_acquire_one();
+ EXPECT_TRUE(token1.valid());
+ auto token2 = throttler->try_acquire_one();
+ EXPECT_FALSE(token2.valid());
+
+ EXPECT_EQ(throttler->current_window_size(), 1);
+}
+
+TEST(SharedOperationThrottlerTest, blocking_acquire_returns_immediately_if_slot_available) {
+ auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
+ auto token = throttler->blocking_acquire_one();
+ EXPECT_TRUE(token.valid());
+ token.reset();
+ token = throttler->blocking_acquire_one(600s); // Should never block.
+ EXPECT_TRUE(token.valid());
+}
+
+TEST(SharedOperationThrottlerTest, blocking_call_woken_up_if_throttle_slot_available) {
+ auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
+ vespalib::Barrier barrier(2);
+ std::thread t([&] {
+ auto token = throttler->try_acquire_one();
+ assert(token.valid());
+ barrier.await();
+ while (throttler->waiting_threads() != 1) {
+ std::this_thread::sleep_for(100us);
+ }
+ // Implicit token release at thread scope exit
+ });
+ barrier.await();
+ auto token = throttler->blocking_acquire_one();
+ EXPECT_TRUE(token.valid());
+ t.join();
+}
+
+TEST(SharedOperationThrottlerTest, time_bounded_blocking_acquire_waits_for_timeout) {
+ auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
+ auto window_filling_token = throttler->try_acquire_one();
+ auto before = std::chrono::steady_clock::now();
+ // Will block for at least 1ms. Since no window slot will be available by that time,
+ // an invalid token should be returned.
+ auto token = throttler->blocking_acquire_one(1ms);
+ auto after = std::chrono::steady_clock::now();
+ EXPECT_TRUE((after - before) >= 1ms);
+ EXPECT_FALSE(token.valid());
+}
+
+TEST(SharedOperationThrottlerTest, default_constructed_token_is_invalid) {
+ ThrottleToken token;
+ EXPECT_FALSE(token.valid());
+ token.reset(); // no-op
+ EXPECT_FALSE(token.valid());
+}
+
+TEST(SharedOperationThrottlerTest, token_destruction_frees_up_throttle_window_slot) {
+ auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
+ {
+ auto token = throttler->try_acquire_one();
+ EXPECT_TRUE(token.valid());
+ }
+ auto token = throttler->try_acquire_one();
+ EXPECT_TRUE(token.valid());
+}
+
+TEST(SharedOperationThrottlerTest, token_can_be_moved_and_reset) {
+ auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
+ auto token1 = throttler->try_acquire_one();
+ auto token2 = std::move(token1); // move ctor
+ EXPECT_TRUE(token2.valid());
+ EXPECT_FALSE(token1.valid());
+ ThrottleToken token3;
+ token3 = std::move(token2); // move assignment op
+ EXPECT_TRUE(token3.valid());
+ EXPECT_FALSE(token2.valid());
+
+ // Trying to fetch new token should not succeed due to active token and win size of 1
+ token1 = throttler->try_acquire_one();
+ EXPECT_FALSE(token1.valid());
+ // Resetting the token should free up the slot in the window
+ token3.reset();
+ token1 = throttler->try_acquire_one();
+ EXPECT_TRUE(token1.valid());
+}
+
+// TODO ideally we'd test that the dynamic throttler has a window size that is actually
+// dynamic, but the backing DynamicThrottlePolicy implementation is a black box so
+// it's not trivial to know how to do this reliably.
+
+}
diff --git a/storage/src/vespa/storage/persistence/CMakeLists.txt b/storage/src/vespa/storage/persistence/CMakeLists.txt
index c737d2bed28..5e068236026 100644
--- a/storage/src/vespa/storage/persistence/CMakeLists.txt
+++ b/storage/src/vespa/storage/persistence/CMakeLists.txt
@@ -14,6 +14,7 @@ vespa_add_library(storage_spersistence OBJECT
persistenceutil.cpp
processallhandler.cpp
provider_error_wrapper.cpp
+ shared_operation_throttler.cpp
simplemessagehandler.cpp
splitbitdetector.cpp
splitjoinhandler.cpp
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp
index 2dc5989e857..73ccc7f6085 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp
@@ -7,10 +7,16 @@
namespace storage {
-ApplyBucketDiffEntryComplete::ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state, document::DocumentId doc_id, const char *op, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric)
+ApplyBucketDiffEntryComplete::ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state,
+ document::DocumentId doc_id,
+ SharedOperationThrottler::Token throttle_token,
+ const char *op,
+ const framework::Clock& clock,
+ metrics::DoubleAverageMetric& latency_metric)
: _result_handler(nullptr),
_state(std::move(state)),
_doc_id(std::move(doc_id)),
+ _throttle_token(std::move(throttle_token)),
_op(op),
_start_time(clock),
_latency_metric(latency_metric)
@@ -27,6 +33,7 @@ ApplyBucketDiffEntryComplete::onComplete(std::unique_ptr<spi::Result> result) no
}
double elapsed = _start_time.getElapsedTimeAsDouble();
_latency_metric.addValue(elapsed);
+ _throttle_token.reset();
_state->on_entry_complete(std::move(result), _doc_id, _op);
}
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h
index 1037318aec6..8478cab4c17 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h
@@ -2,6 +2,7 @@
#pragma once
+#include "shared_operation_throttler.h"
#include <vespa/document/base/documentid.h>
#include <vespa/metrics/valuemetric.h>
#include <vespa/persistence/spi/operationcomplete.h>
@@ -21,12 +22,17 @@ class ApplyBucketDiffEntryComplete : public spi::OperationComplete
const spi::ResultHandler* _result_handler;
std::shared_ptr<ApplyBucketDiffState> _state;
document::DocumentId _doc_id;
+ SharedOperationThrottler::Token _throttle_token;
const char* _op;
framework::MilliSecTimer _start_time;
metrics::DoubleAverageMetric& _latency_metric;
public:
- ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state, document::DocumentId doc_id, const char *op, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric);
- ~ApplyBucketDiffEntryComplete();
+ ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state,
+ document::DocumentId doc_id,
+ SharedOperationThrottler::Token throttle_token,
+ const char *op, const framework::Clock& clock,
+ metrics::DoubleAverageMetric& latency_metric);
+ ~ApplyBucketDiffEntryComplete() override;
void onComplete(std::unique_ptr<spi::Result> result) noexcept override;
void addResultHandler(const spi::ResultHandler* resultHandler) override;
};
diff --git a/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt b/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt
index 62d1a80501a..2d137f87118 100644
--- a/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt
+++ b/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt
@@ -3,6 +3,7 @@ vespa_add_library(storage_filestorpersistence OBJECT
SOURCES
active_operations_metrics.cpp
active_operations_stats.cpp
+ filestorhandler.cpp
filestorhandlerimpl.cpp
filestormanager.cpp
filestormetrics.cpp
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
new file mode 100644
index 00000000000..c066277ec71
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
@@ -0,0 +1,8 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "filestorhandler.h"
+
+namespace storage {
+
+FileStorHandler::LockedMessage::~LockedMessage() = default;
+
+}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index a980b5aa2e1..6f740ce2c28 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -16,6 +16,7 @@
#include <vespa/document/bucket/bucket.h>
#include <vespa/storage/storageutil/resumeguard.h>
#include <vespa/storage/common/messagesender.h>
+#include <vespa/storage/persistence/shared_operation_throttler.h>
#include <vespa/storageapi/messageapi/storagemessage.h>
namespace storage {
@@ -74,7 +75,29 @@ public:
[[nodiscard]] virtual api::LockingRequirements lockingRequirements() const noexcept = 0;
};
- using LockedMessage = std::pair<BucketLockInterface::SP, api::StorageMessage::SP>;
+ struct LockedMessage {
+ std::shared_ptr<BucketLockInterface> lock;
+ std::shared_ptr<api::StorageMessage> msg;
+ SharedOperationThrottler::Token throttle_token;
+
+ LockedMessage() noexcept = default;
+ LockedMessage(std::shared_ptr<BucketLockInterface> lock_,
+ std::shared_ptr<api::StorageMessage> msg_) noexcept
+ : lock(std::move(lock_)),
+ msg(std::move(msg_)),
+ throttle_token()
+ {}
+ LockedMessage(std::shared_ptr<BucketLockInterface> lock_,
+ std::shared_ptr<api::StorageMessage> msg_,
+ SharedOperationThrottler::Token token) noexcept
+ : lock(std::move(lock_)),
+ msg(std::move(msg_)),
+ throttle_token(std::move(token))
+ {}
+ LockedMessage(LockedMessage&&) noexcept = default;
+ ~LockedMessage();
+ };
+
class ScheduleAsyncResult {
private:
bool _was_scheduled;
@@ -90,7 +113,7 @@ public:
return _was_scheduled;
}
bool has_async_message() const {
- return _async_message.first.get() != nullptr;
+ return _async_message.lock.get() != nullptr;
}
const LockedMessage& async_message() const {
return _async_message;
@@ -250,6 +273,8 @@ public:
virtual std::string dumpQueue() const = 0;
virtual ActiveOperationsStats get_active_operations_stats(bool reset_min_max) const = 0;
+
+ virtual SharedOperationThrottler& operation_throttler() const noexcept = 0;
};
} // storage
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index c6991803b4d..5e0ea0359dc 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -40,16 +40,18 @@ uint32_t per_stripe_merge_limit(uint32_t num_threads, uint32_t num_stripes) noex
FileStorHandlerImpl::FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics,
ServiceLayerComponentRegister& compReg)
- : FileStorHandlerImpl(1, 1, sender, metrics, compReg)
+ : FileStorHandlerImpl(1, 1, sender, metrics, compReg, SharedOperationThrottler::make_unlimited_throttler())
{
}
FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender& sender,
FileStorMetrics& metrics,
- ServiceLayerComponentRegister& compReg)
+ ServiceLayerComponentRegister& compReg,
+ std::unique_ptr<SharedOperationThrottler> operation_throttler)
: _component(compReg, "filestorhandlerimpl"),
_state(FileStorHandler::AVAILABLE),
_metrics(nullptr),
+ _operation_throttler(std::move(operation_throttler)),
_stripes(),
_messageSender(sender),
_bucketIdFactory(_component.getBucketIdFactory()),
@@ -330,6 +332,7 @@ FileStorHandlerImpl::updateMetrics(const MetricLockGuard &)
std::lock_guard lockGuard(_mergeStatesLock);
_metrics->pendingMerges.addValue(_mergeStates.size());
_metrics->queueSize.addValue(getQueueSize());
+ _metrics->throttle_window_size.addValue(_operation_throttler->current_window_size());
for (const auto & stripe : _metrics->stripes) {
const auto & m = stripe->averageQueueWaitingTime;
@@ -885,10 +888,39 @@ FileStorHandlerImpl::Stripe::Stripe(const FileStorHandlerImpl & owner, MessageSe
_active_operations_stats()
{}
+namespace {
+
+bool
+operation_type_should_be_throttled(api::MessageType::Id type_id) noexcept
+{
+ // Note: SetBucketState is intentionally _not_ included in this set, even though it's
+ // dispatched async. The rationale behind this is that SetBucketState is very cheap
+ // to execute, usually comes in large waves (up to #buckets count) and processing all
+ // requests should complete as quickly as possible. We also don't want such waves to
+ // artificially boost the dynamic throttle window size due to a sudden throughput spike.
+ //
+ // Merge-related operations are transitively throttled by using the operation throttler
+ // directly for all async ops within the MergeHandler.
+ switch (type_id) {
+ case api::MessageType::PUT_ID:
+ case api::MessageType::REMOVE_ID:
+ case api::MessageType::UPDATE_ID:
+ case api::MessageType::REMOVELOCATION_ID:
+ case api::MessageType::CREATEBUCKET_ID:
+ case api::MessageType::DELETEBUCKET_ID:
+ return true;
+ default:
+ return false;
+ }
+}
+
+}
+
FileStorHandler::LockedMessage
FileStorHandlerImpl::Stripe::getNextMessage(vespalib::duration timeout)
{
std::unique_lock guard(*_lock);
+ SharedOperationThrottler::Token throttle_token;
// Try to grab a message+lock, immediately retrying once after a wait
// if none can be found and then exiting if the same is the case on the
// second attempt. This is key to allowing the run loop to register
@@ -896,15 +928,39 @@ FileStorHandlerImpl::Stripe::getNextMessage(vespalib::duration timeout)
for (int attempt = 0; (attempt < 2) && !_owner.isPaused(); ++attempt) {
PriorityIdx& idx(bmi::get<1>(*_queue));
PriorityIdx::iterator iter(idx.begin()), end(idx.end());
+ bool was_throttled = false;
- while (iter != end && operationIsInhibited(guard, iter->_bucket, *iter->_command)) {
+ while ((iter != end) && operationIsInhibited(guard, iter->_bucket, *iter->_command)) {
iter++;
}
if (iter != end) {
- return getMessage(guard, idx, iter);
+ const bool should_throttle_op = operation_type_should_be_throttled(iter->_command->getType().getId());
+ if (!should_throttle_op && throttle_token.valid()) {
+ throttle_token.reset(); // Let someone else play with it.
+ } else if (should_throttle_op && !throttle_token.valid()) {
+ // Important: _non-blocking_ attempt at getting a throttle token.
+ throttle_token = _owner.operation_throttler().try_acquire_one();
+ was_throttled = !throttle_token.valid();
+ }
+ if (!should_throttle_op || throttle_token.valid()) {
+ return getMessage(guard, idx, iter, std::move(throttle_token));
+ }
}
if (attempt == 0) {
- _cond->wait_for(guard, timeout);
+ // Depending on whether we were blocked due to no usable ops in queue or throttling,
+ // wait for either the queue or throttler to (hopefully) have some fresh stuff for us.
+ if (!was_throttled) {
+ _cond->wait_for(guard, timeout);
+ } else {
+ // Have to release lock before doing a blocking throttle token fetch, since it
+ // prevents RPC threads from pushing onto the queue.
+ guard.unlock();
+ throttle_token = _owner.operation_throttler().blocking_acquire_one(timeout);
+ if (!throttle_token.valid()) {
+ return {}; // Already exhausted our timeout window.
+ }
+ guard.lock();
+ }
}
}
return {}; // No message fetched.
@@ -923,14 +979,20 @@ FileStorHandlerImpl::Stripe::get_next_async_message(monitor_guard& guard)
++iter;
}
if ((iter != end) && AsyncHandler::is_async_message(iter->_command->getType().getId())) {
- return getMessage(guard, idx, iter);
+ // This is executed in the context of an RPC thread, so only do a _non-blocking_
+ // poll of the throttle policy.
+ auto throttle_token = _owner.operation_throttler().try_acquire_one();
+ if (throttle_token.valid()) {
+ return getMessage(guard, idx, iter, std::move(throttle_token));
+ }
}
return {};
}
FileStorHandler::LockedMessage
-FileStorHandlerImpl::Stripe::getMessage(monitor_guard & guard, PriorityIdx & idx, PriorityIdx::iterator iter) {
-
+FileStorHandlerImpl::Stripe::getMessage(monitor_guard & guard, PriorityIdx & idx, PriorityIdx::iterator iter,
+ SharedOperationThrottler::Token throttle_token)
+{
std::chrono::milliseconds waitTime(uint64_t(iter->_timer.stop(_metrics->averageQueueWaitingTime)));
std::shared_ptr<api::StorageMessage> msg = std::move(iter->_command);
@@ -942,7 +1004,7 @@ FileStorHandlerImpl::Stripe::getMessage(monitor_guard & guard, PriorityIdx & idx
msg->getType().getId(), msg->getMsgId(),
msg->lockingRequirements());
guard.unlock();
- return FileStorHandler::LockedMessage(std::move(locker), std::move(msg));
+ return {std::move(locker), std::move(msg), std::move(throttle_token)};
} else {
std::shared_ptr<api::StorageReply> msgReply(makeQueueTimeoutReply(*msg));
guard.unlock();
@@ -1014,7 +1076,7 @@ FileStorHandlerImpl::Stripe::schedule_and_get_next_async_message(MessageEntry en
std::unique_lock guard(*_lock);
_queue->emplace_back(std::move(entry));
auto lockedMessage = get_next_async_message(guard);
- if ( ! lockedMessage.second) {
+ if ( ! lockedMessage.msg) {
if (guard.owns_lock()) {
guard.unlock();
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index 5d68be8a800..c4b85ac596c 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -42,11 +42,12 @@ class AbortBucketOperationsCommand;
namespace bmi = boost::multi_index;
-class FileStorHandlerImpl : private framework::MetricUpdateHook,
- private ResumeGuard::Callback,
- public FileStorHandler {
+class FileStorHandlerImpl final
+ : private framework::MetricUpdateHook,
+ private ResumeGuard::Callback,
+ public FileStorHandler
+{
public:
-
struct MessageEntry {
std::shared_ptr<api::StorageMessage> _command;
metrics::MetricTimer _timer;
@@ -147,7 +148,8 @@ public:
// Precondition: the bucket used by `iter`s operation is not locked in a way that conflicts
// with its locking requirements.
FileStorHandler::LockedMessage getMessage(monitor_guard & guard, PriorityIdx & idx,
- PriorityIdx::iterator iter);
+ PriorityIdx::iterator iter,
+ SharedOperationThrottler::Token throttle_token);
using LockedBuckets = vespalib::hash_map<document::Bucket, MultiLockEntry, document::Bucket::hash>;
const FileStorHandlerImpl &_owner;
MessageSender &_messageSender;
@@ -163,7 +165,9 @@ public:
class BucketLock : public FileStorHandler::BucketLockInterface {
public:
// TODO refactor, too many params
- BucketLock(const monitor_guard & guard, Stripe& disk, const document::Bucket &bucket,
+ BucketLock(const monitor_guard & guard,
+ Stripe& disk,
+ const document::Bucket &bucket,
uint8_t priority, api::MessageType::Id msgType, api::StorageMessage::Id,
api::LockingRequirements lockReq);
~BucketLock() override;
@@ -187,9 +191,9 @@ public:
FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics,
ServiceLayerComponentRegister& compReg);
FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender&, FileStorMetrics&,
- ServiceLayerComponentRegister&);
+ ServiceLayerComponentRegister&, std::unique_ptr<SharedOperationThrottler>);
- ~FileStorHandlerImpl();
+ ~FileStorHandlerImpl() override;
void setGetNextMessageTimeout(vespalib::duration timeout) override { _getNextMessageTimeout = timeout; }
void flush(bool killPendingMerges) override;
@@ -239,6 +243,10 @@ public:
ResumeGuard pause() override;
void abortQueuedOperations(const AbortBucketOperationsCommand& cmd) override;
+ SharedOperationThrottler& operation_throttler() const noexcept override {
+ return *_operation_throttler;
+ }
+
// Implements ResumeGuard::Callback
void resume() override;
@@ -249,6 +257,7 @@ private:
ServiceLayerComponent _component;
std::atomic<DiskState> _state;
FileStorDiskMetrics * _metrics;
+ std::unique_ptr<SharedOperationThrottler> _operation_throttler;
std::vector<Stripe> _stripes;
MessageSender& _messageSender;
const document::BucketIdFactory& _bucketIdFactory;
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 2cfb3a2cffe..f5b9da0e1f5 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -32,6 +32,7 @@ LOG_SETUP(".persistence.filestor.manager");
using std::shared_ptr;
using document::BucketSpace;
using vespalib::make_string_short::fmt;
+using vespa::config::content::StorFilestorConfig;
namespace {
@@ -130,18 +131,31 @@ uint32_t computeNumResponseThreads(int configured) {
}
vespalib::Executor::OptimizeFor
-selectSequencer(vespa::config::content::StorFilestorConfig::ResponseSequencerType sequencerType) {
+selectSequencer(StorFilestorConfig::ResponseSequencerType sequencerType) {
switch (sequencerType) {
- case vespa::config::content::StorFilestorConfig::ResponseSequencerType::THROUGHPUT:
+ case StorFilestorConfig::ResponseSequencerType::THROUGHPUT:
return vespalib::Executor::OptimizeFor::THROUGHPUT;
- case vespa::config::content::StorFilestorConfig::ResponseSequencerType::LATENCY:
+ case StorFilestorConfig::ResponseSequencerType::LATENCY:
return vespalib::Executor::OptimizeFor::LATENCY;
- case vespa::config::content::StorFilestorConfig::ResponseSequencerType::ADAPTIVE:
+ case StorFilestorConfig::ResponseSequencerType::ADAPTIVE:
default:
return vespalib::Executor::OptimizeFor::ADAPTIVE;
}
}
+// Creates the async operation throttler selected by `config`. For the dynamic throttler the
+// configured window increment is clamped to at least 1 and then raised to at least `num_threads`.
+std::unique_ptr<SharedOperationThrottler>
+make_operation_throttler_from_config(const StorFilestorConfig& config, size_t num_threads)
+{
+    using ThrottlerType = StorFilestorConfig::AsyncOperationThrottlerType;
+    if (config.asyncOperationThrottlerType != ThrottlerType::DYNAMIC) {
+        return SharedOperationThrottler::make_unlimited_throttler();
+    }
+    const auto configured_increment = std::max(config.asyncOperationDynamicThrottlingWindowIncrement, 1);
+    const auto effective_increment  = std::max(static_cast<size_t>(configured_increment), num_threads);
+    return SharedOperationThrottler::make_dynamic_throttler(effective_increment);
+}
+
#ifdef __PIC__
#define TLS_LINKAGE __attribute__((visibility("hidden"), tls_model("initial-exec")))
#else
@@ -185,7 +199,7 @@ FileStorManager::getThreadLocalHandler() {
* incoming during reconfiguration
*/
void
-FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorConfig> config)
+FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config)
{
// If true, this is not the first configure.
bool liveUpdate = ! _threads.empty();
@@ -198,8 +212,10 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
size_t numThreads = _config->numThreads;
size_t numStripes = std::max(size_t(1u), numThreads / 2);
_metrics->initDiskMetrics(numStripes, computeAllPossibleHandlerThreads(*_config));
+ auto operation_throttler = make_operation_throttler_from_config(*_config, numThreads);
- _filestorHandler = std::make_unique<FileStorHandlerImpl>(numThreads, numStripes, *this, *_metrics, _compReg);
+ _filestorHandler = std::make_unique<FileStorHandlerImpl>(numThreads, numStripes, *this, *_metrics,
+ _compReg, std::move(operation_throttler));
uint32_t numResponseThreads = computeNumResponseThreads(_config->numResponseThreads);
_sequencedExecutor = vespalib::SequencedTaskExecutor::create(response_executor, numResponseThreads, 10000,
selectSequencer(_config->responseSequencerType));
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp
index c119fdc4f69..a98077da57a 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp
@@ -203,6 +203,7 @@ FileStorDiskMetrics::FileStorDiskMetrics(const std::string& name, const std::str
averageQueueWaitingTime("averagequeuewait.sum", {}, "Average time an operation spends in input queue.", this),
queueSize("queuesize", {}, "Size of input message queue.", this),
pendingMerges("pendingmerge", {}, "Number of buckets currently being merged.", this),
+ throttle_window_size("throttlewindowsize", {}, "Current size of async operation throttler window size", this),
waitingForLockHitRate("waitingforlockrate", {},
"Amount of times a filestor thread has needed to wait for "
"lock to take next message in queue.", this),
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h
index 7543e6e0771..d8135c9aeca 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h
@@ -147,6 +147,7 @@ public:
metrics::DoubleAverageMetric averageQueueWaitingTime;
metrics::LongAverageMetric queueSize;
metrics::LongAverageMetric pendingMerges;
+ metrics::LongAverageMetric throttle_window_size;
metrics::DoubleAverageMetric waitingForLockHitRate;
metrics::DoubleAverageMetric lockWaitTime; // unused
ActiveOperationsMetrics active_operations;
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index b9739fcf734..7dcf4bcbee2 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -2,6 +2,7 @@
#include "mergehandler.h"
#include "persistenceutil.h"
+#include "shared_operation_throttler.h"
#include "apply_bucket_diff_entry_complete.h"
#include "apply_bucket_diff_state.h"
#include <vespa/storage/persistence/filestorage/mergestatus.h>
@@ -32,6 +33,7 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
_cluster_context(cluster_context),
_env(env),
_spi(spi),
+ _operation_throttler(_env._fileStorHandler.operation_throttler()),
_monitored_ref_count(std::make_unique<MonitoredRefCount>()),
_maxChunkSize(maxChunkSize),
_commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize),
@@ -514,17 +516,22 @@ MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results
spi::Context& context,
const document::DocumentTypeRepo& repo) const
{
+ auto throttle_token = _operation_throttler.blocking_acquire_one();
spi::Timestamp timestamp(e._entry._timestamp);
if (!(e._entry._flags & (DELETED | DELETED_IN_PLACE))) {
// Regular put entry
Document::SP doc(deserializeDiffDocument(e, repo));
DocumentId docId = doc->getId();
- auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), std::move(docId), "put", _clock, _env._metrics.merge_handler_metrics.put_latency);
+ auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), std::move(docId),
+ std::move(throttle_token), "put",
+ _clock, _env._metrics.merge_handler_metrics.put_latency);
_spi.putAsync(bucket, timestamp, std::move(doc), context, std::move(complete));
} else {
std::vector<spi::PersistenceProvider::TimeStampAndDocumentId> ids;
ids.emplace_back(timestamp, e._docName);
- auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), ids[0].second, "remove", _clock, _env._metrics.merge_handler_metrics.remove_latency);
+ auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), ids[0].second,
+ std::move(throttle_token), "remove",
+ _clock, _env._metrics.merge_handler_metrics.remove_latency);
_spi.removeAsync(bucket, std::move(ids), context, std::move(complete));
}
}
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index f52fe63bc2b..1007f35c241 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -34,6 +34,7 @@ namespace spi {
class PersistenceUtil;
class ApplyBucketDiffState;
class MergeStatus;
+class SharedOperationThrottler;
class MergeHandler : public Types,
public MergeBucketInfoSyncer {
@@ -52,7 +53,7 @@ public:
uint32_t commonMergeChainOptimalizationMinimumSize = 64,
bool async_apply_bucket_diff = false);
- ~MergeHandler();
+ ~MergeHandler() override;
bool buildBucketInfoList(
const spi::Bucket& bucket,
@@ -86,6 +87,7 @@ private:
const ClusterContext &_cluster_context;
PersistenceUtil &_env;
spi::PersistenceProvider &_spi;
+ SharedOperationThrottler& _operation_throttler;
std::unique_ptr<vespalib::MonitoredRefCount> _monitored_ref_count;
const uint32_t _maxChunkSize;
const uint32_t _commonMergeChainOptimalizationMinimumSize;
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index 8b546771b71..c0c95ffd7af 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -158,12 +158,13 @@ PersistenceHandler::processMessage(api::StorageMessage& msg, MessageTracker::UP
void
PersistenceHandler::processLockedMessage(FileStorHandler::LockedMessage lock) const {
- LOG(debug, "NodeIndex %d, ptr=%p", _env._nodeIndex, lock.second.get());
- api::StorageMessage & msg(*lock.second);
+ LOG(debug, "NodeIndex %d, ptr=%p", _env._nodeIndex, lock.msg.get());
+ api::StorageMessage & msg(*lock.msg);
// Important: we _copy_ the message shared_ptr instead of moving to ensure that `msg` remains
// valid even if the tracker is destroyed by an exception in processMessage().
- auto tracker = std::make_unique<MessageTracker>(framework::MilliSecTimer(_clock), _env, _env._fileStorHandler, std::move(lock.first), lock.second);
+ auto tracker = std::make_unique<MessageTracker>(framework::MilliSecTimer(_clock), _env, _env._fileStorHandler,
+ std::move(lock.lock), lock.msg, std::move(lock.throttle_token));
tracker = processMessage(msg, std::move(tracker));
if (tracker) {
tracker->sendReply();
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index f9da4d63d7f..499e9807cbf 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -38,7 +38,7 @@ PersistenceThread::run(framework::ThreadHandle& thread)
FileStorHandler::LockedMessage lock(_fileStorHandler.getNextMessage(_stripeId));
- if (lock.first) {
+ if (lock.lock) {
_persistenceHandler.processLockedMessage(std::move(lock));
}
}
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp
index cbfc9463a8c..65eab99b8fb 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.cpp
+++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp
@@ -31,19 +31,22 @@ MessageTracker::MessageTracker(const framework::MilliSecTimer & timer,
const PersistenceUtil & env,
MessageSender & replySender,
FileStorHandler::BucketLockInterface::SP bucketLock,
- api::StorageMessage::SP msg)
- : MessageTracker(timer, env, replySender, true, std::move(bucketLock), std::move(msg))
+ api::StorageMessage::SP msg,
+ SharedOperationThrottler::Token throttle_token)
+ : MessageTracker(timer, env, replySender, true, std::move(bucketLock), std::move(msg), std::move(throttle_token))
{}
MessageTracker::MessageTracker(const framework::MilliSecTimer & timer,
const PersistenceUtil & env,
MessageSender & replySender,
bool updateBucketInfo,
FileStorHandler::BucketLockInterface::SP bucketLock,
- api::StorageMessage::SP msg)
+ api::StorageMessage::SP msg,
+ SharedOperationThrottler::Token throttle_token)
: _sendReply(true),
_updateBucketInfo(updateBucketInfo && hasBucketInfo(msg->getType().getId())),
_bucketLock(std::move(bucketLock)),
_msg(std::move(msg)),
+ _throttle_token(std::move(throttle_token)),
_context(_msg->getPriority(), _msg->getTrace().getLevel()),
_env(env),
_replySender(replySender),
@@ -56,7 +59,8 @@ MessageTracker::UP
MessageTracker::createForTesting(const framework::MilliSecTimer & timer, PersistenceUtil &env, MessageSender &replySender,
FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg)
{
- return MessageTracker::UP(new MessageTracker(timer, env, replySender, false, std::move(bucketLock), std::move(msg)));
+ return MessageTracker::UP(new MessageTracker(timer, env, replySender, false, std::move(bucketLock),
+ std::move(msg), SharedOperationThrottler::Token()));
}
void
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h
index 4fd0e60c730..588cbef2170 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.h
+++ b/storage/src/vespa/storage/persistence/persistenceutil.h
@@ -30,7 +30,8 @@ public:
using UP = std::unique_ptr<MessageTracker>;
MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender,
- FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg);
+ FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg,
+ SharedOperationThrottler::Token throttle_token);
~MessageTracker();
@@ -91,7 +92,8 @@ public:
private:
MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, bool updateBucketInfo,
- FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg);
+ FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg,
+ SharedOperationThrottler::Token throttle_token);
[[nodiscard]] bool count_result_as_failure() const noexcept;
@@ -99,6 +101,7 @@ private:
bool _updateBucketInfo;
FileStorHandler::BucketLockInterface::SP _bucketLock;
std::shared_ptr<api::StorageMessage> _msg;
+ SharedOperationThrottler::Token _throttle_token;
spi::Context _context;
const PersistenceUtil &_env;
MessageSender &_replySender;
diff --git a/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp b/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp
new file mode 100644
index 00000000000..b72b1a8ba28
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp
@@ -0,0 +1,191 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "shared_operation_throttler.h"
+#include <vespa/messagebus/dynamicthrottlepolicy.h>
+#include <vespa/messagebus/message.h>
+#include <condition_variable>
+#include <cassert>
+#include <mutex>
+
+namespace storage {
+
+namespace {
+
+// Throttler without any window limit: every acquire call immediately returns a valid token
+// and releasing a token does nothing.
+class NoLimitsOperationThrottler final : public SharedOperationThrottler {
+public:
+    ~NoLimitsOperationThrottler() override = default;
+    Token blocking_acquire_one() noexcept override { return make_token(); }
+    Token blocking_acquire_one(vespalib::duration) noexcept override { return make_token(); }
+    Token try_acquire_one() noexcept override { return make_token(); }
+    // A window size of 0 means "unlimited".
+    uint32_t current_window_size() const noexcept override { return 0; }
+    uint32_t waiting_threads() const noexcept override { return 0; }
+private:
+    // All acquire variants hand out the same kind of always-valid token.
+    Token make_token() noexcept { return Token(this, TokenCtorTag{}); }
+    void release_one() noexcept override { /* no-op */ }
+};
+
+// Placeholder MBus object (instantiated below with mbus::Message and mbus::Reply) that exists only because the throttle policy API accepts MBus objects; it carries no payload.
+template <typename Base>
+class DummyMbusMessage final : public Base {
+ static const mbus::string NAME;
+public:
+ const mbus::string& getProtocol() const override { return NAME; } // Fixed placeholder protocol name.
+ uint32_t getType() const override { return 0x1badb007; } // Fixed placeholder type id.
+ uint8_t priority() const override { return 255; }
+};
+
+template <typename Base>
+const mbus::string DummyMbusMessage<Base>::NAME = "FooBar";
+
+class DynamicOperationThrottler final : public SharedOperationThrottler { // Window-limited throttler; admission decisions are delegated to mbus::DynamicThrottlePolicy.
+ mutable std::mutex _mutex; // Locked by every member function; mutable so the const accessors can lock it.
+ std::condition_variable _cond; // Notified by release_one() when at least one thread is waiting for a slot.
+ mbus::DynamicThrottlePolicy _throttle_policy;
+ uint32_t _pending_ops; // Tokens handed out and not yet released.
+ uint32_t _waiting_threads; // Threads currently blocked inside a blocking_acquire_one() call.
+public:
+ explicit DynamicOperationThrottler(uint32_t min_size_and_window_increment);
+ ~DynamicOperationThrottler() override;
+
+ Token blocking_acquire_one() noexcept override;
+ Token blocking_acquire_one(vespalib::duration timeout) noexcept override;
+ Token try_acquire_one() noexcept override;
+ uint32_t current_window_size() const noexcept override;
+ uint32_t waiting_threads() const noexcept override;
+private:
+ void release_one() noexcept override;
+};
+
+DynamicOperationThrottler::DynamicOperationThrottler(uint32_t min_size_and_window_increment)
+ : _mutex(),
+ _cond(),
+ _throttle_policy(static_cast<double>(min_size_and_window_increment)), // NOTE(review): per the parameter name this presumably sets both the policy's minimum window size and its increment - confirm against DynamicThrottlePolicy.
+ _pending_ops(0), // No tokens are outstanding at construction.
+ _waiting_threads(0)
+{
+}
+
+DynamicOperationThrottler::~DynamicOperationThrottler() = default;
+
+// Waits until the throttle policy admits one more pending operation, then registers the
+// operation with the policy and returns a valid token for it.
+DynamicOperationThrottler::Token
+DynamicOperationThrottler::blocking_acquire_one() noexcept
+{
+    std::unique_lock guard(_mutex);
+    DummyMbusMessage<mbus::Message> probe;
+    const auto window_has_room = [&] { return _throttle_policy.canSend(probe, _pending_ops); };
+    if (!window_has_room()) {
+        // Only count this thread as waiting while it is actually parked on the condition variable;
+        // release_one() uses the counter to decide whether a notification is needed.
+        ++_waiting_threads;
+        _cond.wait(guard, window_has_room);
+        --_waiting_threads;
+    }
+    _throttle_policy.processMessage(probe);
+    ++_pending_ops;
+    return Token(this, TokenCtorTag{});
+}
+
+// Same as the untimed overload, but gives up once `timeout` has elapsed without the policy
+// admitting the operation; in that case an invalid token is returned.
+DynamicOperationThrottler::Token
+DynamicOperationThrottler::blocking_acquire_one(vespalib::duration timeout) noexcept
+{
+    std::unique_lock guard(_mutex);
+    DummyMbusMessage<mbus::Message> probe;
+    const auto window_has_room = [&] { return _throttle_policy.canSend(probe, _pending_ops); };
+    if (!window_has_room()) {
+        ++_waiting_threads;
+        const bool admitted = _cond.wait_for(guard, timeout, window_has_room);
+        --_waiting_threads;
+        if (!admitted) {
+            return Token(); // Timed out; the caller receives an invalid token.
+        }
+    }
+    _throttle_policy.processMessage(probe);
+    ++_pending_ops;
+    return Token(this, TokenCtorTag{});
+}
+
+// Polling acquire: returns a valid token if the policy admits the operation right now,
+// otherwise an invalid token. Does not wait for a slot to free up.
+DynamicOperationThrottler::Token
+DynamicOperationThrottler::try_acquire_one() noexcept
+{
+    std::unique_lock guard(_mutex);
+    DummyMbusMessage<mbus::Message> probe;
+    if (_throttle_policy.canSend(probe, _pending_ops)) {
+        _throttle_policy.processMessage(probe);
+        ++_pending_ops;
+        return Token(this, TokenCtorTag{});
+    }
+    return Token();
+}
+
+void
+DynamicOperationThrottler::release_one() noexcept // Invoked by a valid Token on destruction/reset; frees one window slot.
+{
+ std::unique_lock lock(_mutex);
+ DummyMbusMessage<mbus::Reply> dummy_reply;
+ _throttle_policy.processReply(dummy_reply); // Report one completed operation to the policy.
+ assert(_pending_ops > 0); // A release without a matching acquire is a programming error.
+ --_pending_ops;
+ if (_waiting_threads > 0) {
+ lock.unlock(); // Notify after unlocking; presumably so the woken thread does not immediately contend on the mutex.
+ _cond.notify_one();
+ }
+}
+
+// Returns the window size currently reported by the throttle policy.
+uint32_t
+DynamicOperationThrottler::current_window_size() const noexcept
+{
+    std::lock_guard guard(_mutex);
+    // Despite its name, getMaxPendingCount() yields the current window size.
+    const uint32_t window_size = _throttle_policy.getMaxPendingCount();
+    return window_size;
+}
+
+// Returns how many threads are currently blocked waiting for a throttle token.
+uint32_t
+DynamicOperationThrottler::waiting_threads() const noexcept
+{
+    std::lock_guard guard(_mutex);
+    return _waiting_threads;
+}
+
+}
+
+std::unique_ptr<SharedOperationThrottler>
+SharedOperationThrottler::make_unlimited_throttler()
+{
+ return std::make_unique<NoLimitsOperationThrottler>(); // Acquire calls on this implementation always succeed immediately.
+}
+
+std::unique_ptr<SharedOperationThrottler>
+SharedOperationThrottler::make_dynamic_throttler(uint32_t min_size_and_window_increment)
+{
+ return std::make_unique<DynamicOperationThrottler>(min_size_and_window_increment); // Window decisions are made by mbus::DynamicThrottlePolicy.
+}
+
+// Out-of-line members of SharedOperationThrottler::Token. They are qualified with the class
+// that actually declares Token rather than with a derived throttler implementation.
+
+// Frees the window slot held by a valid token; an invalid (default-constructed, reset or
+// moved-from) token does nothing.
+SharedOperationThrottler::Token::~Token()
+{
+    reset();
+}
+
+// Releases the held window slot, if any, and leaves the token invalid. Calling it again on
+// an already invalid token is a no-op.
+void
+SharedOperationThrottler::Token::reset() noexcept
+{
+    if (_throttler) {
+        _throttler->release_one();
+        _throttler = nullptr;
+    }
+}
+
+// Releases any slot this token already holds and takes over the slot held by `rhs`,
+// which is left invalid.
+SharedOperationThrottler::Token&
+SharedOperationThrottler::Token::operator=(Token&& rhs) noexcept
+{
+    // Self-move-assignment must be a no-op: resetting first would release the very slot
+    // this token is supposed to keep and silently invalidate it.
+    if (this != &rhs) {
+        reset();
+        _throttler = rhs._throttler;
+        rhs._throttler = nullptr;
+    }
+    return *this;
+}
+
+}
diff --git a/storage/src/vespa/storage/persistence/shared_operation_throttler.h b/storage/src/vespa/storage/persistence/shared_operation_throttler.h
new file mode 100644
index 00000000000..2e1de86c4b8
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/shared_operation_throttler.h
@@ -0,0 +1,71 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/vespalib/util/time.h>
+#include <memory>
+#include <optional>
+
+namespace storage {
+
+/**
+ * Operation throttler that is intended to provide global throttling of
+ * async operations across all persistence stripe threads. A throttler
+ * wraps a logical max pending window size of in-flight operations. Depending
+ * on the throttler implementation, the window size may expand and shrink
+ * dynamically. Exactly how and when this happens is unspecified.
+ *
+ * Offers both polling and (timed, non-timed) blocking calls for acquiring
+ * a throttle token. If the returned token is valid, the caller may proceed
+ * to invoke the asynchronous operation.
+ *
+ * The window slot taken up by a valid throttle token is implicitly freed up
+ * when the token is destroyed.
+ *
+ * All operations on the throttler are thread safe.
+ */
+class SharedOperationThrottler {
+protected:
+ struct TokenCtorTag {}; // Make available to subclasses for token construction.
+public:
+ class Token { // Move-only handle for one window slot; the slot is released when the token is destroyed or reset.
+ SharedOperationThrottler* _throttler; // Non-null iff this token currently holds a slot (see valid()).
+ public:
+ constexpr Token(SharedOperationThrottler* throttler, TokenCtorTag) noexcept : _throttler(throttler) {} // The tag type is protected, so only throttler subclasses can name it.
+ constexpr Token() noexcept : _throttler(nullptr) {} // Invalid token; holds no slot.
+ constexpr Token(Token&& rhs) noexcept
+ : _throttler(rhs._throttler)
+ {
+ rhs._throttler = nullptr; // The moved-from token becomes invalid and will not release anything.
+ }
+ Token& operator=(Token&& rhs) noexcept; // Releases any slot already held, then takes over the slot of rhs.
+ ~Token(); // Releases the held slot, if any.
+
+ Token(const Token&) = delete;
+ Token& operator=(const Token&) = delete;
+
+ [[nodiscard]] constexpr bool valid() const noexcept { return (_throttler != nullptr); }
+ void reset() noexcept; // Releases the held slot, if any, and leaves the token invalid.
+ };
+
+ virtual ~SharedOperationThrottler() = default;
+
+ // All methods are thread safe
+ [[nodiscard]] virtual Token blocking_acquire_one() noexcept = 0; // Waits, if the implementation limits the window, until a slot is available.
+ [[nodiscard]] virtual Token blocking_acquire_one(vespalib::duration timeout) noexcept = 0; // Returns an invalid token if no slot became available within the timeout.
+ [[nodiscard]] virtual Token try_acquire_one() noexcept = 0; // Does not wait for a slot; returns an invalid token if none is available right now.
+
+ // May return 0, in which case the window size is unlimited.
+ [[nodiscard]] virtual uint32_t current_window_size() const noexcept = 0;
+
+ // Exposed for unit testing only.
+ [[nodiscard]] virtual uint32_t waiting_threads() const noexcept = 0;
+
+ static std::unique_ptr<SharedOperationThrottler> make_unlimited_throttler();
+
+ static std::unique_ptr<SharedOperationThrottler> make_dynamic_throttler(uint32_t min_size_and_window_increment);
+private:
+ // Exclusively called from a valid Token. Thread safe.
+ virtual void release_one() noexcept = 0;
+};
+
+}