diff options
25 files changed, 633 insertions, 81 deletions
diff --git a/configdefinitions/src/vespa/stor-filestor.def b/configdefinitions/src/vespa/stor-filestor.def index 66700eff3e6..c351e52b557 100644 --- a/configdefinitions/src/vespa/stor-filestor.def +++ b/configdefinitions/src/vespa/stor-filestor.def @@ -75,3 +75,22 @@ use_async_message_handling_on_schedule bool default=false restart ## the entire resource usage sample is immediately reported to the cluster controller (via host info). ## This config can be live updated (doesn't require restart). resource_usage_reporter_noise_level double default=0.001 + +## Specify throttling used for async persistence operations. This throttling takes place +## before operations are dispatched to Proton and serves as a limiter for how many +## operations may be in flight in Proton's internal queues. +## +## - UNLIMITED is, as it says on the tin, unlimited. Offers no actual throttling, but +## has near zero overhead and never blocks. +## - DYNAMIC uses DynamicThrottlePolicy under the hood and will block if the window +## is full (if a blocking throttler API call is invoked). +## +async_operation_throttler_type enum { UNLIMITED, DYNAMIC } default=UNLIMITED restart + +## Specifies the extent the throttling window is increased by when the async throttle +## policy has decided that more concurrent operations are desirable. Also affects the +## _minimum_ size of the throttling window; its size is implicitly set to max(this config +## value, number of threads). +## +## Only applies if async_operation_throttler_type == DYNAMIC. +async_operation_dynamic_throttling_window_increment int default=20 restart diff --git a/storage/src/tests/persistence/CMakeLists.txt b/storage/src/tests/persistence/CMakeLists.txt index 7b165e11b66..fb8120210c1 100644 --- a/storage/src/tests/persistence/CMakeLists.txt +++ b/storage/src/tests/persistence/CMakeLists.txt @@ -12,6 +12,7 @@ vespa_add_executable(storage_persistence_gtest_runner_app TEST persistencethread_splittest.cpp processalltest.cpp provider_error_wrapper_test.cpp + shared_operation_throttler_test.cpp splitbitdetectortest.cpp testandsettest.cpp gtest_runner.cpp diff --git a/storage/src/tests/persistence/active_operations_stats_test.cpp b/storage/src/tests/persistence/active_operations_stats_test.cpp index a5dd3d929db..8caa84977ce 100644 --- a/storage/src/tests/persistence/active_operations_stats_test.cpp +++ b/storage/src/tests/persistence/active_operations_stats_test.cpp @@ -96,17 +96,17 @@ ActiveOperationsStatsTest::test_active_operations_stats() auto lock0 = filestorHandler->getNextMessage(stripeId); auto lock1 = filestorHandler->getNextMessage(stripeId); auto lock2 = filestorHandler->getNextMessage(stripeId); - ASSERT_TRUE(lock0.first); - ASSERT_TRUE(lock1.first); - ASSERT_FALSE(lock2.first); + ASSERT_TRUE(lock0.lock); + ASSERT_TRUE(lock1.lock); + ASSERT_FALSE(lock2.lock); auto stats = filestorHandler->get_active_operations_stats(false); { SCOPED_TRACE("during"); assert_active_operations_stats(stats, 2, 2, 0); } EXPECT_EQ(3, stats.get_total_size()); - lock0.first.reset(); - lock1.first.reset(); + lock0.lock.reset(); + lock1.lock.reset(); stats = filestorHandler->get_active_operations_stats(false); { SCOPED_TRACE("after"); diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index cd496605a6c..939e7ae7b6a 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -516,11 +516,11 @@ TEST_F(FileStorManagerTest, handler_priority) { filestorHandler.schedule(cmd); } - ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).second->getPriority()); - ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).second->getPriority()); - ASSERT_EQ(45, filestorHandler.getNextMessage(stripeId).second->getPriority()); - ASSERT_EQ(60, filestorHandler.getNextMessage(stripeId).second->getPriority()); - ASSERT_EQ(75, filestorHandler.getNextMessage(stripeId).second->getPriority()); + ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).msg->getPriority()); + ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).msg->getPriority()); + ASSERT_EQ(45, filestorHandler.getNextMessage(stripeId).msg->getPriority()); + ASSERT_EQ(60, filestorHandler.getNextMessage(stripeId).msg->getPriority()); + ASSERT_EQ(75, filestorHandler.getNextMessage(stripeId).msg->getPriority()); } class MessagePusherThread : public document::Runnable { @@ -570,7 +570,7 @@ public: void run() override { while (!_done) { FileStorHandler::LockedMessage msg = _handler.getNextMessage(_threadId); - if (msg.second.get()) { + if (msg.msg.get()) { uint32_t originalConfig = _config.load(); _fetchedCount++; std::this_thread::sleep_for(5ms); @@ -641,15 +641,15 @@ TEST_F(FileStorManagerTest, handler_pause) { filestorHandler.schedule(cmd); } - ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).second->getPriority()); + ASSERT_EQ(15, filestorHandler.getNextMessage(stripeId).msg->getPriority()); { ResumeGuard guard = filestorHandler.pause(); (void)guard; - ASSERT_EQ(filestorHandler.getNextMessage(stripeId).second.get(), nullptr); + ASSERT_EQ(filestorHandler.getNextMessage(stripeId).msg.get(), nullptr); } - ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).second->getPriority()); + ASSERT_EQ(30, filestorHandler.getNextMessage(stripeId).msg->getPriority()); } TEST_F(FileStorManagerTest, remap_split) { @@ -729,8 +729,8 @@ TEST_F(FileStorManagerTest, handler_timeout) { std::this_thread::sleep_for(51ms); for (;;) { auto lock = filestorHandler.getNextMessage(stripeId); - if (lock.first.get()) { - ASSERT_EQ(200, lock.second->getPriority()); + if (lock.lock.get()) { + ASSERT_EQ(200, lock.msg->getPriority()); break; } } @@ -2013,7 +2013,7 @@ expect_async_message(StorageMessage::Priority exp_pri, { EXPECT_TRUE(result.was_scheduled()); ASSERT_TRUE(result.has_async_message()); - EXPECT_EQ(exp_pri, result.async_message().second->getPriority()); + EXPECT_EQ(exp_pri, result.async_message().msg->getPriority()); } void @@ -2045,8 +2045,8 @@ TEST_F(FileStorHandlerTest, async_message_with_lowest_pri_returned_on_schedule) auto result = handler->schedule_and_get_next_async_message(make_put_command(30)); expect_async_message(20, result); } - EXPECT_EQ(30, get_next_message().second->getPriority()); - EXPECT_EQ(40, get_next_message().second->getPriority()); + EXPECT_EQ(30, get_next_message().msg->getPriority()); + EXPECT_EQ(40, get_next_message().msg->getPriority()); } TEST_F(FileStorHandlerTest, no_async_message_returned_if_lowest_pri_message_is_not_async) @@ -2057,8 +2057,8 @@ TEST_F(FileStorHandlerTest, no_async_message_returned_if_lowest_pri_message_is_n auto result = handler->schedule_and_get_next_async_message(make_put_command(30)); expect_empty_async_message(result); - EXPECT_EQ(20, get_next_message().second->getPriority()); - EXPECT_EQ(30, get_next_message().second->getPriority()); + EXPECT_EQ(20, get_next_message().msg->getPriority()); + EXPECT_EQ(30, get_next_message().msg->getPriority()); } TEST_F(FileStorHandlerTest, inhibited_operations_are_skipped) @@ -2079,7 +2079,7 @@ TEST_F(FileStorHandlerTest, inhibited_operations_are_skipped) expect_async_message(40, result); } } - EXPECT_EQ(30, get_next_message().second->getPriority()); + EXPECT_EQ(30, get_next_message().msg->getPriority()); } } // storage diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp index 2557fa537f5..5f3836727dd 100644 --- a/storage/src/tests/persistence/persistencequeuetest.cpp +++ b/storage/src/tests/persistence/persistencequeuetest.cpp @@ -91,14 +91,14 @@ TEST_F(PersistenceQueueTest, fetch_next_unlocked_message_if_bucket_locked) { f.filestorHandler->schedule(createPut(5432, 0)); auto lock0 = f.filestorHandler->getNextMessage(f.stripeId); - ASSERT_TRUE(lock0.first.get()); + ASSERT_TRUE(lock0.lock.get()); EXPECT_EQ(document::BucketId(16, 1234), - dynamic_cast<api::PutCommand&>(*lock0.second).getBucketId()); + dynamic_cast<api::PutCommand&>(*lock0.msg).getBucketId()); auto lock1 = f.filestorHandler->getNextMessage(f.stripeId); - ASSERT_TRUE(lock1.first.get()); + ASSERT_TRUE(lock1.lock.get()); EXPECT_EQ(document::BucketId(16, 5432), - dynamic_cast<api::PutCommand&>(*lock1.second).getBucketId()); + dynamic_cast<api::PutCommand&>(*lock1.msg).getBucketId()); } TEST_F(PersistenceQueueTest, shared_locked_operations_allow_concurrent_bucket_access) { @@ -108,14 +108,14 @@ TEST_F(PersistenceQueueTest, shared_locked_operations_allow_concurrent_bucket_ac f.filestorHandler->schedule(createGet(1234)); auto lock0 = f.filestorHandler->getNextMessage(f.stripeId); - ASSERT_TRUE(lock0.first.get()); - EXPECT_EQ(api::LockingRequirements::Shared, lock0.first->lockingRequirements()); + ASSERT_TRUE(lock0.lock.get()); + EXPECT_EQ(api::LockingRequirements::Shared, lock0.lock->lockingRequirements()); // Even though we already have a lock on the bucket, Gets allow shared locking and we // should therefore be able to get another lock. auto lock1 = f.filestorHandler->getNextMessage(f.stripeId); - ASSERT_TRUE(lock1.first.get()); - EXPECT_EQ(api::LockingRequirements::Shared, lock1.first->lockingRequirements()); + ASSERT_TRUE(lock1.lock.get()); + EXPECT_EQ(api::LockingRequirements::Shared, lock1.lock->lockingRequirements()); } TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_shared_op_active) { @@ -125,12 +125,12 @@ TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_shared_op f.filestorHandler->schedule(createPut(1234, 0)); auto lock0 = f.filestorHandler->getNextMessage(f.stripeId); - ASSERT_TRUE(lock0.first.get()); - EXPECT_EQ(api::LockingRequirements::Shared, lock0.first->lockingRequirements()); + ASSERT_TRUE(lock0.lock.get()); + EXPECT_EQ(api::LockingRequirements::Shared, lock0.lock->lockingRequirements()); // Expected to time out auto lock1 = f.filestorHandler->getNextMessage(f.stripeId); - ASSERT_FALSE(lock1.first.get()); + ASSERT_FALSE(lock1.lock.get()); } TEST_F(PersistenceQueueTest, shared_locked_operation_not_started_if_exclusive_op_active) { @@ -140,12 +140,12 @@ TEST_F(PersistenceQueueTest, shared_locked_operation_not_started_if_exclusive_op f.filestorHandler->schedule(createGet(1234)); auto lock0 = f.filestorHandler->getNextMessage(f.stripeId); - ASSERT_TRUE(lock0.first.get()); - EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements()); + ASSERT_TRUE(lock0.lock.get()); + EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.lock->lockingRequirements()); // Expected to time out auto lock1 = f.filestorHandler->getNextMessage(f.stripeId); - ASSERT_FALSE(lock1.first.get()); + ASSERT_FALSE(lock1.lock.get()); } TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_exclusive_op_active) { @@ -155,12 +155,12 @@ TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_exclusive f.filestorHandler->schedule(createPut(1234, 0)); auto lock0 = f.filestorHandler->getNextMessage(f.stripeId); - ASSERT_TRUE(lock0.first.get()); - EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements()); + ASSERT_TRUE(lock0.lock.get()); + EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.lock->lockingRequirements()); // Expected to time out auto lock1 = f.filestorHandler->getNextMessage(f.stripeId); - ASSERT_FALSE(lock1.first.get()); + ASSERT_FALSE(lock1.lock.get()); } } // namespace storage diff --git a/storage/src/tests/persistence/shared_operation_throttler_test.cpp b/storage/src/tests/persistence/shared_operation_throttler_test.cpp new file mode 100644 index 00000000000..0ad380937c7 --- /dev/null +++ b/storage/src/tests/persistence/shared_operation_throttler_test.cpp @@ -0,0 +1,116 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/storage/persistence/shared_operation_throttler.h> +#include <vespa/vespalib/gtest/gtest.h> +#include <vespa/vespalib/util/barrier.h> +#include <chrono> +#include <thread> + +using namespace ::testing; + +namespace storage { + +using ThrottleToken = SharedOperationThrottler::Token; + +TEST(SharedOperationThrottlerTest, unlimited_throttler_does_not_throttle) { + // We technically can't test that the unlimited throttler _never_ throttles, but at + // least check that it doesn't throttle _twice_, and then induce from this ;) + auto throttler = SharedOperationThrottler::make_unlimited_throttler(); + auto token1 = throttler->try_acquire_one(); + EXPECT_TRUE(token1.valid()); + auto token2 = throttler->blocking_acquire_one(); + EXPECT_TRUE(token2.valid()); + // Window size should be zero (i.e. unlimited) for unlimited throttler + EXPECT_EQ(throttler->current_window_size(), 0); +} + +TEST(SharedOperationThrottlerTest, dynamic_throttler_respects_initial_window_size) { + auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); + auto token1 = throttler->try_acquire_one(); + EXPECT_TRUE(token1.valid()); + auto token2 = throttler->try_acquire_one(); + EXPECT_FALSE(token2.valid()); + + EXPECT_EQ(throttler->current_window_size(), 1); +} + +TEST(SharedOperationThrottlerTest, blocking_acquire_returns_immediately_if_slot_available) { + auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); + auto token = throttler->blocking_acquire_one(); + EXPECT_TRUE(token.valid()); + token.reset(); + token = throttler->blocking_acquire_one(600s); // Should never block. + EXPECT_TRUE(token.valid()); +} + +TEST(SharedOperationThrottlerTest, blocking_call_woken_up_if_throttle_slot_available) { + auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); + vespalib::Barrier barrier(2); + std::thread t([&] { + auto token = throttler->try_acquire_one(); + assert(token.valid()); + barrier.await(); + while (throttler->waiting_threads() != 1) { + std::this_thread::sleep_for(100us); + } + // Implicit token release at thread scope exit + }); + barrier.await(); + auto token = throttler->blocking_acquire_one(); + EXPECT_TRUE(token.valid()); + t.join(); +} + +TEST(SharedOperationThrottlerTest, time_bounded_blocking_acquire_waits_for_timeout) { + auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); + auto window_filling_token = throttler->try_acquire_one(); + auto before = std::chrono::steady_clock::now(); + // Will block for at least 1ms. Since no window slot will be available by that time, + // an invalid token should be returned. + auto token = throttler->blocking_acquire_one(1ms); + auto after = std::chrono::steady_clock::now(); + EXPECT_TRUE((after - before) >= 1ms); + EXPECT_FALSE(token.valid()); +} + +TEST(SharedOperationThrottlerTest, default_constructed_token_is_invalid) { + ThrottleToken token; + EXPECT_FALSE(token.valid()); + token.reset(); // no-op + EXPECT_FALSE(token.valid()); +} + +TEST(SharedOperationThrottlerTest, token_destruction_frees_up_throttle_window_slot) { + auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); + { + auto token = throttler->try_acquire_one(); + EXPECT_TRUE(token.valid()); + } + auto token = throttler->try_acquire_one(); + EXPECT_TRUE(token.valid()); +} + +TEST(SharedOperationThrottlerTest, token_can_be_moved_and_reset) { + auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); + auto token1 = throttler->try_acquire_one(); + auto token2 = std::move(token1); // move ctor + EXPECT_TRUE(token2.valid()); + EXPECT_FALSE(token1.valid()); + ThrottleToken token3; + token3 = std::move(token2); // move assignment op + EXPECT_TRUE(token3.valid()); + EXPECT_FALSE(token2.valid()); + + // Trying to fetch new token should not succeed due to active token and win size of 1 + token1 = throttler->try_acquire_one(); + EXPECT_FALSE(token1.valid()); + // Resetting the token should free up the slot in the window + token3.reset(); + token1 = throttler->try_acquire_one(); + EXPECT_TRUE(token1.valid()); +} + +// TODO ideally we'd test that the dynamic throttler has a window size that is actually +// dynamic, but the backing DynamicThrottlePolicy implementation is a black box so +// it's not trivial to know how to do this reliably. + +} diff --git a/storage/src/vespa/storage/persistence/CMakeLists.txt b/storage/src/vespa/storage/persistence/CMakeLists.txt index c737d2bed28..5e068236026 100644 --- a/storage/src/vespa/storage/persistence/CMakeLists.txt +++ b/storage/src/vespa/storage/persistence/CMakeLists.txt @@ -14,6 +14,7 @@ vespa_add_library(storage_spersistence OBJECT persistenceutil.cpp processallhandler.cpp provider_error_wrapper.cpp + shared_operation_throttler.cpp simplemessagehandler.cpp splitbitdetector.cpp splitjoinhandler.cpp diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp index 2dc5989e857..73ccc7f6085 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp @@ -7,10 +7,16 @@ namespace storage { -ApplyBucketDiffEntryComplete::ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state, document::DocumentId doc_id, const char *op, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric) +ApplyBucketDiffEntryComplete::ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state, + document::DocumentId doc_id, + SharedOperationThrottler::Token throttle_token, + const char *op, + const framework::Clock& clock, + metrics::DoubleAverageMetric& latency_metric) : _result_handler(nullptr), _state(std::move(state)), _doc_id(std::move(doc_id)), + _throttle_token(std::move(throttle_token)), _op(op), _start_time(clock), _latency_metric(latency_metric) @@ -27,6 +33,7 @@ ApplyBucketDiffEntryComplete::onComplete(std::unique_ptr<spi::Result> result) no } double elapsed = _start_time.getElapsedTimeAsDouble(); _latency_metric.addValue(elapsed); + _throttle_token.reset(); _state->on_entry_complete(std::move(result), _doc_id, _op); } diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h index 1037318aec6..8478cab4c17 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h @@ -2,6 +2,7 @@ #pragma once +#include "shared_operation_throttler.h" #include <vespa/document/base/documentid.h> #include <vespa/metrics/valuemetric.h> #include <vespa/persistence/spi/operationcomplete.h> @@ -21,12 +22,17 @@ class ApplyBucketDiffEntryComplete : public spi::OperationComplete const spi::ResultHandler* _result_handler; std::shared_ptr<ApplyBucketDiffState> _state; document::DocumentId _doc_id; + SharedOperationThrottler::Token _throttle_token; const char* _op; framework::MilliSecTimer _start_time; metrics::DoubleAverageMetric& _latency_metric; public: - ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state, document::DocumentId doc_id, const char *op, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric); - ~ApplyBucketDiffEntryComplete(); + ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state, + document::DocumentId doc_id, + SharedOperationThrottler::Token throttle_token, + const char *op, const framework::Clock& clock, + metrics::DoubleAverageMetric& latency_metric); + ~ApplyBucketDiffEntryComplete() override; void onComplete(std::unique_ptr<spi::Result> result) noexcept override; void addResultHandler(const spi::ResultHandler* resultHandler) override; }; diff --git a/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt b/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt index 62d1a80501a..2d137f87118 100644 --- a/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt +++ b/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt @@ -3,6 +3,7 @@ vespa_add_library(storage_filestorpersistence OBJECT SOURCES active_operations_metrics.cpp active_operations_stats.cpp + filestorhandler.cpp filestorhandlerimpl.cpp filestormanager.cpp filestormetrics.cpp diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp new file mode 100644 index 00000000000..c066277ec71 --- /dev/null +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp @@ -0,0 +1,8 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "filestorhandler.h" + +namespace storage { + +FileStorHandler::LockedMessage::~LockedMessage() = default; + +} diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index a980b5aa2e1..6f740ce2c28 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -16,6 +16,7 @@ #include <vespa/document/bucket/bucket.h> #include <vespa/storage/storageutil/resumeguard.h> #include <vespa/storage/common/messagesender.h> +#include <vespa/storage/persistence/shared_operation_throttler.h> #include <vespa/storageapi/messageapi/storagemessage.h> namespace storage { @@ -74,7 +75,29 @@ public: [[nodiscard]] virtual api::LockingRequirements lockingRequirements() const noexcept = 0; }; - using LockedMessage = std::pair<BucketLockInterface::SP, api::StorageMessage::SP>; + struct LockedMessage { + std::shared_ptr<BucketLockInterface> lock; + std::shared_ptr<api::StorageMessage> msg; + SharedOperationThrottler::Token throttle_token; + + LockedMessage() noexcept = default; + LockedMessage(std::shared_ptr<BucketLockInterface> lock_, + std::shared_ptr<api::StorageMessage> msg_) noexcept + : lock(std::move(lock_)), + msg(std::move(msg_)), + throttle_token() + {} + LockedMessage(std::shared_ptr<BucketLockInterface> lock_, + std::shared_ptr<api::StorageMessage> msg_, + SharedOperationThrottler::Token token) noexcept + : lock(std::move(lock_)), + msg(std::move(msg_)), + throttle_token(std::move(token)) + {} + LockedMessage(LockedMessage&&) noexcept = default; + ~LockedMessage(); + }; + class ScheduleAsyncResult { private: bool _was_scheduled; @@ -90,7 +113,7 @@ public: return _was_scheduled; } bool has_async_message() const { - return _async_message.first.get() != nullptr; + return _async_message.lock.get() != nullptr; } const LockedMessage& async_message() const { return _async_message; @@ -250,6 +273,8 @@ public: virtual std::string dumpQueue() const = 0; virtual ActiveOperationsStats get_active_operations_stats(bool reset_min_max) const = 0; + + virtual SharedOperationThrottler& operation_throttler() const noexcept = 0; }; } // storage diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index c6991803b4d..5e0ea0359dc 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -40,16 +40,18 @@ uint32_t per_stripe_merge_limit(uint32_t num_threads, uint32_t num_stripes) noex FileStorHandlerImpl::FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics, ServiceLayerComponentRegister& compReg) - : FileStorHandlerImpl(1, 1, sender, metrics, compReg) + : FileStorHandlerImpl(1, 1, sender, metrics, compReg, SharedOperationThrottler::make_unlimited_throttler()) { } FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender& sender, FileStorMetrics& metrics, - ServiceLayerComponentRegister& compReg) + ServiceLayerComponentRegister& compReg, + std::unique_ptr<SharedOperationThrottler> operation_throttler) : _component(compReg, "filestorhandlerimpl"), _state(FileStorHandler::AVAILABLE), _metrics(nullptr), + _operation_throttler(std::move(operation_throttler)), _stripes(), _messageSender(sender), _bucketIdFactory(_component.getBucketIdFactory()), @@ -330,6 +332,7 @@ FileStorHandlerImpl::updateMetrics(const MetricLockGuard &) std::lock_guard lockGuard(_mergeStatesLock); _metrics->pendingMerges.addValue(_mergeStates.size()); _metrics->queueSize.addValue(getQueueSize()); + _metrics->throttle_window_size.addValue(_operation_throttler->current_window_size()); for (const auto & stripe : _metrics->stripes) { const auto & m = stripe->averageQueueWaitingTime; @@ -885,10 +888,39 @@ FileStorHandlerImpl::Stripe::Stripe(const FileStorHandlerImpl & owner, MessageSe _active_operations_stats() {} +namespace { + +bool +operation_type_should_be_throttled(api::MessageType::Id type_id) noexcept +{ + // Note: SetBucketState is intentionally _not_ included in this set, even though it's + // dispatched async. The rationale behind this is that SetBucketState is very cheap + // to execute, usually comes in large waves (up to #buckets count) and processing all + // requests should complete as quickly as possible. We also don't want such waves to + // artificially boost the dynamic throttle window size due to a sudden throughput spike. + // + // Merge-related operations are transitively throttled by using the operation throttler + // directly for all async ops within the MergeHandler. + switch (type_id) { + case api::MessageType::PUT_ID: + case api::MessageType::REMOVE_ID: + case api::MessageType::UPDATE_ID: + case api::MessageType::REMOVELOCATION_ID: + case api::MessageType::CREATEBUCKET_ID: + case api::MessageType::DELETEBUCKET_ID: + return true; + default: + return false; + } +} + +} + FileStorHandler::LockedMessage FileStorHandlerImpl::Stripe::getNextMessage(vespalib::duration timeout) { std::unique_lock guard(*_lock); + SharedOperationThrottler::Token throttle_token; // Try to grab a message+lock, immediately retrying once after a wait // if none can be found and then exiting if the same is the case on the // second attempt. This is key to allowing the run loop to register @@ -896,15 +928,39 @@ FileStorHandlerImpl::Stripe::getNextMessage(vespalib::duration timeout) for (int attempt = 0; (attempt < 2) && !_owner.isPaused(); ++attempt) { PriorityIdx& idx(bmi::get<1>(*_queue)); PriorityIdx::iterator iter(idx.begin()), end(idx.end()); + bool was_throttled = false; - while (iter != end && operationIsInhibited(guard, iter->_bucket, *iter->_command)) { + while ((iter != end) && operationIsInhibited(guard, iter->_bucket, *iter->_command)) { iter++; } if (iter != end) { - return getMessage(guard, idx, iter); + const bool should_throttle_op = operation_type_should_be_throttled(iter->_command->getType().getId()); + if (!should_throttle_op && throttle_token.valid()) { + throttle_token.reset(); // Let someone else play with it. + } else if (should_throttle_op && !throttle_token.valid()) { + // Important: _non-blocking_ attempt at getting a throttle token. + throttle_token = _owner.operation_throttler().try_acquire_one(); + was_throttled = !throttle_token.valid(); + } + if (!should_throttle_op || throttle_token.valid()) { + return getMessage(guard, idx, iter, std::move(throttle_token)); + } } if (attempt == 0) { - _cond->wait_for(guard, timeout); + // Depending on whether we were blocked due to no usable ops in queue or throttling, + // wait for either the queue or throttler to (hopefully) have some fresh stuff for us. + if (!was_throttled) { + _cond->wait_for(guard, timeout); + } else { + // Have to release lock before doing a blocking throttle token fetch, since it + // prevents RPC threads from pushing onto the queue. + guard.unlock(); + throttle_token = _owner.operation_throttler().blocking_acquire_one(timeout); + if (!throttle_token.valid()) { + return {}; // Already exhausted our timeout window. + } + guard.lock(); + } } } return {}; // No message fetched. @@ -923,14 +979,20 @@ FileStorHandlerImpl::Stripe::get_next_async_message(monitor_guard& guard) ++iter; } if ((iter != end) && AsyncHandler::is_async_message(iter->_command->getType().getId())) { - return getMessage(guard, idx, iter); + // This is executed in the context of an RPC thread, so only do a _non-blocking_ + // poll of the throttle policy. + auto throttle_token = _owner.operation_throttler().try_acquire_one(); + if (throttle_token.valid()) { + return getMessage(guard, idx, iter, std::move(throttle_token)); + } } return {}; } FileStorHandler::LockedMessage -FileStorHandlerImpl::Stripe::getMessage(monitor_guard & guard, PriorityIdx & idx, PriorityIdx::iterator iter) { - +FileStorHandlerImpl::Stripe::getMessage(monitor_guard & guard, PriorityIdx & idx, PriorityIdx::iterator iter, + SharedOperationThrottler::Token throttle_token) +{ std::chrono::milliseconds waitTime(uint64_t(iter->_timer.stop(_metrics->averageQueueWaitingTime))); std::shared_ptr<api::StorageMessage> msg = std::move(iter->_command); @@ -942,7 +1004,7 @@ FileStorHandlerImpl::Stripe::getMessage(monitor_guard & guard, PriorityIdx & idx msg->getType().getId(), msg->getMsgId(), msg->lockingRequirements()); guard.unlock(); - return FileStorHandler::LockedMessage(std::move(locker), std::move(msg)); + return {std::move(locker), std::move(msg), std::move(throttle_token)}; } else { std::shared_ptr<api::StorageReply> msgReply(makeQueueTimeoutReply(*msg)); guard.unlock(); @@ -1014,7 +1076,7 @@ FileStorHandlerImpl::Stripe::schedule_and_get_next_async_message(MessageEntry en std::unique_lock guard(*_lock); _queue->emplace_back(std::move(entry)); auto lockedMessage = get_next_async_message(guard); - if ( ! lockedMessage.second) { + if ( ! lockedMessage.msg) { if (guard.owns_lock()) { guard.unlock(); } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index 5d68be8a800..c4b85ac596c 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -42,11 +42,12 @@ class AbortBucketOperationsCommand; namespace bmi = boost::multi_index; -class FileStorHandlerImpl : private framework::MetricUpdateHook, - private ResumeGuard::Callback, - public FileStorHandler { +class FileStorHandlerImpl final + : private framework::MetricUpdateHook, + private ResumeGuard::Callback, + public FileStorHandler +{ public: - struct MessageEntry { std::shared_ptr<api::StorageMessage> _command; metrics::MetricTimer _timer; @@ -147,7 +148,8 @@ public: // Precondition: the bucket used by `iter`s operation is not locked in a way that conflicts // with its locking requirements. FileStorHandler::LockedMessage getMessage(monitor_guard & guard, PriorityIdx & idx, - PriorityIdx::iterator iter); + PriorityIdx::iterator iter, + SharedOperationThrottler::Token throttle_token); using LockedBuckets = vespalib::hash_map<document::Bucket, MultiLockEntry, document::Bucket::hash>; const FileStorHandlerImpl &_owner; MessageSender &_messageSender; @@ -163,7 +165,9 @@ public: class BucketLock : public FileStorHandler::BucketLockInterface { public: // TODO refactor, too many params - BucketLock(const monitor_guard & guard, Stripe& disk, const document::Bucket &bucket, + BucketLock(const monitor_guard & guard, + Stripe& disk, + const document::Bucket &bucket, uint8_t priority, api::MessageType::Id msgType, api::StorageMessage::Id, api::LockingRequirements lockReq); ~BucketLock() override; @@ -187,9 +191,9 @@ public: FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics, ServiceLayerComponentRegister& compReg); FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender&, FileStorMetrics&, - ServiceLayerComponentRegister&); + ServiceLayerComponentRegister&, std::unique_ptr<SharedOperationThrottler>); - ~FileStorHandlerImpl(); + ~FileStorHandlerImpl() override; void setGetNextMessageTimeout(vespalib::duration timeout) override { _getNextMessageTimeout = timeout; } void flush(bool killPendingMerges) override; @@ -239,6 +243,10 @@ public: ResumeGuard pause() override; void abortQueuedOperations(const AbortBucketOperationsCommand& cmd) override; + SharedOperationThrottler& operation_throttler() const noexcept override { + return *_operation_throttler; + } + // Implements ResumeGuard::Callback void resume() override; @@ -249,6 +257,7 @@ private: ServiceLayerComponent _component; std::atomic<DiskState> _state; FileStorDiskMetrics * _metrics; + std::unique_ptr<SharedOperationThrottler> _operation_throttler; std::vector<Stripe> _stripes; MessageSender& _messageSender; const document::BucketIdFactory& _bucketIdFactory; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 2cfb3a2cffe..f5b9da0e1f5 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -32,6 +32,7 @@ LOG_SETUP(".persistence.filestor.manager"); using std::shared_ptr; using document::BucketSpace; using vespalib::make_string_short::fmt; +using vespa::config::content::StorFilestorConfig; namespace { @@ -130,18 +131,31 @@ uint32_t computeNumResponseThreads(int configured) { } vespalib::Executor::OptimizeFor -selectSequencer(vespa::config::content::StorFilestorConfig::ResponseSequencerType sequencerType) { +selectSequencer(StorFilestorConfig::ResponseSequencerType sequencerType) { switch (sequencerType) { - case vespa::config::content::StorFilestorConfig::ResponseSequencerType::THROUGHPUT: + case StorFilestorConfig::ResponseSequencerType::THROUGHPUT: return vespalib::Executor::OptimizeFor::THROUGHPUT; - case vespa::config::content::StorFilestorConfig::ResponseSequencerType::LATENCY: + case StorFilestorConfig::ResponseSequencerType::LATENCY: return vespalib::Executor::OptimizeFor::LATENCY; - case vespa::config::content::StorFilestorConfig::ResponseSequencerType::ADAPTIVE: + case StorFilestorConfig::ResponseSequencerType::ADAPTIVE: default: return vespalib::Executor::OptimizeFor::ADAPTIVE; } } +std::unique_ptr<SharedOperationThrottler> +make_operation_throttler_from_config(const StorFilestorConfig& config, size_t num_threads) +{ + const bool use_dynamic_throttling = (config.asyncOperationThrottlerType == StorFilestorConfig::AsyncOperationThrottlerType::DYNAMIC); + if (use_dynamic_throttling) { + auto config_win_size_incr = std::max(config.asyncOperationDynamicThrottlingWindowIncrement, 1); + auto win_size_increment = std::max(static_cast<size_t>(config_win_size_incr), num_threads); + return SharedOperationThrottler::make_dynamic_throttler(win_size_increment); + } else { + return SharedOperationThrottler::make_unlimited_throttler(); + } +} + #ifdef __PIC__ #define TLS_LINKAGE __attribute__((visibility("hidden"), tls_model("initial-exec"))) #else @@ -185,7 +199,7 @@ FileStorManager::getThreadLocalHandler() { * incoming during reconfiguration */ void -FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorConfig> config) +FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config) { // If true, this is not the first configure. bool liveUpdate = ! _threads.empty(); @@ -198,8 +212,10 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC size_t numThreads = _config->numThreads; size_t numStripes = std::max(size_t(1u), numThreads / 2); _metrics->initDiskMetrics(numStripes, computeAllPossibleHandlerThreads(*_config)); + auto operation_throttler = make_operation_throttler_from_config(*_config, numThreads); - _filestorHandler = std::make_unique<FileStorHandlerImpl>(numThreads, numStripes, *this, *_metrics, _compReg); + _filestorHandler = std::make_unique<FileStorHandlerImpl>(numThreads, numStripes, *this, *_metrics, + _compReg, std::move(operation_throttler)); uint32_t numResponseThreads = computeNumResponseThreads(_config->numResponseThreads); _sequencedExecutor = vespalib::SequencedTaskExecutor::create(response_executor, numResponseThreads, 10000, selectSequencer(_config->responseSequencerType)); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp index c119fdc4f69..a98077da57a 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp @@ -203,6 +203,7 @@ FileStorDiskMetrics::FileStorDiskMetrics(const std::string& name, const std::str averageQueueWaitingTime("averagequeuewait.sum", {}, "Average time an operation spends in input queue.", this), queueSize("queuesize", {}, "Size of input message queue.", this), pendingMerges("pendingmerge", {}, "Number of buckets currently being merged.", this), + throttle_window_size("throttlewindowsize", {}, "Current size of async operation throttler window size", this), waitingForLockHitRate("waitingforlockrate", {}, "Amount of times a filestor thread has needed to wait for " "lock to take next message in queue.", this), diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h index 7543e6e0771..d8135c9aeca 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h @@ -147,6 +147,7 @@ public: metrics::DoubleAverageMetric averageQueueWaitingTime; metrics::LongAverageMetric queueSize; metrics::LongAverageMetric pendingMerges; + metrics::LongAverageMetric throttle_window_size; metrics::DoubleAverageMetric waitingForLockHitRate; metrics::DoubleAverageMetric lockWaitTime; // unused ActiveOperationsMetrics active_operations; diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index b9739fcf734..7dcf4bcbee2 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -2,6 +2,7 @@ #include "mergehandler.h" #include "persistenceutil.h" +#include "shared_operation_throttler.h" #include "apply_bucket_diff_entry_complete.h" #include "apply_bucket_diff_state.h" #include <vespa/storage/persistence/filestorage/mergestatus.h> @@ -32,6 +33,7 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, _cluster_context(cluster_context), _env(env), _spi(spi), + _operation_throttler(_env._fileStorHandler.operation_throttler()), _monitored_ref_count(std::make_unique<MonitoredRefCount>()), _maxChunkSize(maxChunkSize), _commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize), @@ -514,17 +516,22 @@ MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results spi::Context& context, const document::DocumentTypeRepo& repo) const { + auto throttle_token = _operation_throttler.blocking_acquire_one(); spi::Timestamp timestamp(e._entry._timestamp); if (!(e._entry._flags & (DELETED | DELETED_IN_PLACE))) { // Regular put entry Document::SP doc(deserializeDiffDocument(e, repo)); DocumentId docId = doc->getId(); - auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), std::move(docId), "put", _clock, _env._metrics.merge_handler_metrics.put_latency); + auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), std::move(docId), + std::move(throttle_token), "put", + _clock, _env._metrics.merge_handler_metrics.put_latency); _spi.putAsync(bucket, timestamp, std::move(doc), context, std::move(complete)); } else { std::vector<spi::PersistenceProvider::TimeStampAndDocumentId> ids; ids.emplace_back(timestamp, e._docName); - auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), ids[0].second, "remove", _clock, _env._metrics.merge_handler_metrics.remove_latency); + auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), ids[0].second, + std::move(throttle_token), "remove", + _clock, _env._metrics.merge_handler_metrics.remove_latency); _spi.removeAsync(bucket, std::move(ids), context, std::move(complete)); } } diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index f52fe63bc2b..1007f35c241 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -34,6 +34,7 @@ namespace spi { class PersistenceUtil; class ApplyBucketDiffState; class MergeStatus; +class SharedOperationThrottler; class MergeHandler : public Types, public MergeBucketInfoSyncer { @@ -52,7 +53,7 @@ public: uint32_t commonMergeChainOptimalizationMinimumSize = 64, bool async_apply_bucket_diff = false); - ~MergeHandler(); + ~MergeHandler() override; bool buildBucketInfoList( const spi::Bucket& bucket, @@ -86,6 +87,7 @@ private: const ClusterContext &_cluster_context; PersistenceUtil &_env; spi::PersistenceProvider &_spi; + SharedOperationThrottler& _operation_throttler; std::unique_ptr<vespalib::MonitoredRefCount> _monitored_ref_count; const uint32_t _maxChunkSize; const uint32_t _commonMergeChainOptimalizationMinimumSize; diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index 8b546771b71..c0c95ffd7af 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -158,12 +158,13 @@ PersistenceHandler::processMessage(api::StorageMessage& msg, MessageTracker::UP void PersistenceHandler::processLockedMessage(FileStorHandler::LockedMessage lock) const { - LOG(debug, "NodeIndex %d, ptr=%p", _env._nodeIndex, lock.second.get()); - api::StorageMessage & msg(*lock.second); + LOG(debug, "NodeIndex %d, ptr=%p", _env._nodeIndex, lock.msg.get()); + api::StorageMessage & msg(*lock.msg); // Important: we _copy_ the message shared_ptr instead of moving to ensure that `msg` remains // valid even if the tracker is destroyed by an exception in processMessage(). - auto tracker = std::make_unique<MessageTracker>(framework::MilliSecTimer(_clock), _env, _env._fileStorHandler, std::move(lock.first), lock.second); + auto tracker = std::make_unique<MessageTracker>(framework::MilliSecTimer(_clock), _env, _env._fileStorHandler, + std::move(lock.lock), lock.msg, std::move(lock.throttle_token)); tracker = processMessage(msg, std::move(tracker)); if (tracker) { tracker->sendReply(); diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index f9da4d63d7f..499e9807cbf 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -38,7 +38,7 @@ PersistenceThread::run(framework::ThreadHandle& thread) FileStorHandler::LockedMessage lock(_fileStorHandler.getNextMessage(_stripeId)); - if (lock.first) { + if (lock.lock) { _persistenceHandler.processLockedMessage(std::move(lock)); } } diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp index cbfc9463a8c..65eab99b8fb 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.cpp +++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp @@ -31,19 +31,22 @@ MessageTracker::MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, FileStorHandler::BucketLockInterface::SP bucketLock, - api::StorageMessage::SP msg) - : MessageTracker(timer, env, replySender, true, std::move(bucketLock), std::move(msg)) + api::StorageMessage::SP msg, + SharedOperationThrottler::Token throttle_token) + : MessageTracker(timer, env, replySender, true, std::move(bucketLock), std::move(msg), std::move(throttle_token)) {} MessageTracker::MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, bool updateBucketInfo, FileStorHandler::BucketLockInterface::SP bucketLock, - api::StorageMessage::SP msg) + api::StorageMessage::SP msg, + SharedOperationThrottler::Token throttle_token) : _sendReply(true), _updateBucketInfo(updateBucketInfo && hasBucketInfo(msg->getType().getId())), _bucketLock(std::move(bucketLock)), _msg(std::move(msg)), + _throttle_token(std::move(throttle_token)), _context(_msg->getPriority(), _msg->getTrace().getLevel()), _env(env), _replySender(replySender), @@ -56,7 +59,8 @@ MessageTracker::UP MessageTracker::createForTesting(const framework::MilliSecTimer & timer, PersistenceUtil &env, MessageSender &replySender, FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg) { - return MessageTracker::UP(new MessageTracker(timer, env, replySender, false, std::move(bucketLock), std::move(msg))); + return MessageTracker::UP(new MessageTracker(timer, env, replySender, false, std::move(bucketLock), + std::move(msg), SharedOperationThrottler::Token())); } void diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h index 4fd0e60c730..588cbef2170 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.h +++ b/storage/src/vespa/storage/persistence/persistenceutil.h @@ -30,7 +30,8 @@ public: using UP = std::unique_ptr<MessageTracker>; MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, - FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg); + FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg, + SharedOperationThrottler::Token throttle_token); ~MessageTracker(); @@ -91,7 +92,8 @@ public: private: MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, bool updateBucketInfo, - FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg); + FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg, + SharedOperationThrottler::Token throttle_token); [[nodiscard]] bool count_result_as_failure() const noexcept; @@ -99,6 +101,7 @@ private: bool _updateBucketInfo; FileStorHandler::BucketLockInterface::SP _bucketLock; std::shared_ptr<api::StorageMessage> _msg; + SharedOperationThrottler::Token _throttle_token; spi::Context _context; const PersistenceUtil &_env; MessageSender &_replySender; diff --git a/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp b/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp new file mode 100644 index 00000000000..b72b1a8ba28 --- /dev/null +++ b/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp @@ -0,0 +1,191 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "shared_operation_throttler.h" +#include <vespa/messagebus/dynamicthrottlepolicy.h> +#include <vespa/messagebus/message.h> +#include <condition_variable> +#include <cassert> +#include <mutex> + +namespace storage { + +namespace { + +class NoLimitsOperationThrottler final : public SharedOperationThrottler { +public: + ~NoLimitsOperationThrottler() override = default; + Token blocking_acquire_one() noexcept override { + return Token(this, TokenCtorTag{}); + } + Token blocking_acquire_one(vespalib::duration) noexcept override { + return Token(this, TokenCtorTag{}); + } + Token try_acquire_one() noexcept override { + return Token(this, TokenCtorTag{}); + } + uint32_t current_window_size() const noexcept override { return 0; } + uint32_t waiting_threads() const noexcept override { return 0; } +private: + void release_one() noexcept override { /* no-op */ } +}; + +// Class used to sneakily get around IThrottlePolicy only accepting MBus objects +template <typename Base> +class DummyMbusMessage final : public Base { + static const mbus::string NAME; +public: + const mbus::string& getProtocol() const override { return NAME; } + uint32_t getType() const override { return 0x1badb007; } + uint8_t priority() const override { return 255; } +}; + +template <typename Base> +const mbus::string DummyMbusMessage<Base>::NAME = "FooBar"; + +class DynamicOperationThrottler final : public SharedOperationThrottler { + mutable std::mutex _mutex; + std::condition_variable _cond; + mbus::DynamicThrottlePolicy _throttle_policy; + uint32_t _pending_ops; + uint32_t _waiting_threads; +public: + explicit DynamicOperationThrottler(uint32_t min_size_and_window_increment); + ~DynamicOperationThrottler() override; + + Token blocking_acquire_one() noexcept override; + Token blocking_acquire_one(vespalib::duration timeout) noexcept override; + Token try_acquire_one() noexcept override; + uint32_t current_window_size() const noexcept override; + uint32_t waiting_threads() const noexcept override; +private: + void release_one() noexcept override; +}; + +DynamicOperationThrottler::DynamicOperationThrottler(uint32_t min_size_and_window_increment) + : _mutex(), + _cond(), + _throttle_policy(static_cast<double>(min_size_and_window_increment)), + _pending_ops(0), + _waiting_threads(0) +{ +} + +DynamicOperationThrottler::~DynamicOperationThrottler() = default; + +DynamicOperationThrottler::Token +DynamicOperationThrottler::blocking_acquire_one() noexcept +{ + std::unique_lock lock(_mutex); + DummyMbusMessage<mbus::Message> dummy_msg; + if (!_throttle_policy.canSend(dummy_msg, _pending_ops)) { + ++_waiting_threads; + _cond.wait(lock, [&] { + return _throttle_policy.canSend(dummy_msg, _pending_ops); + }); + --_waiting_threads; + } + _throttle_policy.processMessage(dummy_msg); + ++_pending_ops; + return Token(this, TokenCtorTag{}); +} + +DynamicOperationThrottler::Token +DynamicOperationThrottler::blocking_acquire_one(vespalib::duration timeout) noexcept +{ + std::unique_lock lock(_mutex); + DummyMbusMessage<mbus::Message> dummy_msg; + if (!_throttle_policy.canSend(dummy_msg, _pending_ops)) { + ++_waiting_threads; + const bool accepted = _cond.wait_for(lock, timeout, [&] { + return _throttle_policy.canSend(dummy_msg, _pending_ops); + }); + --_waiting_threads; + if (!accepted) { + return Token(); + } + } + _throttle_policy.processMessage(dummy_msg); + ++_pending_ops; + return Token(this, TokenCtorTag{}); +} + +DynamicOperationThrottler::Token +DynamicOperationThrottler::try_acquire_one() noexcept +{ + std::unique_lock lock(_mutex); + DummyMbusMessage<mbus::Message> dummy_msg; + if (!_throttle_policy.canSend(dummy_msg, _pending_ops)) { + return Token(); + } + _throttle_policy.processMessage(dummy_msg); + ++_pending_ops; + return Token(this, TokenCtorTag{}); +} + +void +DynamicOperationThrottler::release_one() noexcept +{ + std::unique_lock lock(_mutex); + DummyMbusMessage<mbus::Reply> dummy_reply; + _throttle_policy.processReply(dummy_reply); + assert(_pending_ops > 0); + --_pending_ops; + if (_waiting_threads > 0) { + lock.unlock(); + _cond.notify_one(); + } +} + +uint32_t +DynamicOperationThrottler::current_window_size() const noexcept +{ + std::unique_lock lock(_mutex); + return _throttle_policy.getMaxPendingCount(); // Actually returns current window size +} + +uint32_t +DynamicOperationThrottler::waiting_threads() const noexcept +{ + std::unique_lock lock(_mutex); + return _waiting_threads; +} + +} + +std::unique_ptr<SharedOperationThrottler> +SharedOperationThrottler::make_unlimited_throttler() +{ + return std::make_unique<NoLimitsOperationThrottler>(); +} + +std::unique_ptr<SharedOperationThrottler> +SharedOperationThrottler::make_dynamic_throttler(uint32_t min_size_and_window_increment) +{ + return std::make_unique<DynamicOperationThrottler>(min_size_and_window_increment); +} + +DynamicOperationThrottler::Token::~Token() +{ + if (_throttler) { + _throttler->release_one(); + } +} + +void +DynamicOperationThrottler::Token::reset() noexcept +{ + if (_throttler) { + _throttler->release_one(); + _throttler = nullptr; + } +} + +DynamicOperationThrottler::Token& +DynamicOperationThrottler::Token::operator=(Token&& rhs) noexcept +{ + reset(); + _throttler = rhs._throttler; + rhs._throttler = nullptr; + return *this; +} + +} diff --git a/storage/src/vespa/storage/persistence/shared_operation_throttler.h b/storage/src/vespa/storage/persistence/shared_operation_throttler.h new file mode 100644 index 00000000000..2e1de86c4b8 --- /dev/null +++ b/storage/src/vespa/storage/persistence/shared_operation_throttler.h @@ -0,0 +1,71 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/vespalib/util/time.h> +#include <memory> +#include <optional> + +namespace storage { + +/** + * Operation throttler that is intended to provide global throttling of + * async operations across all persistence stripe threads. A throttler + * wraps a logical max pending window size of in-flight operations. Depending + * on the throttler implementation, the window size may expand and shrink + * dynamically. Exactly how and when this happens is unspecified. + * + * Offers both polling and (timed, non-timed) blocking calls for acquiring + * a throttle token. If the returned token is valid, the caller may proceed + * to invoke the asynchronous operation. + * + * The window slot taken up by a valid throttle token is implicitly freed up + * when the token is destroyed. + * + * All operations on the throttler are thread safe. + */ +class SharedOperationThrottler { +protected: + struct TokenCtorTag {}; // Make available to subclasses for token construction. +public: + class Token { + SharedOperationThrottler* _throttler; + public: + constexpr Token(SharedOperationThrottler* throttler, TokenCtorTag) noexcept : _throttler(throttler) {} + constexpr Token() noexcept : _throttler(nullptr) {} + constexpr Token(Token&& rhs) noexcept + : _throttler(rhs._throttler) + { + rhs._throttler = nullptr; + } + Token& operator=(Token&& rhs) noexcept; + ~Token(); + + Token(const Token&) = delete; + Token& operator=(const Token&) = delete; + + [[nodiscard]] constexpr bool valid() const noexcept { return (_throttler != nullptr); } + void reset() noexcept; + }; + + virtual ~SharedOperationThrottler() = default; + + // All methods are thread safe + [[nodiscard]] virtual Token blocking_acquire_one() noexcept = 0; + [[nodiscard]] virtual Token blocking_acquire_one(vespalib::duration timeout) noexcept = 0; + [[nodiscard]] virtual Token try_acquire_one() noexcept = 0; + + // May return 0, in which case the window size is unlimited. + [[nodiscard]] virtual uint32_t current_window_size() const noexcept = 0; + + // Exposed for unit testing only. + [[nodiscard]] virtual uint32_t waiting_threads() const noexcept = 0; + + static std::unique_ptr<SharedOperationThrottler> make_unlimited_throttler(); + + static std::unique_ptr<SharedOperationThrottler> make_dynamic_throttler(uint32_t min_size_and_window_increment); +private: + // Exclusively called from a valid Token. Thread safe. + virtual void release_one() noexcept = 0; +}; + +} |