diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-01-11 09:23:48 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-01-11 09:23:48 +0000 |
commit | 59fdfabc4b6f1e828043cf470ec55ff947d5ce46 (patch) | |
tree | 0fcb1fbc7eb7fd7ac8fde5f84ce4fba915f09a43 /storage | |
parent | 0d21c1830fff13fd19f7c6342ec2184979769185 (diff) |
Clean up DynamicOperationThrottler to make it easier to read
Diffstat (limited to 'storage')
-rw-r--r-- | storage/src/vespa/storage/persistence/shared_operation_throttler.cpp | 56 | ||||
-rw-r--r-- | storage/src/vespa/storage/persistence/shared_operation_throttler.h | 3 |
2 files changed, 40 insertions, 19 deletions
diff --git a/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp b/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp index b72b1a8ba28..002c4a94387 100644 --- a/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp +++ b/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp @@ -58,6 +58,11 @@ public: uint32_t waiting_threads() const noexcept override; private: void release_one() noexcept override; + // Non-const since actually checking the send window of a dynamic throttler might change + // it if enough time has passed. + [[nodiscard]] bool has_spare_capacity_in_active_window() noexcept; + void add_one_to_active_window_size(); + void subtract_one_from_active_window_size(); }; DynamicOperationThrottler::DynamicOperationThrottler(uint32_t min_size_and_window_increment) @@ -71,20 +76,42 @@ DynamicOperationThrottler::DynamicOperationThrottler(uint32_t min_size_and_windo DynamicOperationThrottler::~DynamicOperationThrottler() = default; +bool +DynamicOperationThrottler::has_spare_capacity_in_active_window() noexcept +{ + DummyMbusMessage<mbus::Message> dummy_msg; + return _throttle_policy.canSend(dummy_msg, _pending_ops); +} + +void +DynamicOperationThrottler::add_one_to_active_window_size() +{ + DummyMbusMessage<mbus::Message> dummy_msg; + _throttle_policy.processMessage(dummy_msg); + ++_pending_ops; +} + +void +DynamicOperationThrottler::subtract_one_from_active_window_size() +{ + DummyMbusMessage<mbus::Reply> dummy_reply; + _throttle_policy.processReply(dummy_reply); + assert(_pending_ops > 0); + --_pending_ops; +} + DynamicOperationThrottler::Token DynamicOperationThrottler::blocking_acquire_one() noexcept { std::unique_lock lock(_mutex); - DummyMbusMessage<mbus::Message> dummy_msg; - if (!_throttle_policy.canSend(dummy_msg, _pending_ops)) { + if (!has_spare_capacity_in_active_window()) { ++_waiting_threads; _cond.wait(lock, [&] { - return _throttle_policy.canSend(dummy_msg, _pending_ops); + return has_spare_capacity_in_active_window(); }); --_waiting_threads; } - _throttle_policy.processMessage(dummy_msg); - ++_pending_ops; + add_one_to_active_window_size(); return Token(this, TokenCtorTag{}); } @@ -92,19 +119,17 @@ DynamicOperationThrottler::Token DynamicOperationThrottler::blocking_acquire_one(vespalib::duration timeout) noexcept { std::unique_lock lock(_mutex); - DummyMbusMessage<mbus::Message> dummy_msg; - if (!_throttle_policy.canSend(dummy_msg, _pending_ops)) { + if (!has_spare_capacity_in_active_window()) { ++_waiting_threads; const bool accepted = _cond.wait_for(lock, timeout, [&] { - return _throttle_policy.canSend(dummy_msg, _pending_ops); + return has_spare_capacity_in_active_window(); }); --_waiting_threads; if (!accepted) { return Token(); } } - _throttle_policy.processMessage(dummy_msg); - ++_pending_ops; + add_one_to_active_window_size(); return Token(this, TokenCtorTag{}); } @@ -112,12 +137,10 @@ DynamicOperationThrottler::Token DynamicOperationThrottler::try_acquire_one() noexcept { std::unique_lock lock(_mutex); - DummyMbusMessage<mbus::Message> dummy_msg; - if (!_throttle_policy.canSend(dummy_msg, _pending_ops)) { + if (!has_spare_capacity_in_active_window()) { return Token(); } - _throttle_policy.processMessage(dummy_msg); - ++_pending_ops; + add_one_to_active_window_size(); return Token(this, TokenCtorTag{}); } @@ -125,10 +148,7 @@ void DynamicOperationThrottler::release_one() noexcept { std::unique_lock lock(_mutex); - DummyMbusMessage<mbus::Reply> dummy_reply; - _throttle_policy.processReply(dummy_reply); - assert(_pending_ops > 0); - --_pending_ops; + subtract_one_from_active_window_size(); if (_waiting_threads > 0) { lock.unlock(); _cond.notify_one(); diff --git a/storage/src/vespa/storage/persistence/shared_operation_throttler.h b/storage/src/vespa/storage/persistence/shared_operation_throttler.h index 2e1de86c4b8..4ee8d017c05 100644 --- a/storage/src/vespa/storage/persistence/shared_operation_throttler.h +++ b/storage/src/vespa/storage/persistence/shared_operation_throttler.h @@ -60,8 +60,9 @@ public: // Exposed for unit testing only. [[nodiscard]] virtual uint32_t waiting_threads() const noexcept = 0; + // Creates a throttler that does exactly zero throttling (but also has zero overhead and locking) static std::unique_ptr<SharedOperationThrottler> make_unlimited_throttler(); - + // Creates a throttler that uses a MessageBus DynamicThrottlePolicy under the hood static std::unique_ptr<SharedOperationThrottler> make_dynamic_throttler(uint32_t min_size_and_window_increment); private: // Exclusively called from a valid Token. Thread safe. |