diff options
Diffstat (limited to 'storage/src')
14 files changed, 33 insertions, 409 deletions
diff --git a/storage/src/tests/persistence/CMakeLists.txt b/storage/src/tests/persistence/CMakeLists.txt index fb8120210c1..7b165e11b66 100644 --- a/storage/src/tests/persistence/CMakeLists.txt +++ b/storage/src/tests/persistence/CMakeLists.txt @@ -12,7 +12,6 @@ vespa_add_executable(storage_persistence_gtest_runner_app TEST persistencethread_splittest.cpp processalltest.cpp provider_error_wrapper_test.cpp - shared_operation_throttler_test.cpp splitbitdetectortest.cpp testandsettest.cpp gtest_runner.cpp diff --git a/storage/src/tests/persistence/shared_operation_throttler_test.cpp b/storage/src/tests/persistence/shared_operation_throttler_test.cpp deleted file mode 100644 index 0ad380937c7..00000000000 --- a/storage/src/tests/persistence/shared_operation_throttler_test.cpp +++ /dev/null @@ -1,116 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/storage/persistence/shared_operation_throttler.h> -#include <vespa/vespalib/gtest/gtest.h> -#include <vespa/vespalib/util/barrier.h> -#include <chrono> -#include <thread> - -using namespace ::testing; - -namespace storage { - -using ThrottleToken = SharedOperationThrottler::Token; - -TEST(SharedOperationThrottlerTest, unlimited_throttler_does_not_throttle) { - // We technically can't test that the unlimited throttler _never_ throttles, but at - // least check that it doesn't throttle _twice_, and then induce from this ;) - auto throttler = SharedOperationThrottler::make_unlimited_throttler(); - auto token1 = throttler->try_acquire_one(); - EXPECT_TRUE(token1.valid()); - auto token2 = throttler->blocking_acquire_one(); - EXPECT_TRUE(token2.valid()); - // Window size should be zero (i.e. unlimited) for unlimited throttler - EXPECT_EQ(throttler->current_window_size(), 0); -} - -TEST(SharedOperationThrottlerTest, dynamic_throttler_respects_initial_window_size) { - auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); - auto token1 = throttler->try_acquire_one(); - EXPECT_TRUE(token1.valid()); - auto token2 = throttler->try_acquire_one(); - EXPECT_FALSE(token2.valid()); - - EXPECT_EQ(throttler->current_window_size(), 1); -} - -TEST(SharedOperationThrottlerTest, blocking_acquire_returns_immediately_if_slot_available) { - auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); - auto token = throttler->blocking_acquire_one(); - EXPECT_TRUE(token.valid()); - token.reset(); - token = throttler->blocking_acquire_one(600s); // Should never block. - EXPECT_TRUE(token.valid()); -} - -TEST(SharedOperationThrottlerTest, blocking_call_woken_up_if_throttle_slot_available) { - auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); - vespalib::Barrier barrier(2); - std::thread t([&] { - auto token = throttler->try_acquire_one(); - assert(token.valid()); - barrier.await(); - while (throttler->waiting_threads() != 1) { - std::this_thread::sleep_for(100us); - } - // Implicit token release at thread scope exit - }); - barrier.await(); - auto token = throttler->blocking_acquire_one(); - EXPECT_TRUE(token.valid()); - t.join(); -} - -TEST(SharedOperationThrottlerTest, time_bounded_blocking_acquire_waits_for_timeout) { - auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); - auto window_filling_token = throttler->try_acquire_one(); - auto before = std::chrono::steady_clock::now(); - // Will block for at least 1ms. Since no window slot will be available by that time, - // an invalid token should be returned. - auto token = throttler->blocking_acquire_one(1ms); - auto after = std::chrono::steady_clock::now(); - EXPECT_TRUE((after - before) >= 1ms); - EXPECT_FALSE(token.valid()); -} - -TEST(SharedOperationThrottlerTest, default_constructed_token_is_invalid) { - ThrottleToken token; - EXPECT_FALSE(token.valid()); - token.reset(); // no-op - EXPECT_FALSE(token.valid()); -} - -TEST(SharedOperationThrottlerTest, token_destruction_frees_up_throttle_window_slot) { - auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); - { - auto token = throttler->try_acquire_one(); - EXPECT_TRUE(token.valid()); - } - auto token = throttler->try_acquire_one(); - EXPECT_TRUE(token.valid()); -} - -TEST(SharedOperationThrottlerTest, token_can_be_moved_and_reset) { - auto throttler = SharedOperationThrottler::make_dynamic_throttler(1); - auto token1 = throttler->try_acquire_one(); - auto token2 = std::move(token1); // move ctor - EXPECT_TRUE(token2.valid()); - EXPECT_FALSE(token1.valid()); - ThrottleToken token3; - token3 = std::move(token2); // move assignment op - EXPECT_TRUE(token3.valid()); - EXPECT_FALSE(token2.valid()); - - // Trying to fetch new token should not succeed due to active token and win size of 1 - token1 = throttler->try_acquire_one(); - EXPECT_FALSE(token1.valid()); - // Resetting the token should free up the slot in the window - token3.reset(); - token1 = throttler->try_acquire_one(); - EXPECT_TRUE(token1.valid()); -} - -// TODO ideally we'd test that the dynamic throttler has a window size that is actually -// dynamic, but the backing DynamicThrottlePolicy implementation is a black box so -// it's not trivial to know how to do this reliably. - -} diff --git a/storage/src/vespa/storage/persistence/CMakeLists.txt b/storage/src/vespa/storage/persistence/CMakeLists.txt index 5e068236026..c737d2bed28 100644 --- a/storage/src/vespa/storage/persistence/CMakeLists.txt +++ b/storage/src/vespa/storage/persistence/CMakeLists.txt @@ -14,7 +14,6 @@ vespa_add_library(storage_spersistence OBJECT persistenceutil.cpp processallhandler.cpp provider_error_wrapper.cpp - shared_operation_throttler.cpp simplemessagehandler.cpp splitbitdetector.cpp splitjoinhandler.cpp diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp index 73ccc7f6085..69d35253aa6 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp @@ -9,7 +9,7 @@ namespace storage { ApplyBucketDiffEntryComplete::ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state, document::DocumentId doc_id, - SharedOperationThrottler::Token throttle_token, + ThrottleToken throttle_token, const char *op, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric) diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h index 8478cab4c17..a78fbe38ae5 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h @@ -22,14 +22,14 @@ class ApplyBucketDiffEntryComplete : public spi::OperationComplete const spi::ResultHandler* _result_handler; std::shared_ptr<ApplyBucketDiffState> _state; document::DocumentId _doc_id; - SharedOperationThrottler::Token _throttle_token; + ThrottleToken _throttle_token; const char* _op; framework::MilliSecTimer _start_time; metrics::DoubleAverageMetric& _latency_metric; public: ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state, document::DocumentId doc_id, - SharedOperationThrottler::Token throttle_token, + ThrottleToken throttle_token, const char *op, const framework::Clock& clock, metrics::DoubleAverageMetric& latency_metric); ~ApplyBucketDiffEntryComplete() override; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index 6f740ce2c28..66dc7126058 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -78,7 +78,7 @@ public: struct LockedMessage { std::shared_ptr<BucketLockInterface> lock; std::shared_ptr<api::StorageMessage> msg; - SharedOperationThrottler::Token throttle_token; + ThrottleToken throttle_token; LockedMessage() noexcept = default; LockedMessage(std::shared_ptr<BucketLockInterface> lock_, @@ -89,7 +89,7 @@ public: {} LockedMessage(std::shared_ptr<BucketLockInterface> lock_, std::shared_ptr<api::StorageMessage> msg_, - SharedOperationThrottler::Token token) noexcept + ThrottleToken token) noexcept : lock(std::move(lock_)), msg(std::move(msg_)), throttle_token(std::move(token)) @@ -274,7 +274,7 @@ public: virtual ActiveOperationsStats get_active_operations_stats(bool reset_min_max) const = 0; - virtual SharedOperationThrottler& operation_throttler() const noexcept = 0; + virtual vespalib::SharedOperationThrottler& operation_throttler() const noexcept = 0; }; } // storage diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index 2ccbc7a85ef..f7d4f750884 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -40,14 +40,14 @@ uint32_t per_stripe_merge_limit(uint32_t num_threads, uint32_t num_stripes) noex FileStorHandlerImpl::FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics, ServiceLayerComponentRegister& compReg) - : FileStorHandlerImpl(1, 1, sender, metrics, compReg, SharedOperationThrottler::make_unlimited_throttler()) + : FileStorHandlerImpl(1, 1, sender, metrics, compReg, vespalib::SharedOperationThrottler::make_unlimited_throttler()) { } FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender& sender, FileStorMetrics& metrics, ServiceLayerComponentRegister& compReg, - std::unique_ptr<SharedOperationThrottler> operation_throttler) + std::unique_ptr<vespalib::SharedOperationThrottler> operation_throttler) : _component(compReg, "filestorhandlerimpl"), _state(FileStorHandler::AVAILABLE), _metrics(nullptr), @@ -920,7 +920,7 @@ FileStorHandler::LockedMessage FileStorHandlerImpl::Stripe::getNextMessage(vespalib::duration timeout) { std::unique_lock guard(*_lock); - SharedOperationThrottler::Token throttle_token; + ThrottleToken throttle_token; // Try to grab a message+lock, immediately retrying once after a wait // if none can be found and then exiting if the same is the case on the // second attempt. This is key to allowing the run loop to register @@ -997,7 +997,7 @@ FileStorHandlerImpl::Stripe::get_next_async_message(monitor_guard& guard) FileStorHandler::LockedMessage FileStorHandlerImpl::Stripe::getMessage(monitor_guard & guard, PriorityIdx & idx, PriorityIdx::iterator iter, - SharedOperationThrottler::Token throttle_token) + ThrottleToken throttle_token) { std::chrono::milliseconds waitTime(uint64_t(iter->_timer.stop(_metrics->averageQueueWaitingTime))); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index c4b85ac596c..698f52359f5 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -149,7 +149,7 @@ public: // with its locking requirements. FileStorHandler::LockedMessage getMessage(monitor_guard & guard, PriorityIdx & idx, PriorityIdx::iterator iter, - SharedOperationThrottler::Token throttle_token); + ThrottleToken throttle_token); using LockedBuckets = vespalib::hash_map<document::Bucket, MultiLockEntry, document::Bucket::hash>; const FileStorHandlerImpl &_owner; MessageSender &_messageSender; @@ -191,7 +191,7 @@ public: FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics, ServiceLayerComponentRegister& compReg); FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender&, FileStorMetrics&, - ServiceLayerComponentRegister&, std::unique_ptr<SharedOperationThrottler>); + ServiceLayerComponentRegister&, std::unique_ptr<vespalib::SharedOperationThrottler>); ~FileStorHandlerImpl() override; void setGetNextMessageTimeout(vespalib::duration timeout) override { _getNextMessageTimeout = timeout; } @@ -243,7 +243,7 @@ public: ResumeGuard pause() override; void abortQueuedOperations(const AbortBucketOperationsCommand& cmd) override; - SharedOperationThrottler& operation_throttler() const noexcept override { + vespalib::SharedOperationThrottler& operation_throttler() const noexcept override { return *_operation_throttler; } @@ -257,7 +257,7 @@ private: ServiceLayerComponent _component; std::atomic<DiskState> _state; FileStorDiskMetrics * _metrics; - std::unique_ptr<SharedOperationThrottler> _operation_throttler; + std::unique_ptr<vespalib::SharedOperationThrottler> _operation_throttler; std::vector<Stripe> _stripes; MessageSender& _messageSender; const document::BucketIdFactory& _bucketIdFactory; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 530d96dfe3f..03b16e75297 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -143,16 +143,19 @@ selectSequencer(StorFilestorConfig::ResponseSequencerType sequencerType) { } } -std::unique_ptr<SharedOperationThrottler> +std::unique_ptr<vespalib::SharedOperationThrottler> make_operation_throttler_from_config(const StorFilestorConfig& config, size_t num_threads) { const bool use_dynamic_throttling = (config.asyncOperationThrottlerType == StorFilestorConfig::AsyncOperationThrottlerType::DYNAMIC); if (use_dynamic_throttling) { auto config_win_size_incr = std::max(config.asyncOperationDynamicThrottlingWindowIncrement, 1); auto win_size_increment = std::max(static_cast<size_t>(config_win_size_incr), num_threads); - return SharedOperationThrottler::make_dynamic_throttler(win_size_increment); + vespalib::SharedOperationThrottler::DynamicThrottleParams params; + params.window_size_increment = win_size_increment; + params.min_window_size = win_size_increment; + return vespalib::SharedOperationThrottler::make_dynamic_throttler(params); } else { - return SharedOperationThrottler::make_unlimited_throttler(); + return vespalib::SharedOperationThrottler::make_unlimited_throttler(); } } diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index e1c821aab48..ee6eed63eb5 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -22,7 +22,10 @@ #include <vespa/vespalib/util/monitored_refcount.h> #include <atomic> -namespace vespalib { class ISequencedTaskExecutor; } +namespace vespalib { +class ISequencedTaskExecutor; +class SharedOperationThrottler; +} namespace storage { @@ -34,7 +37,6 @@ namespace spi { class PersistenceUtil; class ApplyBucketDiffState; class MergeStatus; -class SharedOperationThrottler; class MergeHandler : public Types, public MergeBucketInfoSyncer { @@ -85,7 +87,7 @@ private: const ClusterContext &_cluster_context; PersistenceUtil &_env; spi::PersistenceProvider &_spi; - SharedOperationThrottler& _operation_throttler; + vespalib::SharedOperationThrottler& _operation_throttler; std::unique_ptr<vespalib::MonitoredRefCount> _monitored_ref_count; const uint32_t _maxChunkSize; const uint32_t _commonMergeChainOptimalizationMinimumSize; diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp index 65eab99b8fb..2781cc61b83 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.cpp +++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp @@ -32,7 +32,7 @@ MessageTracker::MessageTracker(const framework::MilliSecTimer & timer, MessageSender & replySender, FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg, - SharedOperationThrottler::Token throttle_token) + ThrottleToken throttle_token) : MessageTracker(timer, env, replySender, true, std::move(bucketLock), std::move(msg), std::move(throttle_token)) {} MessageTracker::MessageTracker(const framework::MilliSecTimer & timer, @@ -41,7 +41,7 @@ MessageTracker::MessageTracker(const framework::MilliSecTimer & timer, bool updateBucketInfo, FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg, - SharedOperationThrottler::Token throttle_token) + ThrottleToken throttle_token) : _sendReply(true), _updateBucketInfo(updateBucketInfo && hasBucketInfo(msg->getType().getId())), _bucketLock(std::move(bucketLock)), @@ -60,7 +60,7 @@ MessageTracker::createForTesting(const framework::MilliSecTimer & timer, Persist FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg) { return MessageTracker::UP(new MessageTracker(timer, env, replySender, false, std::move(bucketLock), - std::move(msg), SharedOperationThrottler::Token())); + std::move(msg), ThrottleToken())); } void diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h index 588cbef2170..4130a276239 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.h +++ b/storage/src/vespa/storage/persistence/persistenceutil.h @@ -31,7 +31,7 @@ public: MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg, - SharedOperationThrottler::Token throttle_token); + ThrottleToken throttle_token); ~MessageTracker(); @@ -93,7 +93,7 @@ public: private: MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, bool updateBucketInfo, FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg, - SharedOperationThrottler::Token throttle_token); + ThrottleToken throttle_token); [[nodiscard]] bool count_result_as_failure() const noexcept; @@ -101,7 +101,7 @@ private: bool _updateBucketInfo; FileStorHandler::BucketLockInterface::SP _bucketLock; std::shared_ptr<api::StorageMessage> _msg; - SharedOperationThrottler::Token _throttle_token; + ThrottleToken _throttle_token; spi::Context _context; const PersistenceUtil &_env; MessageSender &_replySender; diff --git a/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp b/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp deleted file mode 100644 index 7db1a0ccdbb..00000000000 --- a/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp +++ /dev/null @@ -1,201 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "shared_operation_throttler.h" -#include <vespa/messagebus/dynamicthrottlepolicy.h> -#include <vespa/storage/common/dummy_mbus_messages.h> -#include <condition_variable> -#include <cassert> -#include <mutex> - -namespace storage { - -namespace { - -class NoLimitsOperationThrottler final : public SharedOperationThrottler { -public: - ~NoLimitsOperationThrottler() override = default; - Token blocking_acquire_one() noexcept override { - return Token(this, TokenCtorTag{}); - } - Token blocking_acquire_one(vespalib::duration) noexcept override { - return Token(this, TokenCtorTag{}); - } - Token try_acquire_one() noexcept override { - return Token(this, TokenCtorTag{}); - } - uint32_t current_window_size() const noexcept override { return 0; } - uint32_t waiting_threads() const noexcept override { return 0; } -private: - void release_one() noexcept override { /* no-op */ } -}; - -class DynamicOperationThrottler final : public SharedOperationThrottler { - mutable std::mutex _mutex; - std::condition_variable _cond; - mbus::DynamicThrottlePolicy _throttle_policy; - uint32_t _pending_ops; - uint32_t _waiting_threads; -public: - explicit DynamicOperationThrottler(uint32_t min_size_and_window_increment); - ~DynamicOperationThrottler() override; - - Token blocking_acquire_one() noexcept override; - Token blocking_acquire_one(vespalib::duration timeout) noexcept override; - Token try_acquire_one() noexcept override; - uint32_t current_window_size() const noexcept override; - uint32_t waiting_threads() const noexcept override; -private: - void release_one() noexcept override; - // Non-const since actually checking the send window of a dynamic throttler might change - // it if enough time has passed. - [[nodiscard]] bool has_spare_capacity_in_active_window() noexcept; - void add_one_to_active_window_size(); - void subtract_one_from_active_window_size(); -}; - -DynamicOperationThrottler::DynamicOperationThrottler(uint32_t min_size_and_window_increment) - : _mutex(), - _cond(), - _throttle_policy(static_cast<double>(min_size_and_window_increment)), - _pending_ops(0), - _waiting_threads(0) -{ - _throttle_policy.setWindowSizeDecrementFactor(1.2); - _throttle_policy.setWindowSizeBackOff(0.95); -} - -DynamicOperationThrottler::~DynamicOperationThrottler() = default; - -bool -DynamicOperationThrottler::has_spare_capacity_in_active_window() noexcept -{ - DummyMbusRequest dummy_request; - return _throttle_policy.canSend(dummy_request, _pending_ops); -} - -void -DynamicOperationThrottler::add_one_to_active_window_size() -{ - DummyMbusRequest dummy_request; - _throttle_policy.processMessage(dummy_request); - ++_pending_ops; -} - -void -DynamicOperationThrottler::subtract_one_from_active_window_size() -{ - DummyMbusReply dummy_reply; - _throttle_policy.processReply(dummy_reply); - assert(_pending_ops > 0); - --_pending_ops; -} - -DynamicOperationThrottler::Token -DynamicOperationThrottler::blocking_acquire_one() noexcept -{ - std::unique_lock lock(_mutex); - if (!has_spare_capacity_in_active_window()) { - ++_waiting_threads; - _cond.wait(lock, [&] { - return has_spare_capacity_in_active_window(); - }); - --_waiting_threads; - } - add_one_to_active_window_size(); - return Token(this, TokenCtorTag{}); -} - -DynamicOperationThrottler::Token -DynamicOperationThrottler::blocking_acquire_one(vespalib::duration timeout) noexcept -{ - std::unique_lock lock(_mutex); - if (!has_spare_capacity_in_active_window()) { - ++_waiting_threads; - const bool accepted = _cond.wait_for(lock, timeout, [&] { - return has_spare_capacity_in_active_window(); - }); - --_waiting_threads; - if (!accepted) { - return Token(); - } - } - add_one_to_active_window_size(); - return Token(this, TokenCtorTag{}); -} - -DynamicOperationThrottler::Token -DynamicOperationThrottler::try_acquire_one() noexcept -{ - std::unique_lock lock(_mutex); - if (!has_spare_capacity_in_active_window()) { - return Token(); - } - add_one_to_active_window_size(); - return Token(this, TokenCtorTag{}); -} - -void -DynamicOperationThrottler::release_one() noexcept -{ - std::unique_lock lock(_mutex); - subtract_one_from_active_window_size(); - // Only wake up a waiting thread if doing so would possibly result in success. - if ((_waiting_threads > 0) && has_spare_capacity_in_active_window()) { - lock.unlock(); - _cond.notify_one(); - } -} - -uint32_t -DynamicOperationThrottler::current_window_size() const noexcept -{ - std::unique_lock lock(_mutex); - return _throttle_policy.getMaxPendingCount(); // Actually returns current window size -} - -uint32_t -DynamicOperationThrottler::waiting_threads() const noexcept -{ - std::unique_lock lock(_mutex); - return _waiting_threads; -} - -} - -std::unique_ptr<SharedOperationThrottler> -SharedOperationThrottler::make_unlimited_throttler() -{ - return std::make_unique<NoLimitsOperationThrottler>(); -} - -std::unique_ptr<SharedOperationThrottler> -SharedOperationThrottler::make_dynamic_throttler(uint32_t min_size_and_window_increment) -{ - return std::make_unique<DynamicOperationThrottler>(min_size_and_window_increment); -} - -DynamicOperationThrottler::Token::~Token() -{ - if (_throttler) { - _throttler->release_one(); - } -} - -void -DynamicOperationThrottler::Token::reset() noexcept -{ - if (_throttler) { - _throttler->release_one(); - _throttler = nullptr; - } -} - -DynamicOperationThrottler::Token& -DynamicOperationThrottler::Token::operator=(Token&& rhs) noexcept -{ - reset(); - _throttler = rhs._throttler; - rhs._throttler = nullptr; - return *this; -} - -} diff --git a/storage/src/vespa/storage/persistence/shared_operation_throttler.h b/storage/src/vespa/storage/persistence/shared_operation_throttler.h index 4ee8d017c05..b829f077bcb 100644 --- a/storage/src/vespa/storage/persistence/shared_operation_throttler.h +++ b/storage/src/vespa/storage/persistence/shared_operation_throttler.h @@ -1,72 +1,10 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <vespa/vespalib/util/time.h> -#include <memory> -#include <optional> +#include <vespa/vespalib/util/shared_operation_throttler.h> namespace storage { -/** - * Operation throttler that is intended to provide global throttling of - * async operations across all persistence stripe threads. A throttler - * wraps a logical max pending window size of in-flight operations. Depending - * on the throttler implementation, the window size may expand and shrink - * dynamically. Exactly how and when this happens is unspecified. - * - * Offers both polling and (timed, non-timed) blocking calls for acquiring - * a throttle token. If the returned token is valid, the caller may proceed - * to invoke the asynchronous operation. - * - * The window slot taken up by a valid throttle token is implicitly freed up - * when the token is destroyed. - * - * All operations on the throttler are thread safe. - */ -class SharedOperationThrottler { -protected: - struct TokenCtorTag {}; // Make available to subclasses for token construction. -public: - class Token { - SharedOperationThrottler* _throttler; - public: - constexpr Token(SharedOperationThrottler* throttler, TokenCtorTag) noexcept : _throttler(throttler) {} - constexpr Token() noexcept : _throttler(nullptr) {} - constexpr Token(Token&& rhs) noexcept - : _throttler(rhs._throttler) - { - rhs._throttler = nullptr; - } - Token& operator=(Token&& rhs) noexcept; - ~Token(); - - Token(const Token&) = delete; - Token& operator=(const Token&) = delete; - - [[nodiscard]] constexpr bool valid() const noexcept { return (_throttler != nullptr); } - void reset() noexcept; - }; - - virtual ~SharedOperationThrottler() = default; - - // All methods are thread safe - [[nodiscard]] virtual Token blocking_acquire_one() noexcept = 0; - [[nodiscard]] virtual Token blocking_acquire_one(vespalib::duration timeout) noexcept = 0; - [[nodiscard]] virtual Token try_acquire_one() noexcept = 0; - - // May return 0, in which case the window size is unlimited. - [[nodiscard]] virtual uint32_t current_window_size() const noexcept = 0; - - // Exposed for unit testing only. - [[nodiscard]] virtual uint32_t waiting_threads() const noexcept = 0; - - // Creates a throttler that does exactly zero throttling (but also has zero overhead and locking) - static std::unique_ptr<SharedOperationThrottler> make_unlimited_throttler(); - // Creates a throttler that uses a MessageBus DynamicThrottlePolicy under the hood - static std::unique_ptr<SharedOperationThrottler> make_dynamic_throttler(uint32_t min_size_and_window_increment); -private: - // Exclusively called from a valid Token. Thread safe. - virtual void release_one() noexcept = 0; -}; +using ThrottleToken = vespalib::SharedOperationThrottler::Token; } |