diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-02-22 14:35:29 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-02-22 14:38:46 +0000 |
commit | efd1c3398574b28cc6901f71e79376a0b8c0d65e (patch) | |
tree | 76a5214350d382cde65c01f8610fd80523a9286d /vespalib | |
parent | 81b13f7ac5825b519c389954c26b6642bfc4ed4d (diff) |
Expose current active operation throttler token count
Allows for inspecting _actual_ operation parallelism vs. the _maximum_
parallelism given by the internal policy window size. Remove the note
stating that waiting thread count is for testing only, as we want to be
able to expose that as well.
Available for both the dynamic and the unlimited implementations, as they
both explicitly track active token counts.
Diffstat (limited to 'vespalib')
3 files changed, 31 insertions, 6 deletions
diff --git a/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp b/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp index d9b6ae7f908..eefc0ca72c0 100644 --- a/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp +++ b/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp @@ -29,6 +29,8 @@ TEST("unlimited throttler does not throttle") { EXPECT_TRUE(token2.valid()); // Window size should be zero (i.e. unlimited) for unlimited throttler EXPECT_EQUAL(throttler->current_window_size(), 0u); + // But we still track the active token count + EXPECT_EQUAL(throttler->current_active_token_count(), 2u); } TEST_F("dynamic throttler respects initial window size", DynamicThrottleFixture()) { @@ -38,6 +40,7 @@ TEST_F("dynamic throttler respects initial window size", DynamicThrottleFixture( EXPECT_FALSE(token2.valid()); EXPECT_EQUAL(f1._throttler->current_window_size(), 1u); + EXPECT_EQUAL(f1._throttler->current_active_token_count(), 1u); } TEST_F("blocking acquire returns immediately if slot available", DynamicThrottleFixture()) { @@ -87,9 +90,13 @@ TEST_F("token destruction frees up throttle window slot", DynamicThrottleFixture { auto token = f1._throttler->try_acquire_one(); EXPECT_TRUE(token.valid()); + EXPECT_EQUAL(f1._throttler->current_active_token_count(), 1u); } + EXPECT_EQUAL(f1._throttler->current_active_token_count(), 0u); + auto token = f1._throttler->try_acquire_one(); EXPECT_TRUE(token.valid()); + EXPECT_EQUAL(f1._throttler->current_active_token_count(), 1u); } TEST_F("token can be moved and reset", DynamicThrottleFixture()) { diff --git a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp index f5a1c117cc8..6e273d1a7ea 100644 --- a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp +++ b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp @@ -18,25 +18,34 @@ public: { } ~NoLimitsOperationThrottler() override { - assert(_refs.load(std::memory_order_acquire) == 0u); + assert(_refs.load() == 0u); } Token blocking_acquire_one() noexcept override { - ++_refs; + internal_ref_count_increase(); return Token(this, TokenCtorTag{}); } Token blocking_acquire_one(vespalib::duration) noexcept override { - ++_refs; + internal_ref_count_increase(); return Token(this, TokenCtorTag{}); } Token try_acquire_one() noexcept override { - ++_refs; + internal_ref_count_increase(); return Token(this, TokenCtorTag{}); } uint32_t current_window_size() const noexcept override { return 0; } + uint32_t current_active_token_count() const noexcept override { + return _refs.load(std::memory_order_relaxed); + } uint32_t waiting_threads() const noexcept override { return 0; } void reconfigure_dynamic_throttling(const DynamicThrottleParams&) noexcept override { /* no-op */ } private: - void release_one() noexcept override { --_refs; } + void internal_ref_count_increase() noexcept { + // Relaxed semantics suffice, as there are no transitive memory visibility/ordering requirements. + _refs.fetch_add(1u, std::memory_order_relaxed); + } + void release_one() noexcept override { + _refs.fetch_sub(1u, std::memory_order_relaxed); + } std::atomic<uint32_t> _refs; }; @@ -261,6 +270,7 @@ public: Token blocking_acquire_one(vespalib::duration timeout) noexcept override; Token try_acquire_one() noexcept override; uint32_t current_window_size() const noexcept override; + uint32_t current_active_token_count() const noexcept override; uint32_t waiting_threads() const noexcept override; void reconfigure_dynamic_throttling(const DynamicThrottleParams& params) noexcept override; private: @@ -372,6 +382,13 @@ DynamicOperationThrottler::current_window_size() const noexcept } uint32_t +DynamicOperationThrottler::current_active_token_count() const noexcept +{ + std::unique_lock lock(_mutex); + return _pending_ops; +} + +uint32_t DynamicOperationThrottler::waiting_threads() const noexcept { std::unique_lock lock(_mutex); diff --git a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h index 030935339ed..b7913029c1e 100644 --- a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h +++ b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h @@ -65,7 +65,8 @@ public: // May return 0, in which case the window size is unlimited. [[nodiscard]] virtual uint32_t current_window_size() const noexcept = 0; - // Exposed for unit testing only. + [[nodiscard]] virtual uint32_t current_active_token_count() const noexcept = 0; + [[nodiscard]] virtual uint32_t waiting_threads() const noexcept = 0; struct DynamicThrottleParams { |