summaryrefslogtreecommitdiffstats
path: root/vespalib
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2022-01-21 09:56:20 +0100
committerGitHub <noreply@github.com>2022-01-21 09:56:20 +0100
commit690d90ceb13797e1ff7876d4e9fc24efbec5f57b (patch)
tree86a30c8f03112334d2b3e82e8a417665cb417785 /vespalib
parentf912cc967ecf4812faa3c198b2805ff22195df8f (diff)
parent053a6f676c3700acbbdecc6856c12a4e8ab84a3b (diff)
Merge pull request #20888 from vespa-engine/vekterli/add-operation-throttler-to-vespalib
Add SharedOperationThrottler to vespalib utils
Diffstat (limited to 'vespalib')
-rw-r--r--vespalib/CMakeLists.txt1
-rw-r--r--vespalib/src/tests/shared_operation_throttler/CMakeLists.txt8
-rw-r--r--vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp105
-rw-r--r--vespalib/src/vespa/vespalib/util/CMakeLists.txt1
-rw-r--r--vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp386
-rw-r--r--vespalib/src/vespa/vespalib/util/shared_operation_throttler.h86
6 files changed, 587 insertions, 0 deletions
diff --git a/vespalib/CMakeLists.txt b/vespalib/CMakeLists.txt
index eb7e1f7d4c0..5e9426ba6ee 100644
--- a/vespalib/CMakeLists.txt
+++ b/vespalib/CMakeLists.txt
@@ -101,6 +101,7 @@ vespa_define_module(
src/tests/require
src/tests/runnable_pair
src/tests/sha1
+ src/tests/shared_operation_throttler
src/tests/shared_string_repo
src/tests/sharedptr
src/tests/signalhandler
diff --git a/vespalib/src/tests/shared_operation_throttler/CMakeLists.txt b/vespalib/src/tests/shared_operation_throttler/CMakeLists.txt
new file mode 100644
index 00000000000..6e977cdb59f
--- /dev/null
+++ b/vespalib/src/tests/shared_operation_throttler/CMakeLists.txt
@@ -0,0 +1,8 @@
+# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(vespalib_shared_operation_throttler_test_app TEST
+ SOURCES
+ shared_operation_throttler_test.cpp
+ DEPENDS
+ vespalib
+)
+vespa_add_test(NAME vespalib_shared_operation_throttler_test_app COMMAND vespalib_shared_operation_throttler_test_app)
diff --git a/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp b/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp
new file mode 100644
index 00000000000..61cd2b5ef44
--- /dev/null
+++ b/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp
@@ -0,0 +1,105 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/vespalib/util/shared_operation_throttler.h>
+#include <vespa/vespalib/testkit/test_kit.h>
+
+namespace vespalib {
+
+// Note on test semantics: these tests are adapted from a subset of the MessageBus
+// throttling tests. Some tests have been simplified due to no longer having access
+// to the low-level DynamicThrottlePolicy API.
+
+struct Fixture {
+ uint64_t _milli_time;
+ std::unique_ptr<SharedOperationThrottler> _throttler;
+
+ Fixture(uint32_t window_size_increment = 5,
+ uint32_t min_window_size = 20,
+ uint32_t max_window_size = INT_MAX)
+ : _milli_time(0),
+ _throttler()
+ {
+ SharedOperationThrottler::DynamicThrottleParams params;
+ params.resize_rate = 1;
+ params.window_size_increment = window_size_increment;
+ params.min_window_size = min_window_size;
+ params.max_window_size = max_window_size;
+ params.window_size_decrement_factor = 2;
+ params.window_size_backoff = 0.9;
+ _throttler = SharedOperationThrottler::make_dynamic_throttler(params, [&]() noexcept {
+ return steady_time(std::chrono::milliseconds(_milli_time));
+ });
+ }
+
+ std::vector<SharedOperationThrottler::Token> fill_entire_throttle_window() {
+ std::vector<SharedOperationThrottler::Token> tokens;
+ while (true) {
+ auto token = _throttler->try_acquire_one();
+ if (!token.valid()) {
+ break;
+ }
+ tokens.emplace_back(std::move(token));
+ }
+ return tokens;
+ }
+
+ uint32_t attempt_converge_on_stable_window_size(uint32_t max_pending) {
+ for (uint32_t i = 0; i < 999; ++i) {
+ auto tokens = fill_entire_throttle_window();
+ uint32_t num_pending = static_cast<uint32_t>(tokens.size());
+
+ uint64_t trip_time = (num_pending < max_pending) ? 1000 : 1000 + (num_pending - max_pending) * 1000;
+ _milli_time += trip_time;
+ // Throttle window slots implicitly freed up as tokens are destructed.
+ }
+ uint32_t ret = _throttler->current_window_size();
+ fprintf(stderr, "attempt_converge_on_stable_window_size() = %u\n", ret);
+ return ret;
+ }
+};
+
+TEST_F("window size changes dynamically based on throughput", Fixture()) {
+ uint32_t window_size = f1.attempt_converge_on_stable_window_size(100);
+ ASSERT_TRUE(window_size >= 90 && window_size <= 105);
+
+ window_size = f1.attempt_converge_on_stable_window_size(200);
+ ASSERT_TRUE(window_size >= 180 && window_size <= 205);
+
+ window_size = f1.attempt_converge_on_stable_window_size(50);
+ ASSERT_TRUE(window_size >= 45 && window_size <= 55);
+
+ window_size = f1.attempt_converge_on_stable_window_size(500);
+ ASSERT_TRUE(window_size >= 450 && window_size <= 505);
+
+ window_size = f1.attempt_converge_on_stable_window_size(100);
+ ASSERT_TRUE(window_size >= 90 && window_size <= 115);
+}
+
+TEST_F("window size is reset after idle time period", Fixture(5, 1)) {
+ double window_size = f1.attempt_converge_on_stable_window_size(100);
+ ASSERT_TRUE(window_size >= 90 && window_size <= 110);
+
+ f1._milli_time += 30001; // Not yet past 60s idle time
+ auto tokens = f1.fill_entire_throttle_window();
+ ASSERT_TRUE(tokens.size() >= 90 && tokens.size() <= 110);
+ tokens.clear();
+
+ f1._milli_time += 60001; // Idle time passed
+ tokens = f1.fill_entire_throttle_window();
+ EXPECT_EQUAL(tokens.size(), 1u); // Reduced to minimum window size
+}
+
+TEST_F("minimum window size is respected", Fixture(5, 150, INT_MAX)) {
+ double window_size = f1.attempt_converge_on_stable_window_size(200);
+ ASSERT_TRUE(window_size >= 150 && window_size <= 210);
+}
+
+TEST_F("maximum window size is respected", Fixture(5, 1, 50)) {
+ double window_size = f1.attempt_converge_on_stable_window_size(100);
+ ASSERT_TRUE(window_size >= 40 && window_size <= 50);
+}
+
+}
+
+TEST_MAIN() {
+ TEST_RUN_ALL();
+}
diff --git a/vespalib/src/vespa/vespalib/util/CMakeLists.txt b/vespalib/src/vespa/vespalib/util/CMakeLists.txt
index 32f679d22d7..414a9d0f728 100644
--- a/vespalib/src/vespa/vespalib/util/CMakeLists.txt
+++ b/vespalib/src/vespa/vespalib/util/CMakeLists.txt
@@ -56,6 +56,7 @@ vespa_add_library(vespalib_vespalib_util OBJECT
runnable_pair.cpp
sequence.cpp
sha1.cpp
+ shared_operation_throttler.cpp
shared_string_repo.cpp
sig_catch.cpp
signalhandler.cpp
diff --git a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp
new file mode 100644
index 00000000000..e91be68a671
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp
@@ -0,0 +1,386 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "shared_operation_throttler.h"
+#include <condition_variable>
+#include <cassert>
+#include <functional>
+#include <mutex>
+
+namespace vespalib {
+
+namespace {
+
+class NoLimitsOperationThrottler final : public SharedOperationThrottler {
+public:
+ ~NoLimitsOperationThrottler() override = default;
+ Token blocking_acquire_one() noexcept override {
+ return Token(this, TokenCtorTag{});
+ }
+ Token blocking_acquire_one(vespalib::duration) noexcept override {
+ return Token(this, TokenCtorTag{});
+ }
+ Token try_acquire_one() noexcept override {
+ return Token(this, TokenCtorTag{});
+ }
+ uint32_t current_window_size() const noexcept override { return 0; }
+ uint32_t waiting_threads() const noexcept override { return 0; }
+private:
+ void release_one() noexcept override { /* no-op */ }
+};
+
+/**
+ * Effectively a 1-1 transplant of the MessageBus DynamicThrottlePolicy, but
+ * without an underlying StaticThrottlePolicy and with no need for individual
+ * MessageBus Message/Reply objects.
+ *
+ * Please keep the underlying algorithm in sync with the Java implementation,
+ * as that is considered the source of truth. For descriptions of the various
+ * parameters, also see the Java code:
+ * messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java
+ */
+class DynamicThrottlePolicy {
+ std::function<steady_time()> _time_provider;
+ uint32_t _num_sent;
+ uint32_t _num_ok;
+ double _resize_rate;
+ uint64_t _resize_time;
+ uint64_t _time_of_last_message;
+ uint64_t _idle_time_period;
+ double _efficiency_threshold;
+ double _window_size_increment;
+ double _window_size;
+ double _max_window_size;
+ double _min_window_size;
+ double _decrement_factor;
+ double _window_size_backoff;
+ double _weight;
+ double _local_max_throughput;
+public:
+ DynamicThrottlePolicy(const SharedOperationThrottler::DynamicThrottleParams& params,
+ std::function<steady_time()> time_provider);
+
+ void set_window_size_increment(double window_size_increment) noexcept;
+ void set_window_size_backoff(double window_size_backoff) noexcept;
+ void set_resize_rate(double resize_rate) noexcept;
+ void set_max_window_size(double max_size) noexcept;
+
+ void set_min_window_size(double min_size) noexcept;
+ void set_window_size_decrement_factor(double decrement_factor) noexcept;
+
+ [[nodiscard]] uint32_t current_window_size() const noexcept {
+ return static_cast<uint32_t>(_window_size);
+ }
+ [[nodiscard]] bool has_spare_capacity(uint32_t pending_count) noexcept;
+ void process_request() noexcept;
+ void process_response(bool success) noexcept;
+
+private:
+ [[nodiscard]] uint64_t current_time_as_millis() noexcept {
+ return count_ms(_time_provider().time_since_epoch());
+ }
+};
+
+DynamicThrottlePolicy::DynamicThrottlePolicy(const SharedOperationThrottler::DynamicThrottleParams& params,
+ std::function<steady_time()> time_provider)
+ : _time_provider(std::move(time_provider)),
+ _num_sent(0),
+ _num_ok(0),
+ _resize_rate(3.0),
+ _resize_time(0),
+ _time_of_last_message(current_time_as_millis()),
+ _idle_time_period(60000),
+ _efficiency_threshold(1),
+ _window_size_increment(20),
+ _window_size(_window_size_increment),
+ _max_window_size(INT_MAX),
+ _min_window_size(_window_size_increment),
+ _decrement_factor(2.0),
+ _window_size_backoff(0.9),
+ _weight(1),
+ _local_max_throughput(0)
+{
+ // We use setters for convenience, since setting one parameter may imply setting others,
+ // based on it, and there's frequently min/max capping of values.
+ set_window_size_increment(params.window_size_increment);
+ set_min_window_size(params.min_window_size);
+ set_max_window_size(params.max_window_size);
+ set_resize_rate(params.resize_rate);
+ set_window_size_decrement_factor(params.window_size_decrement_factor);
+ set_window_size_backoff(params.window_size_backoff);
+}
+
+void
+DynamicThrottlePolicy::set_window_size_increment(double window_size_increment) noexcept
+{
+ _window_size_increment = window_size_increment;
+ _window_size = std::max(_window_size, _window_size_increment);
+}
+
+void
+DynamicThrottlePolicy::set_window_size_backoff(double window_size_backoff) noexcept
+{
+ _window_size_backoff = std::max(0.0, std::min(1.0, window_size_backoff));
+}
+
+void
+DynamicThrottlePolicy::set_resize_rate(double resize_rate) noexcept
+{
+ _resize_rate = std::max(2.0, resize_rate);
+}
+
+void
+DynamicThrottlePolicy::set_max_window_size(double max_size) noexcept
+{
+ _max_window_size = max_size;
+}
+
+void
+DynamicThrottlePolicy::set_min_window_size(double min_size) noexcept
+{
+ _min_window_size = min_size;
+ _window_size = std::max(_min_window_size, _window_size_increment);
+}
+
+void
+DynamicThrottlePolicy::set_window_size_decrement_factor(double decrement_factor) noexcept
+{
+ _decrement_factor = decrement_factor;
+}
+
+bool
+DynamicThrottlePolicy::has_spare_capacity(uint32_t pending_count) noexcept
+{
+ const uint64_t time = current_time_as_millis();
+ if ((time - _time_of_last_message) > _idle_time_period) {
+ _window_size = std::max(_min_window_size, std::min(_window_size, pending_count + _window_size_increment));
+ }
+ _time_of_last_message = time;
+ const auto window_size_floored = static_cast<uint32_t>(_window_size);
+ // Use floating point window sizes, so the algorithm sees the difference between 1.1 and 1.9 window size.
+ const bool carry = _num_sent < ((_window_size * _resize_rate) * (_window_size - window_size_floored));
+ return pending_count < (window_size_floored + (carry ? 1 : 0));
+}
+
+void
+DynamicThrottlePolicy::process_request() noexcept
+{
+ if (++_num_sent < (_window_size * _resize_rate)) {
+ return;
+ }
+
+ const uint64_t time = current_time_as_millis();
+ const double elapsed = time - _resize_time;
+ _resize_time = time;
+
+ const double throughput = _num_ok / elapsed;
+ _num_sent = 0;
+ _num_ok = 0;
+
+ if (throughput > _local_max_throughput) {
+ _local_max_throughput = throughput;
+ _window_size += _weight * _window_size_increment;
+ } else {
+ // scale up/down throughput for comparing to window size
+ double period = 1;
+ while ((throughput * (period / _window_size)) < 2) {
+ period *= 10;
+ }
+ while ((throughput * (period / _window_size)) > 2) {
+ period *= 0.1;
+ }
+ const double efficiency = throughput * (period / _window_size);
+
+ if (efficiency < _efficiency_threshold) {
+ _window_size = std::min(_window_size * _window_size_backoff,
+ _window_size - _decrement_factor * _window_size_increment);
+ _local_max_throughput = 0;
+ } else {
+ _window_size += _weight * _window_size_increment;
+ }
+ }
+ _window_size = std::max(_min_window_size, _window_size);
+ _window_size = std::min(_max_window_size, _window_size);
+}
+
+void
+DynamicThrottlePolicy::process_response(bool success) noexcept
+{
+ if (success) {
+ ++_num_ok;
+ }
+}
+
+class DynamicOperationThrottler final : public SharedOperationThrottler {
+ mutable std::mutex _mutex;
+ std::condition_variable _cond;
+ DynamicThrottlePolicy _throttle_policy;
+ uint32_t _pending_ops;
+ uint32_t _waiting_threads;
+public:
+ DynamicOperationThrottler(const DynamicThrottleParams& params,
+ std::function<steady_time()> time_provider);
+ ~DynamicOperationThrottler() override;
+
+ Token blocking_acquire_one() noexcept override;
+ Token blocking_acquire_one(vespalib::duration timeout) noexcept override;
+ Token try_acquire_one() noexcept override;
+ uint32_t current_window_size() const noexcept override;
+ uint32_t waiting_threads() const noexcept override;
+private:
+ void release_one() noexcept override;
+ // Non-const since actually checking the send window of a dynamic throttler might change
+ // it if enough time has passed.
+ [[nodiscard]] bool has_spare_capacity_in_active_window() noexcept;
+ void add_one_to_active_window_size() noexcept;
+ void subtract_one_from_active_window_size() noexcept;
+};
+
+DynamicOperationThrottler::DynamicOperationThrottler(const DynamicThrottleParams& params,
+ std::function<steady_time()> time_provider)
+ : _mutex(),
+ _cond(),
+ _throttle_policy(params, std::move(time_provider)),
+ _pending_ops(0),
+ _waiting_threads(0)
+{
+}
+
+DynamicOperationThrottler::~DynamicOperationThrottler() = default;
+
+bool
+DynamicOperationThrottler::has_spare_capacity_in_active_window() noexcept
+{
+ return _throttle_policy.has_spare_capacity(_pending_ops);
+}
+
+void
+DynamicOperationThrottler::add_one_to_active_window_size() noexcept
+{
+ _throttle_policy.process_request();
+ ++_pending_ops;
+}
+
+void
+DynamicOperationThrottler::subtract_one_from_active_window_size() noexcept
+{
+ _throttle_policy.process_response(true); // TODO support failure push-back
+ assert(_pending_ops > 0);
+ --_pending_ops;
+}
+
+DynamicOperationThrottler::Token
+DynamicOperationThrottler::blocking_acquire_one() noexcept
+{
+ std::unique_lock lock(_mutex);
+ if (!has_spare_capacity_in_active_window()) {
+ ++_waiting_threads;
+ _cond.wait(lock, [&] {
+ return has_spare_capacity_in_active_window();
+ });
+ --_waiting_threads;
+ }
+ add_one_to_active_window_size();
+ return Token(this, TokenCtorTag{});
+}
+
+DynamicOperationThrottler::Token
+DynamicOperationThrottler::blocking_acquire_one(vespalib::duration timeout) noexcept
+{
+ std::unique_lock lock(_mutex);
+ if (!has_spare_capacity_in_active_window()) {
+ ++_waiting_threads;
+ const bool accepted = _cond.wait_for(lock, timeout, [&] {
+ return has_spare_capacity_in_active_window();
+ });
+ --_waiting_threads;
+ if (!accepted) {
+ return Token();
+ }
+ }
+ add_one_to_active_window_size();
+ return Token(this, TokenCtorTag{});
+}
+
+DynamicOperationThrottler::Token
+DynamicOperationThrottler::try_acquire_one() noexcept
+{
+ std::unique_lock lock(_mutex);
+ if (!has_spare_capacity_in_active_window()) {
+ return Token();
+ }
+ add_one_to_active_window_size();
+ return Token(this, TokenCtorTag{});
+}
+
+void
+DynamicOperationThrottler::release_one() noexcept
+{
+ std::unique_lock lock(_mutex);
+ subtract_one_from_active_window_size();
+ // Only wake up a waiting thread if doing so would possibly result in success.
+ if ((_waiting_threads > 0) && has_spare_capacity_in_active_window()) {
+ lock.unlock();
+ _cond.notify_one();
+ }
+}
+
+uint32_t
+DynamicOperationThrottler::current_window_size() const noexcept
+{
+ std::unique_lock lock(_mutex);
+ return _throttle_policy.current_window_size();
+}
+
+uint32_t
+DynamicOperationThrottler::waiting_threads() const noexcept
+{
+ std::unique_lock lock(_mutex);
+ return _waiting_threads;
+}
+
+} // anonymous namespace
+
+std::unique_ptr<SharedOperationThrottler>
+SharedOperationThrottler::make_unlimited_throttler()
+{
+ return std::make_unique<NoLimitsOperationThrottler>();
+}
+
+std::unique_ptr<SharedOperationThrottler>
+SharedOperationThrottler::make_dynamic_throttler(const DynamicThrottleParams& params)
+{
+ return std::make_unique<DynamicOperationThrottler>(params, []() noexcept { return steady_clock::now(); });
+}
+
+std::unique_ptr<SharedOperationThrottler>
+SharedOperationThrottler::make_dynamic_throttler(const DynamicThrottleParams& params,
+ std::function<steady_time()> time_provider)
+{
+ return std::make_unique<DynamicOperationThrottler>(params, std::move(time_provider));
+}
+
+DynamicOperationThrottler::Token::~Token()
+{
+ if (_throttler) {
+ _throttler->release_one();
+ }
+}
+
+void
+DynamicOperationThrottler::Token::reset() noexcept
+{
+ if (_throttler) {
+ _throttler->release_one();
+ _throttler = nullptr;
+ }
+}
+
+DynamicOperationThrottler::Token&
+DynamicOperationThrottler::Token::operator=(Token&& rhs) noexcept
+{
+ reset();
+ _throttler = rhs._throttler;
+ rhs._throttler = nullptr;
+ return *this;
+}
+
+}
diff --git a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h
new file mode 100644
index 00000000000..2a8a42dd1ba
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h
@@ -0,0 +1,86 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "time.h"
+#include <functional>
+#include <memory>
+#include <optional>
+#include <limits.h>
+
+namespace vespalib {
+
+/**
+ * Operation throttler that is intended to provide global throttling of
+ * async operations across multiple threads. A throttler wraps a logical
+ * max pending window size of in-flight operations. Depending on the
+ * throttler implementation, the window size may expand and shrink dynamically.
+ * Exactly how and when this happens is unspecified.
+ *
+ * Offers both polling and (timed, non-timed) blocking calls for acquiring
+ * a throttle token. If the returned token is valid, the caller may proceed
+ * to invoke the asynchronous operation.
+ *
+ * The window slot taken up by a valid throttle token is implicitly freed up
+ * when the token is destroyed.
+ *
+ * All operations on the throttler are thread safe.
+ */
+class SharedOperationThrottler {
+protected:
+ struct TokenCtorTag {}; // Make available to subclasses for token construction.
+public:
+ class Token {
+ SharedOperationThrottler* _throttler;
+ public:
+ constexpr Token(SharedOperationThrottler* throttler, TokenCtorTag) noexcept : _throttler(throttler) {}
+ constexpr Token() noexcept : _throttler(nullptr) {}
+ constexpr Token(Token&& rhs) noexcept
+ : _throttler(rhs._throttler)
+ {
+ rhs._throttler = nullptr;
+ }
+ Token& operator=(Token&& rhs) noexcept;
+ ~Token();
+
+ Token(const Token&) = delete;
+ Token& operator=(const Token&) = delete;
+
+ [[nodiscard]] constexpr bool valid() const noexcept { return (_throttler != nullptr); }
+ void reset() noexcept;
+ };
+
+ virtual ~SharedOperationThrottler() = default;
+
+ // All methods are thread safe
+ [[nodiscard]] virtual Token blocking_acquire_one() noexcept = 0;
+ [[nodiscard]] virtual Token blocking_acquire_one(vespalib::duration timeout) noexcept = 0;
+ [[nodiscard]] virtual Token try_acquire_one() noexcept = 0;
+
+ // May return 0, in which case the window size is unlimited.
+ [[nodiscard]] virtual uint32_t current_window_size() const noexcept = 0;
+
+ // Exposed for unit testing only.
+ [[nodiscard]] virtual uint32_t waiting_threads() const noexcept = 0;
+
+ // Creates a throttler that does exactly zero throttling (but also has zero overhead and locking)
+ static std::unique_ptr<SharedOperationThrottler> make_unlimited_throttler();
+
+ struct DynamicThrottleParams {
+ uint32_t window_size_increment = 20;
+ uint32_t min_window_size = 20;
+ uint32_t max_window_size = INT_MAX; // signed max to be 1-1 compatible with Java defaults
+ double resize_rate = 3.0;
+ double window_size_decrement_factor = 1.2;
+ double window_size_backoff = 0.95;
+ };
+
+ // Creates a throttler that uses a DynamicThrottlePolicy under the hood
+ static std::unique_ptr<SharedOperationThrottler> make_dynamic_throttler(const DynamicThrottleParams& params);
+ static std::unique_ptr<SharedOperationThrottler> make_dynamic_throttler(const DynamicThrottleParams& params,
+ std::function<steady_time()> time_provider);
+private:
+ // Exclusively called from a valid Token. Thread safe.
+ virtual void release_one() noexcept = 0;
+};
+
+}