summaryrefslogtreecommitdiffstats
path: root/vespalib
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2022-01-26 10:56:49 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2022-01-26 10:56:49 +0000
commit5c727a35f8c824f6d13d3702211a3554e031e1b6 (patch)
tree9fcb88e400cccf91f4bd4eefd06e9c203aadd2aa /vespalib
parent1ecbb8ced4c2599155a2438625987b78e174bd5a (diff)
Support live reconfig of dynamic persistence throttling
Start moving internal `stor-server` throttling config to use struct config instead of separate fields, as this is more flexible and better matches how we configure throttling elsewhere. For now, let dynamic throttling be enabled via both the new and the old config enum. Config model will be updated to use the new config struct shortly.
Diffstat (limited to 'vespalib')
-rw-r--r--vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp51
-rw-r--r--vespalib/src/vespa/vespalib/util/shared_operation_throttler.h22
2 files changed, 60 insertions, 13 deletions
diff --git a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp
index dd790bcaa0a..1df5f85bb6b 100644
--- a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp
+++ b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp
@@ -23,6 +23,7 @@ public:
}
uint32_t current_window_size() const noexcept override { return 0; }
uint32_t waiting_threads() const noexcept override { return 0; }
+ void reconfigure_dynamic_throttling(const DynamicThrottleParams&) noexcept override { /* no-op */ }
private:
void release_one() noexcept override { /* no-op */ }
};
@@ -38,6 +39,7 @@ private:
* messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java
*/
class DynamicThrottlePolicy {
+ SharedOperationThrottler::DynamicThrottleParams _active_params;
std::function<steady_time()> _time_provider;
uint32_t _num_sent;
uint32_t _num_ok;
@@ -66,6 +68,8 @@ public:
void set_min_window_size(double min_size) noexcept;
void set_window_size_decrement_factor(double decrement_factor) noexcept;
+ void configure(const SharedOperationThrottler::DynamicThrottleParams& params) noexcept;
+
[[nodiscard]] uint32_t current_window_size() const noexcept {
return static_cast<uint32_t>(_window_size);
}
@@ -74,6 +78,8 @@ public:
void process_response(bool success) noexcept;
private:
+ void internal_unconditional_configure(const SharedOperationThrottler::DynamicThrottleParams& params) noexcept;
+
[[nodiscard]] uint64_t current_time_as_millis() noexcept {
return count_ms(_time_provider().time_since_epoch());
}
@@ -81,7 +87,8 @@ private:
DynamicThrottlePolicy::DynamicThrottlePolicy(const SharedOperationThrottler::DynamicThrottleParams& params,
std::function<steady_time()> time_provider)
- : _time_provider(std::move(time_provider)),
+ : _active_params(params),
+ _time_provider(std::move(time_provider)),
_num_sent(0),
_num_ok(0),
_resize_rate(3.0),
@@ -98,14 +105,7 @@ DynamicThrottlePolicy::DynamicThrottlePolicy(const SharedOperationThrottler::Dyn
_weight(1),
_local_max_throughput(0)
{
- // We use setters for convenience, since setting one parameter may imply setting others,
- // based on it, and there's frequently min/max capping of values.
- set_window_size_increment(params.window_size_increment);
- set_min_window_size(params.min_window_size);
- set_max_window_size(params.max_window_size);
- set_resize_rate(params.resize_rate);
- set_window_size_decrement_factor(params.window_size_decrement_factor);
- set_window_size_backoff(params.window_size_backoff);
+ internal_unconditional_configure(_active_params);
}
void
@@ -146,6 +146,31 @@ DynamicThrottlePolicy::set_window_size_decrement_factor(double decrement_factor)
_decrement_factor = decrement_factor;
}
+void
+DynamicThrottlePolicy::internal_unconditional_configure(const SharedOperationThrottler::DynamicThrottleParams& params) noexcept
+{
+ // We use setters for convenience, since setting one parameter may imply setting others,
+ // based on it, and there's frequently min/max capping of values.
+ set_window_size_increment(params.window_size_increment);
+ set_min_window_size(params.min_window_size);
+ set_max_window_size(params.max_window_size);
+ set_resize_rate(params.resize_rate);
+ set_window_size_decrement_factor(params.window_size_decrement_factor);
+ set_window_size_backoff(params.window_size_backoff);
+}
+
+void
+DynamicThrottlePolicy::configure(const SharedOperationThrottler::DynamicThrottleParams& params) noexcept
+{
+ // To avoid any noise where setting parameters on the throttler may implicitly reduce the
+ // current window size (even though this isn't _currently_ the case), don't invoke any internal
+ // reconfiguration code unless the parameters have actually changed.
+ if (params != _active_params) {
+ internal_unconditional_configure(params);
+ _active_params = params;
+ }
+}
+
bool
DynamicThrottlePolicy::has_spare_capacity(uint32_t pending_count) noexcept
{
@@ -225,6 +250,7 @@ public:
Token try_acquire_one() noexcept override;
uint32_t current_window_size() const noexcept override;
uint32_t waiting_threads() const noexcept override;
+ void reconfigure_dynamic_throttling(const DynamicThrottleParams& params) noexcept override;
private:
void release_one() noexcept override;
// Non-const since actually checking the send window of a dynamic throttler might change
@@ -340,6 +366,13 @@ DynamicOperationThrottler::waiting_threads() const noexcept
return _waiting_threads;
}
+void
+DynamicOperationThrottler::reconfigure_dynamic_throttling(const DynamicThrottleParams& params) noexcept
+{
+ std::unique_lock lock(_mutex);
+ _throttle_policy.configure(params);
+}
+
} // anonymous namespace
std::unique_ptr<SharedOperationThrottler>
diff --git a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h
index 2a8a42dd1ba..030935339ed 100644
--- a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h
+++ b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h
@@ -51,9 +51,15 @@ public:
virtual ~SharedOperationThrottler() = default;
- // All methods are thread safe
+ // Acquire a valid throttling token, uninterruptedly blocking until one can be obtained.
[[nodiscard]] virtual Token blocking_acquire_one() noexcept = 0;
+ // Attempt to acquire a valid throttling token, waiting up to `timeout` for one to be
+ // available. If the timeout is exceeded without any tokens becoming available, an
+ // invalid token will be returned.
[[nodiscard]] virtual Token blocking_acquire_one(vespalib::duration timeout) noexcept = 0;
+ // Attempt to acquire a valid throttling token if one is immediately available.
+ // An invalid token will be returned if none is available. Never blocks (other than
+ // when contending for the internal throttler mutex).
[[nodiscard]] virtual Token try_acquire_one() noexcept = 0;
// May return 0, in which case the window size is unlimited.
@@ -62,9 +68,6 @@ public:
// Exposed for unit testing only.
[[nodiscard]] virtual uint32_t waiting_threads() const noexcept = 0;
- // Creates a throttler that does exactly zero throttling (but also has zero overhead and locking)
- static std::unique_ptr<SharedOperationThrottler> make_unlimited_throttler();
-
struct DynamicThrottleParams {
uint32_t window_size_increment = 20;
uint32_t min_window_size = 20;
@@ -72,8 +75,19 @@ public:
double resize_rate = 3.0;
double window_size_decrement_factor = 1.2;
double window_size_backoff = 0.95;
+
+ bool operator==(const DynamicThrottleParams&) const noexcept = default;
+ bool operator!=(const DynamicThrottleParams&) const noexcept = default;
};
+ // No-op if underlying throttler does not use a dynamic policy, or if the supplied
+ // parameters are equal to the current configuration.
+ // FIXME leaky abstraction alert!
+ virtual void reconfigure_dynamic_throttling(const DynamicThrottleParams& params) noexcept = 0;
+
+ // Creates a throttler that does exactly zero throttling (but also has zero overhead and locking)
+ static std::unique_ptr<SharedOperationThrottler> make_unlimited_throttler();
+
// Creates a throttler that uses a DynamicThrottlePolicy under the hood
static std::unique_ptr<SharedOperationThrottler> make_dynamic_throttler(const DynamicThrottleParams& params);
static std::unique_ptr<SharedOperationThrottler> make_dynamic_throttler(const DynamicThrottleParams& params,