diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-01-21 09:56:20 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-21 09:56:20 +0100 |
commit | 690d90ceb13797e1ff7876d4e9fc24efbec5f57b (patch) | |
tree | 86a30c8f03112334d2b3e82e8a417665cb417785 /vespalib | |
parent | f912cc967ecf4812faa3c198b2805ff22195df8f (diff) | |
parent | 053a6f676c3700acbbdecc6856c12a4e8ab84a3b (diff) |
Merge pull request #20888 from vespa-engine/vekterli/add-operation-throttler-to-vespalib
Add SharedOperationThrottler to vespalib utils
Diffstat (limited to 'vespalib')
6 files changed, 587 insertions, 0 deletions
diff --git a/vespalib/CMakeLists.txt b/vespalib/CMakeLists.txt index eb7e1f7d4c0..5e9426ba6ee 100644 --- a/vespalib/CMakeLists.txt +++ b/vespalib/CMakeLists.txt @@ -101,6 +101,7 @@ vespa_define_module( src/tests/require src/tests/runnable_pair src/tests/sha1 + src/tests/shared_operation_throttler src/tests/shared_string_repo src/tests/sharedptr src/tests/signalhandler diff --git a/vespalib/src/tests/shared_operation_throttler/CMakeLists.txt b/vespalib/src/tests/shared_operation_throttler/CMakeLists.txt new file mode 100644 index 00000000000..6e977cdb59f --- /dev/null +++ b/vespalib/src/tests/shared_operation_throttler/CMakeLists.txt @@ -0,0 +1,8 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(vespalib_shared_operation_throttler_test_app TEST + SOURCES + shared_operation_throttler_test.cpp + DEPENDS + vespalib +) +vespa_add_test(NAME vespalib_shared_operation_throttler_test_app COMMAND vespalib_shared_operation_throttler_test_app) diff --git a/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp b/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp new file mode 100644 index 00000000000..61cd2b5ef44 --- /dev/null +++ b/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp @@ -0,0 +1,105 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/vespalib/util/shared_operation_throttler.h> +#include <vespa/vespalib/testkit/test_kit.h> + +namespace vespalib { + +// Note on test semantics: these tests are adapted from a subset of the MessageBus +// throttling tests. Some tests have been simplified due to no longer having access +// to the low-level DynamicThrottlePolicy API. + +struct Fixture { + uint64_t _milli_time; + std::unique_ptr<SharedOperationThrottler> _throttler; + + Fixture(uint32_t window_size_increment = 5, + uint32_t min_window_size = 20, + uint32_t max_window_size = INT_MAX) + : _milli_time(0), + _throttler() + { + SharedOperationThrottler::DynamicThrottleParams params; + params.resize_rate = 1; + params.window_size_increment = window_size_increment; + params.min_window_size = min_window_size; + params.max_window_size = max_window_size; + params.window_size_decrement_factor = 2; + params.window_size_backoff = 0.9; + _throttler = SharedOperationThrottler::make_dynamic_throttler(params, [&]() noexcept { + return steady_time(std::chrono::milliseconds(_milli_time)); + }); + } + + std::vector<SharedOperationThrottler::Token> fill_entire_throttle_window() { + std::vector<SharedOperationThrottler::Token> tokens; + while (true) { + auto token = _throttler->try_acquire_one(); + if (!token.valid()) { + break; + } + tokens.emplace_back(std::move(token)); + } + return tokens; + } + + uint32_t attempt_converge_on_stable_window_size(uint32_t max_pending) { + for (uint32_t i = 0; i < 999; ++i) { + auto tokens = fill_entire_throttle_window(); + uint32_t num_pending = static_cast<uint32_t>(tokens.size()); + + uint64_t trip_time = (num_pending < max_pending) ? 1000 : 1000 + (num_pending - max_pending) * 1000; + _milli_time += trip_time; + // Throttle window slots implicitly freed up as tokens are destructed. + } + uint32_t ret = _throttler->current_window_size(); + fprintf(stderr, "attempt_converge_on_stable_window_size() = %u\n", ret); + return ret; + } +}; + +TEST_F("window size changes dynamically based on throughput", Fixture()) { + uint32_t window_size = f1.attempt_converge_on_stable_window_size(100); + ASSERT_TRUE(window_size >= 90 && window_size <= 105); + + window_size = f1.attempt_converge_on_stable_window_size(200); + ASSERT_TRUE(window_size >= 180 && window_size <= 205); + + window_size = f1.attempt_converge_on_stable_window_size(50); + ASSERT_TRUE(window_size >= 45 && window_size <= 55); + + window_size = f1.attempt_converge_on_stable_window_size(500); + ASSERT_TRUE(window_size >= 450 && window_size <= 505); + + window_size = f1.attempt_converge_on_stable_window_size(100); + ASSERT_TRUE(window_size >= 90 && window_size <= 115); +} + +TEST_F("window size is reset after idle time period", Fixture(5, 1)) { + double window_size = f1.attempt_converge_on_stable_window_size(100); + ASSERT_TRUE(window_size >= 90 && window_size <= 110); + + f1._milli_time += 30001; // Not yet past 60s idle time + auto tokens = f1.fill_entire_throttle_window(); + ASSERT_TRUE(tokens.size() >= 90 && tokens.size() <= 110); + tokens.clear(); + + f1._milli_time += 60001; // Idle time passed + tokens = f1.fill_entire_throttle_window(); + EXPECT_EQUAL(tokens.size(), 1u); // Reduced to minimum window size +} + +TEST_F("minimum window size is respected", Fixture(5, 150, INT_MAX)) { + double window_size = f1.attempt_converge_on_stable_window_size(200); + ASSERT_TRUE(window_size >= 150 && window_size <= 210); +} + +TEST_F("maximum window size is respected", Fixture(5, 1, 50)) { + double window_size = f1.attempt_converge_on_stable_window_size(100); + ASSERT_TRUE(window_size >= 40 && window_size <= 50); +} + +} + +TEST_MAIN() { + TEST_RUN_ALL(); +} diff --git a/vespalib/src/vespa/vespalib/util/CMakeLists.txt b/vespalib/src/vespa/vespalib/util/CMakeLists.txt index 32f679d22d7..414a9d0f728 100644 --- a/vespalib/src/vespa/vespalib/util/CMakeLists.txt +++ b/vespalib/src/vespa/vespalib/util/CMakeLists.txt @@ -56,6 +56,7 @@ vespa_add_library(vespalib_vespalib_util OBJECT runnable_pair.cpp sequence.cpp sha1.cpp + shared_operation_throttler.cpp shared_string_repo.cpp sig_catch.cpp signalhandler.cpp diff --git a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp new file mode 100644 index 00000000000..e91be68a671 --- /dev/null +++ b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp @@ -0,0 +1,386 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "shared_operation_throttler.h" +#include <condition_variable> +#include <cassert> +#include <functional> +#include <mutex> + +namespace vespalib { + +namespace { + +class NoLimitsOperationThrottler final : public SharedOperationThrottler { +public: + ~NoLimitsOperationThrottler() override = default; + Token blocking_acquire_one() noexcept override { + return Token(this, TokenCtorTag{}); + } + Token blocking_acquire_one(vespalib::duration) noexcept override { + return Token(this, TokenCtorTag{}); + } + Token try_acquire_one() noexcept override { + return Token(this, TokenCtorTag{}); + } + uint32_t current_window_size() const noexcept override { return 0; } + uint32_t waiting_threads() const noexcept override { return 0; } +private: + void release_one() noexcept override { /* no-op */ } +}; + +/** + * Effectively a 1-1 transplant of the MessageBus DynamicThrottlePolicy, but + * without an underlying StaticThrottlePolicy and with no need for individual + * MessageBus Message/Reply objects. + * + * Please keep the underlying algorithm in sync with the Java implementation, + * as that is considered the source of truth. For descriptions of the various + * parameters, also see the Java code: + * messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java + */ +class DynamicThrottlePolicy { + std::function<steady_time()> _time_provider; + uint32_t _num_sent; + uint32_t _num_ok; + double _resize_rate; + uint64_t _resize_time; + uint64_t _time_of_last_message; + uint64_t _idle_time_period; + double _efficiency_threshold; + double _window_size_increment; + double _window_size; + double _max_window_size; + double _min_window_size; + double _decrement_factor; + double _window_size_backoff; + double _weight; + double _local_max_throughput; +public: + DynamicThrottlePolicy(const SharedOperationThrottler::DynamicThrottleParams& params, + std::function<steady_time()> time_provider); + + void set_window_size_increment(double window_size_increment) noexcept; + void set_window_size_backoff(double window_size_backoff) noexcept; + void set_resize_rate(double resize_rate) noexcept; + void set_max_window_size(double max_size) noexcept; + + void set_min_window_size(double min_size) noexcept; + void set_window_size_decrement_factor(double decrement_factor) noexcept; + + [[nodiscard]] uint32_t current_window_size() const noexcept { + return static_cast<uint32_t>(_window_size); + } + [[nodiscard]] bool has_spare_capacity(uint32_t pending_count) noexcept; + void process_request() noexcept; + void process_response(bool success) noexcept; + +private: + [[nodiscard]] uint64_t current_time_as_millis() noexcept { + return count_ms(_time_provider().time_since_epoch()); + } +}; + +DynamicThrottlePolicy::DynamicThrottlePolicy(const SharedOperationThrottler::DynamicThrottleParams& params, + std::function<steady_time()> time_provider) + : _time_provider(std::move(time_provider)), + _num_sent(0), + _num_ok(0), + _resize_rate(3.0), + _resize_time(0), + _time_of_last_message(current_time_as_millis()), + _idle_time_period(60000), + _efficiency_threshold(1), + _window_size_increment(20), + _window_size(_window_size_increment), + _max_window_size(INT_MAX), + _min_window_size(_window_size_increment), + _decrement_factor(2.0), + _window_size_backoff(0.9), + _weight(1), + _local_max_throughput(0) +{ + // We use setters for convenience, since setting one parameter may imply setting others, + // based on it, and there's frequently min/max capping of values. + set_window_size_increment(params.window_size_increment); + set_min_window_size(params.min_window_size); + set_max_window_size(params.max_window_size); + set_resize_rate(params.resize_rate); + set_window_size_decrement_factor(params.window_size_decrement_factor); + set_window_size_backoff(params.window_size_backoff); +} + +void +DynamicThrottlePolicy::set_window_size_increment(double window_size_increment) noexcept +{ + _window_size_increment = window_size_increment; + _window_size = std::max(_window_size, _window_size_increment); +} + +void +DynamicThrottlePolicy::set_window_size_backoff(double window_size_backoff) noexcept +{ + _window_size_backoff = std::max(0.0, std::min(1.0, window_size_backoff)); +} + +void +DynamicThrottlePolicy::set_resize_rate(double resize_rate) noexcept +{ + _resize_rate = std::max(2.0, resize_rate); +} + +void +DynamicThrottlePolicy::set_max_window_size(double max_size) noexcept +{ + _max_window_size = max_size; +} + +void +DynamicThrottlePolicy::set_min_window_size(double min_size) noexcept +{ + _min_window_size = min_size; + _window_size = std::max(_min_window_size, _window_size_increment); +} + +void +DynamicThrottlePolicy::set_window_size_decrement_factor(double decrement_factor) noexcept +{ + _decrement_factor = decrement_factor; +} + +bool +DynamicThrottlePolicy::has_spare_capacity(uint32_t pending_count) noexcept +{ + const uint64_t time = current_time_as_millis(); + if ((time - _time_of_last_message) > _idle_time_period) { + _window_size = std::max(_min_window_size, std::min(_window_size, pending_count + _window_size_increment)); + } + _time_of_last_message = time; + const auto window_size_floored = static_cast<uint32_t>(_window_size); + // Use floating point window sizes, so the algorithm sees the difference between 1.1 and 1.9 window size. + const bool carry = _num_sent < ((_window_size * _resize_rate) * (_window_size - window_size_floored)); + return pending_count < (window_size_floored + (carry ? 1 : 0)); +} + +void +DynamicThrottlePolicy::process_request() noexcept +{ + if (++_num_sent < (_window_size * _resize_rate)) { + return; + } + + const uint64_t time = current_time_as_millis(); + const double elapsed = time - _resize_time; + _resize_time = time; + + const double throughput = _num_ok / elapsed; + _num_sent = 0; + _num_ok = 0; + + if (throughput > _local_max_throughput) { + _local_max_throughput = throughput; + _window_size += _weight * _window_size_increment; + } else { + // scale up/down throughput for comparing to window size + double period = 1; + while ((throughput * (period / _window_size)) < 2) { + period *= 10; + } + while ((throughput * (period / _window_size)) > 2) { + period *= 0.1; + } + const double efficiency = throughput * (period / _window_size); + + if (efficiency < _efficiency_threshold) { + _window_size = std::min(_window_size * _window_size_backoff, + _window_size - _decrement_factor * _window_size_increment); + _local_max_throughput = 0; + } else { + _window_size += _weight * _window_size_increment; + } + } + _window_size = std::max(_min_window_size, _window_size); + _window_size = std::min(_max_window_size, _window_size); +} + +void +DynamicThrottlePolicy::process_response(bool success) noexcept +{ + if (success) { + ++_num_ok; + } +} + +class DynamicOperationThrottler final : public SharedOperationThrottler { + mutable std::mutex _mutex; + std::condition_variable _cond; + DynamicThrottlePolicy _throttle_policy; + uint32_t _pending_ops; + uint32_t _waiting_threads; +public: + DynamicOperationThrottler(const DynamicThrottleParams& params, + std::function<steady_time()> time_provider); + ~DynamicOperationThrottler() override; + + Token blocking_acquire_one() noexcept override; + Token blocking_acquire_one(vespalib::duration timeout) noexcept override; + Token try_acquire_one() noexcept override; + uint32_t current_window_size() const noexcept override; + uint32_t waiting_threads() const noexcept override; +private: + void release_one() noexcept override; + // Non-const since actually checking the send window of a dynamic throttler might change + // it if enough time has passed. + [[nodiscard]] bool has_spare_capacity_in_active_window() noexcept; + void add_one_to_active_window_size() noexcept; + void subtract_one_from_active_window_size() noexcept; +}; + +DynamicOperationThrottler::DynamicOperationThrottler(const DynamicThrottleParams& params, + std::function<steady_time()> time_provider) + : _mutex(), + _cond(), + _throttle_policy(params, std::move(time_provider)), + _pending_ops(0), + _waiting_threads(0) +{ +} + +DynamicOperationThrottler::~DynamicOperationThrottler() = default; + +bool +DynamicOperationThrottler::has_spare_capacity_in_active_window() noexcept +{ + return _throttle_policy.has_spare_capacity(_pending_ops); +} + +void +DynamicOperationThrottler::add_one_to_active_window_size() noexcept +{ + _throttle_policy.process_request(); + ++_pending_ops; +} + +void +DynamicOperationThrottler::subtract_one_from_active_window_size() noexcept +{ + _throttle_policy.process_response(true); // TODO support failure push-back + assert(_pending_ops > 0); + --_pending_ops; +} + +DynamicOperationThrottler::Token +DynamicOperationThrottler::blocking_acquire_one() noexcept +{ + std::unique_lock lock(_mutex); + if (!has_spare_capacity_in_active_window()) { + ++_waiting_threads; + _cond.wait(lock, [&] { + return has_spare_capacity_in_active_window(); + }); + --_waiting_threads; + } + add_one_to_active_window_size(); + return Token(this, TokenCtorTag{}); +} + +DynamicOperationThrottler::Token +DynamicOperationThrottler::blocking_acquire_one(vespalib::duration timeout) noexcept +{ + std::unique_lock lock(_mutex); + if (!has_spare_capacity_in_active_window()) { + ++_waiting_threads; + const bool accepted = _cond.wait_for(lock, timeout, [&] { + return has_spare_capacity_in_active_window(); + }); + --_waiting_threads; + if (!accepted) { + return Token(); + } + } + add_one_to_active_window_size(); + return Token(this, TokenCtorTag{}); +} + +DynamicOperationThrottler::Token +DynamicOperationThrottler::try_acquire_one() noexcept +{ + std::unique_lock lock(_mutex); + if (!has_spare_capacity_in_active_window()) { + return Token(); + } + add_one_to_active_window_size(); + return Token(this, TokenCtorTag{}); +} + +void +DynamicOperationThrottler::release_one() noexcept +{ + std::unique_lock lock(_mutex); + subtract_one_from_active_window_size(); + // Only wake up a waiting thread if doing so would possibly result in success. + if ((_waiting_threads > 0) && has_spare_capacity_in_active_window()) { + lock.unlock(); + _cond.notify_one(); + } +} + +uint32_t +DynamicOperationThrottler::current_window_size() const noexcept +{ + std::unique_lock lock(_mutex); + return _throttle_policy.current_window_size(); +} + +uint32_t +DynamicOperationThrottler::waiting_threads() const noexcept +{ + std::unique_lock lock(_mutex); + return _waiting_threads; +} + +} // anonymous namespace + +std::unique_ptr<SharedOperationThrottler> +SharedOperationThrottler::make_unlimited_throttler() +{ + return std::make_unique<NoLimitsOperationThrottler>(); +} + +std::unique_ptr<SharedOperationThrottler> +SharedOperationThrottler::make_dynamic_throttler(const DynamicThrottleParams& params) +{ + return std::make_unique<DynamicOperationThrottler>(params, []() noexcept { return steady_clock::now(); }); +} + +std::unique_ptr<SharedOperationThrottler> +SharedOperationThrottler::make_dynamic_throttler(const DynamicThrottleParams& params, + std::function<steady_time()> time_provider) +{ + return std::make_unique<DynamicOperationThrottler>(params, std::move(time_provider)); +} + +DynamicOperationThrottler::Token::~Token() +{ + if (_throttler) { + _throttler->release_one(); + } +} + +void +DynamicOperationThrottler::Token::reset() noexcept +{ + if (_throttler) { + _throttler->release_one(); + _throttler = nullptr; + } +} + +DynamicOperationThrottler::Token& +DynamicOperationThrottler::Token::operator=(Token&& rhs) noexcept +{ + reset(); + _throttler = rhs._throttler; + rhs._throttler = nullptr; + return *this; +} + +} diff --git a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h new file mode 100644 index 00000000000..2a8a42dd1ba --- /dev/null +++ b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h @@ -0,0 +1,86 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "time.h" +#include <functional> +#include <memory> +#include <optional> +#include <limits.h> + +namespace vespalib { + +/** + * Operation throttler that is intended to provide global throttling of + * async operations across multiple threads. A throttler wraps a logical + * max pending window size of in-flight operations. Depending on the + * throttler implementation, the window size may expand and shrink dynamically. + * Exactly how and when this happens is unspecified. + * + * Offers both polling and (timed, non-timed) blocking calls for acquiring + * a throttle token. If the returned token is valid, the caller may proceed + * to invoke the asynchronous operation. + * + * The window slot taken up by a valid throttle token is implicitly freed up + * when the token is destroyed. + * + * All operations on the throttler are thread safe. + */ +class SharedOperationThrottler { +protected: + struct TokenCtorTag {}; // Make available to subclasses for token construction. +public: + class Token { + SharedOperationThrottler* _throttler; + public: + constexpr Token(SharedOperationThrottler* throttler, TokenCtorTag) noexcept : _throttler(throttler) {} + constexpr Token() noexcept : _throttler(nullptr) {} + constexpr Token(Token&& rhs) noexcept + : _throttler(rhs._throttler) + { + rhs._throttler = nullptr; + } + Token& operator=(Token&& rhs) noexcept; + ~Token(); + + Token(const Token&) = delete; + Token& operator=(const Token&) = delete; + + [[nodiscard]] constexpr bool valid() const noexcept { return (_throttler != nullptr); } + void reset() noexcept; + }; + + virtual ~SharedOperationThrottler() = default; + + // All methods are thread safe + [[nodiscard]] virtual Token blocking_acquire_one() noexcept = 0; + [[nodiscard]] virtual Token blocking_acquire_one(vespalib::duration timeout) noexcept = 0; + [[nodiscard]] virtual Token try_acquire_one() noexcept = 0; + + // May return 0, in which case the window size is unlimited. + [[nodiscard]] virtual uint32_t current_window_size() const noexcept = 0; + + // Exposed for unit testing only. + [[nodiscard]] virtual uint32_t waiting_threads() const noexcept = 0; + + // Creates a throttler that does exactly zero throttling (but also has zero overhead and locking) + static std::unique_ptr<SharedOperationThrottler> make_unlimited_throttler(); + + struct DynamicThrottleParams { + uint32_t window_size_increment = 20; + uint32_t min_window_size = 20; + uint32_t max_window_size = INT_MAX; // signed max to be 1-1 compatible with Java defaults + double resize_rate = 3.0; + double window_size_decrement_factor = 1.2; + double window_size_backoff = 0.95; + }; + + // Creates a throttler that uses a DynamicThrottlePolicy under the hood + static std::unique_ptr<SharedOperationThrottler> make_dynamic_throttler(const DynamicThrottleParams& params); + static std::unique_ptr<SharedOperationThrottler> make_dynamic_throttler(const DynamicThrottleParams& params, + std::function<steady_time()> time_provider); +private: + // Exclusively called from a valid Token. Thread safe. + virtual void release_one() noexcept = 0; +}; + +} |