aboutsummaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2022-01-11 09:23:48 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2022-01-11 09:23:48 +0000
commit59fdfabc4b6f1e828043cf470ec55ff947d5ce46 (patch)
tree0fcb1fbc7eb7fd7ac8fde5f84ce4fba915f09a43 /storage
parent0d21c1830fff13fd19f7c6342ec2184979769185 (diff)
Clean up DynamicOperationThrottler to make it easier to read
Diffstat (limited to 'storage')
-rw-r--r--storage/src/vespa/storage/persistence/shared_operation_throttler.cpp56
-rw-r--r--storage/src/vespa/storage/persistence/shared_operation_throttler.h3
2 files changed, 40 insertions, 19 deletions
diff --git a/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp b/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp
index b72b1a8ba28..002c4a94387 100644
--- a/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp
+++ b/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp
@@ -58,6 +58,11 @@ public:
uint32_t waiting_threads() const noexcept override;
private:
void release_one() noexcept override;
+ // Non-const since actually checking the send window of a dynamic throttler might change
+ // it if enough time has passed.
+ [[nodiscard]] bool has_spare_capacity_in_active_window() noexcept;
+ void add_one_to_active_window_size();
+ void subtract_one_from_active_window_size();
};
DynamicOperationThrottler::DynamicOperationThrottler(uint32_t min_size_and_window_increment)
@@ -71,20 +76,42 @@ DynamicOperationThrottler::DynamicOperationThrottler(uint32_t min_size_and_windo
DynamicOperationThrottler::~DynamicOperationThrottler() = default;
+bool
+DynamicOperationThrottler::has_spare_capacity_in_active_window() noexcept
+{
+ DummyMbusMessage<mbus::Message> dummy_msg;
+ return _throttle_policy.canSend(dummy_msg, _pending_ops);
+}
+
+void
+DynamicOperationThrottler::add_one_to_active_window_size()
+{
+ DummyMbusMessage<mbus::Message> dummy_msg;
+ _throttle_policy.processMessage(dummy_msg);
+ ++_pending_ops;
+}
+
+void
+DynamicOperationThrottler::subtract_one_from_active_window_size()
+{
+ DummyMbusMessage<mbus::Reply> dummy_reply;
+ _throttle_policy.processReply(dummy_reply);
+ assert(_pending_ops > 0);
+ --_pending_ops;
+}
+
DynamicOperationThrottler::Token
DynamicOperationThrottler::blocking_acquire_one() noexcept
{
std::unique_lock lock(_mutex);
- DummyMbusMessage<mbus::Message> dummy_msg;
- if (!_throttle_policy.canSend(dummy_msg, _pending_ops)) {
+ if (!has_spare_capacity_in_active_window()) {
++_waiting_threads;
_cond.wait(lock, [&] {
- return _throttle_policy.canSend(dummy_msg, _pending_ops);
+ return has_spare_capacity_in_active_window();
});
--_waiting_threads;
}
- _throttle_policy.processMessage(dummy_msg);
- ++_pending_ops;
+ add_one_to_active_window_size();
return Token(this, TokenCtorTag{});
}
@@ -92,19 +119,17 @@ DynamicOperationThrottler::Token
DynamicOperationThrottler::blocking_acquire_one(vespalib::duration timeout) noexcept
{
std::unique_lock lock(_mutex);
- DummyMbusMessage<mbus::Message> dummy_msg;
- if (!_throttle_policy.canSend(dummy_msg, _pending_ops)) {
+ if (!has_spare_capacity_in_active_window()) {
++_waiting_threads;
const bool accepted = _cond.wait_for(lock, timeout, [&] {
- return _throttle_policy.canSend(dummy_msg, _pending_ops);
+ return has_spare_capacity_in_active_window();
});
--_waiting_threads;
if (!accepted) {
return Token();
}
}
- _throttle_policy.processMessage(dummy_msg);
- ++_pending_ops;
+ add_one_to_active_window_size();
return Token(this, TokenCtorTag{});
}
@@ -112,12 +137,10 @@ DynamicOperationThrottler::Token
DynamicOperationThrottler::try_acquire_one() noexcept
{
std::unique_lock lock(_mutex);
- DummyMbusMessage<mbus::Message> dummy_msg;
- if (!_throttle_policy.canSend(dummy_msg, _pending_ops)) {
+ if (!has_spare_capacity_in_active_window()) {
return Token();
}
- _throttle_policy.processMessage(dummy_msg);
- ++_pending_ops;
+ add_one_to_active_window_size();
return Token(this, TokenCtorTag{});
}
@@ -125,10 +148,7 @@ void
DynamicOperationThrottler::release_one() noexcept
{
std::unique_lock lock(_mutex);
- DummyMbusMessage<mbus::Reply> dummy_reply;
- _throttle_policy.processReply(dummy_reply);
- assert(_pending_ops > 0);
- --_pending_ops;
+ subtract_one_from_active_window_size();
if (_waiting_threads > 0) {
lock.unlock();
_cond.notify_one();
diff --git a/storage/src/vespa/storage/persistence/shared_operation_throttler.h b/storage/src/vespa/storage/persistence/shared_operation_throttler.h
index 2e1de86c4b8..4ee8d017c05 100644
--- a/storage/src/vespa/storage/persistence/shared_operation_throttler.h
+++ b/storage/src/vespa/storage/persistence/shared_operation_throttler.h
@@ -60,8 +60,9 @@ public:
// Exposed for unit testing only.
[[nodiscard]] virtual uint32_t waiting_threads() const noexcept = 0;
+ // Creates a throttler that does exactly zero throttling (but also has zero overhead and locking)
static std::unique_ptr<SharedOperationThrottler> make_unlimited_throttler();
-
+ // Creates a throttler that uses a MessageBus DynamicThrottlePolicy under the hood
static std::unique_ptr<SharedOperationThrottler> make_dynamic_throttler(uint32_t min_size_and_window_increment);
private:
// Exclusively called from a valid Token. Thread safe.