summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2022-01-26 10:56:49 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2022-01-26 10:56:49 +0000
commit5c727a35f8c824f6d13d3702211a3554e031e1b6 (patch)
tree9fcb88e400cccf91f4bd4eefd06e9c203aadd2aa
parent1ecbb8ced4c2599155a2438625987b78e174bd5a (diff)
Support live reconfig of dynamic persistence throttling
Start moving internal `stor-server` throttling config to use struct config instead of separate fields, as this is more flexible and better matches how we configure throttling elsewhere. For now, let dynamic throttling be enabled via both the new and the old config enum. Config model will be updated to use the new config struct shortly.
-rw-r--r--configdefinitions/src/vespa/stor-filestor.def17
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp30
-rw-r--r--vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp51
-rw-r--r--vespalib/src/vespa/vespalib/util/shared_operation_throttler.h22
4 files changed, 100 insertions, 20 deletions
diff --git a/configdefinitions/src/vespa/stor-filestor.def b/configdefinitions/src/vespa/stor-filestor.def
index 99f79a92c07..f1fc667ca9d 100644
--- a/configdefinitions/src/vespa/stor-filestor.def
+++ b/configdefinitions/src/vespa/stor-filestor.def
@@ -80,6 +80,22 @@ resource_usage_reporter_noise_level double default=0.001
## - DYNAMIC uses DynamicThrottlePolicy under the hood and will block if the window
## is full (if a blocking throttler API call is invoked).
##
+async_operation_throttler.type enum { UNLIMITED, DYNAMIC } default=UNLIMITED restart
+## Internal throttler tuning parameters that only apply when type == DYNAMIC:
+async_operation_throttler.window_size_increment int default=20
+async_operation_throttler.window_size_decrement_factor double default=1.2
+async_operation_throttler.window_size_backoff double default=0.95
+
+## Specify throttling used for async persistence operations. This throttling takes place
+## before operations are dispatched to Proton and serves as a limiter for how many
+## operations may be in flight in Proton's internal queues.
+##
+## - UNLIMITED is, as it says on the tin, unlimited. Offers no actual throttling, but
+## has near zero overhead and never blocks.
+## - DYNAMIC uses DynamicThrottlePolicy under the hood and will block if the window
+## is full (if a blocking throttler API call is invoked).
+##
+## TODO deprecate in favor of the async_operation_throttler struct instead.
async_operation_throttler_type enum { UNLIMITED, DYNAMIC } default=UNLIMITED restart
## Specifies the extent the throttling window is increased by when the async throttle
@@ -88,4 +104,5 @@ async_operation_throttler_type enum { UNLIMITED, DYNAMIC } default=UNLIMITED res
## value, number of threads).
##
## Only applies if async_operation_throttler_type == DYNAMIC.
+## DEPRECATED! use the async_operation_throttler struct instead
async_operation_dynamic_throttling_window_increment int default=20 restart
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 03b16e75297..ec7793cdd8d 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -143,17 +143,29 @@ selectSequencer(StorFilestorConfig::ResponseSequencerType sequencerType) {
}
}
+vespalib::SharedOperationThrottler::DynamicThrottleParams
+dynamic_throttle_params_from_config(const StorFilestorConfig& config, size_t num_threads)
+{
+ const auto& cfg_params = config.asyncOperationThrottler;
+ auto win_size_incr = std::max(static_cast<size_t>(std::max(cfg_params.windowSizeIncrement, 1)), num_threads);
+
+ vespalib::SharedOperationThrottler::DynamicThrottleParams params;
+ params.window_size_increment = win_size_incr;
+ params.min_window_size = win_size_incr;
+ params.window_size_decrement_factor = cfg_params.windowSizeDecrementFactor;
+ params.window_size_backoff = cfg_params.windowSizeBackoff;
+ return params;
+}
+
std::unique_ptr<vespalib::SharedOperationThrottler>
make_operation_throttler_from_config(const StorFilestorConfig& config, size_t num_threads)
{
- const bool use_dynamic_throttling = (config.asyncOperationThrottlerType == StorFilestorConfig::AsyncOperationThrottlerType::DYNAMIC);
+ // TODO only use struct config field instead once config model is updated
+ const bool use_dynamic_throttling = ((config.asyncOperationThrottlerType == StorFilestorConfig::AsyncOperationThrottlerType::DYNAMIC) ||
+ (config.asyncOperationThrottler.type == StorFilestorConfig::AsyncOperationThrottler::Type::DYNAMIC));
if (use_dynamic_throttling) {
- auto config_win_size_incr = std::max(config.asyncOperationDynamicThrottlingWindowIncrement, 1);
- auto win_size_increment = std::max(static_cast<size_t>(config_win_size_incr), num_threads);
- vespalib::SharedOperationThrottler::DynamicThrottleParams params;
- params.window_size_increment = win_size_increment;
- params.min_window_size = win_size_increment;
- return vespalib::SharedOperationThrottler::make_dynamic_throttler(params);
+ auto dyn_params = dynamic_throttle_params_from_config(config, num_threads);
+ return vespalib::SharedOperationThrottler::make_dynamic_throttler(dyn_params);
} else {
return vespalib::SharedOperationThrottler::make_unlimited_throttler();
}
@@ -229,6 +241,10 @@ FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config)
*_filestorHandler, i % numStripes, _component));
}
_bucketExecutorRegistration = _provider->register_executor(std::make_shared<BucketExecutorWrapper>(*this));
+ } else {
+ assert(_filestorHandler);
+ auto updated_dyn_throttle_params = dynamic_throttle_params_from_config(*config, _threads.size());
+ _filestorHandler->operation_throttler().reconfigure_dynamic_throttling(updated_dyn_throttle_params);
}
}
diff --git a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp
index dd790bcaa0a..1df5f85bb6b 100644
--- a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp
+++ b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp
@@ -23,6 +23,7 @@ public:
}
uint32_t current_window_size() const noexcept override { return 0; }
uint32_t waiting_threads() const noexcept override { return 0; }
+ void reconfigure_dynamic_throttling(const DynamicThrottleParams&) noexcept override { /* no-op */ }
private:
void release_one() noexcept override { /* no-op */ }
};
@@ -38,6 +39,7 @@ private:
* messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java
*/
class DynamicThrottlePolicy {
+ SharedOperationThrottler::DynamicThrottleParams _active_params;
std::function<steady_time()> _time_provider;
uint32_t _num_sent;
uint32_t _num_ok;
@@ -66,6 +68,8 @@ public:
void set_min_window_size(double min_size) noexcept;
void set_window_size_decrement_factor(double decrement_factor) noexcept;
+ void configure(const SharedOperationThrottler::DynamicThrottleParams& params) noexcept;
+
[[nodiscard]] uint32_t current_window_size() const noexcept {
return static_cast<uint32_t>(_window_size);
}
@@ -74,6 +78,8 @@ public:
void process_response(bool success) noexcept;
private:
+ void internal_unconditional_configure(const SharedOperationThrottler::DynamicThrottleParams& params) noexcept;
+
[[nodiscard]] uint64_t current_time_as_millis() noexcept {
return count_ms(_time_provider().time_since_epoch());
}
@@ -81,7 +87,8 @@ private:
DynamicThrottlePolicy::DynamicThrottlePolicy(const SharedOperationThrottler::DynamicThrottleParams& params,
std::function<steady_time()> time_provider)
- : _time_provider(std::move(time_provider)),
+ : _active_params(params),
+ _time_provider(std::move(time_provider)),
_num_sent(0),
_num_ok(0),
_resize_rate(3.0),
@@ -98,14 +105,7 @@ DynamicThrottlePolicy::DynamicThrottlePolicy(const SharedOperationThrottler::Dyn
_weight(1),
_local_max_throughput(0)
{
- // We use setters for convenience, since setting one parameter may imply setting others,
- // based on it, and there's frequently min/max capping of values.
- set_window_size_increment(params.window_size_increment);
- set_min_window_size(params.min_window_size);
- set_max_window_size(params.max_window_size);
- set_resize_rate(params.resize_rate);
- set_window_size_decrement_factor(params.window_size_decrement_factor);
- set_window_size_backoff(params.window_size_backoff);
+ internal_unconditional_configure(_active_params);
}
void
@@ -146,6 +146,31 @@ DynamicThrottlePolicy::set_window_size_decrement_factor(double decrement_factor)
_decrement_factor = decrement_factor;
}
+void
+DynamicThrottlePolicy::internal_unconditional_configure(const SharedOperationThrottler::DynamicThrottleParams& params) noexcept
+{
+ // We use setters for convenience, since setting one parameter may imply setting others,
+ // based on it, and there's frequently min/max capping of values.
+ set_window_size_increment(params.window_size_increment);
+ set_min_window_size(params.min_window_size);
+ set_max_window_size(params.max_window_size);
+ set_resize_rate(params.resize_rate);
+ set_window_size_decrement_factor(params.window_size_decrement_factor);
+ set_window_size_backoff(params.window_size_backoff);
+}
+
+void
+DynamicThrottlePolicy::configure(const SharedOperationThrottler::DynamicThrottleParams& params) noexcept
+{
+ // To avoid any noise where setting parameters on the throttler may implicitly reduce the
+ // current window size (even though this isn't _currently_ the case), don't invoke any internal
+ // reconfiguration code unless the parameters have actually changed.
+ if (params != _active_params) {
+ internal_unconditional_configure(params);
+ _active_params = params;
+ }
+}
+
bool
DynamicThrottlePolicy::has_spare_capacity(uint32_t pending_count) noexcept
{
@@ -225,6 +250,7 @@ public:
Token try_acquire_one() noexcept override;
uint32_t current_window_size() const noexcept override;
uint32_t waiting_threads() const noexcept override;
+ void reconfigure_dynamic_throttling(const DynamicThrottleParams& params) noexcept override;
private:
void release_one() noexcept override;
// Non-const since actually checking the send window of a dynamic throttler might change
@@ -340,6 +366,13 @@ DynamicOperationThrottler::waiting_threads() const noexcept
return _waiting_threads;
}
+void
+DynamicOperationThrottler::reconfigure_dynamic_throttling(const DynamicThrottleParams& params) noexcept
+{
+ std::unique_lock lock(_mutex);
+ _throttle_policy.configure(params);
+}
+
} // anonymous namespace
std::unique_ptr<SharedOperationThrottler>
diff --git a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h
index 2a8a42dd1ba..030935339ed 100644
--- a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h
+++ b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h
@@ -51,9 +51,15 @@ public:
virtual ~SharedOperationThrottler() = default;
- // All methods are thread safe
+ // Acquire a valid throttling token, uninterruptedly blocking until one can be obtained.
[[nodiscard]] virtual Token blocking_acquire_one() noexcept = 0;
+ // Attempt to acquire a valid throttling token, waiting up to `timeout` for one to be
+ // available. If the timeout is exceeded without any tokens becoming available, an
+ // invalid token will be returned.
[[nodiscard]] virtual Token blocking_acquire_one(vespalib::duration timeout) noexcept = 0;
+ // Attempt to acquire a valid throttling token if one is immediately available.
+ // An invalid token will be returned if none is available. Never blocks (other than
+ // when contending for the internal throttler mutex).
[[nodiscard]] virtual Token try_acquire_one() noexcept = 0;
// May return 0, in which case the window size is unlimited.
@@ -62,9 +68,6 @@ public:
// Exposed for unit testing only.
[[nodiscard]] virtual uint32_t waiting_threads() const noexcept = 0;
- // Creates a throttler that does exactly zero throttling (but also has zero overhead and locking)
- static std::unique_ptr<SharedOperationThrottler> make_unlimited_throttler();
-
struct DynamicThrottleParams {
uint32_t window_size_increment = 20;
uint32_t min_window_size = 20;
@@ -72,8 +75,19 @@ public:
double resize_rate = 3.0;
double window_size_decrement_factor = 1.2;
double window_size_backoff = 0.95;
+
+ bool operator==(const DynamicThrottleParams&) const noexcept = default;
+ bool operator!=(const DynamicThrottleParams&) const noexcept = default;
};
+ // No-op if underlying throttler does not use a dynamic policy, or if the supplied
+ // parameters are equal to the current configuration.
+ // FIXME leaky abstraction alert!
+ virtual void reconfigure_dynamic_throttling(const DynamicThrottleParams& params) noexcept = 0;
+
+ // Creates a throttler that does exactly zero throttling (but also has zero overhead and locking)
+ static std::unique_ptr<SharedOperationThrottler> make_unlimited_throttler();
+
// Creates a throttler that uses a DynamicThrottlePolicy under the hood
static std::unique_ptr<SharedOperationThrottler> make_dynamic_throttler(const DynamicThrottleParams& params);
static std::unique_ptr<SharedOperationThrottler> make_dynamic_throttler(const DynamicThrottleParams& params,