summaryrefslogtreecommitdiffstats
path: root/vespalib
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2022-02-22 14:35:29 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2022-02-22 14:38:46 +0000
commitefd1c3398574b28cc6901f71e79376a0b8c0d65e (patch)
tree76a5214350d382cde65c01f8610fd80523a9286d /vespalib
parent81b13f7ac5825b519c389954c26b6642bfc4ed4d (diff)
Expose current active operation throttler token count
Allows for inspecting _actual_ operation parallelism vs. the _maximum_ parallelism given by the internal policy window size. Remove the note stating that waiting thread count is for testing only, as we want to be able to expose that as well. Available for both the dynamic and the unlimited implementations, as they both explicitly track active token counts.
Diffstat (limited to 'vespalib')
-rw-r--r--vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp7
-rw-r--r--vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp27
-rw-r--r--vespalib/src/vespa/vespalib/util/shared_operation_throttler.h3
3 files changed, 31 insertions, 6 deletions
diff --git a/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp b/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp
index d9b6ae7f908..eefc0ca72c0 100644
--- a/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp
+++ b/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp
@@ -29,6 +29,8 @@ TEST("unlimited throttler does not throttle") {
EXPECT_TRUE(token2.valid());
// Window size should be zero (i.e. unlimited) for unlimited throttler
EXPECT_EQUAL(throttler->current_window_size(), 0u);
+ // But we still track the active token count
+ EXPECT_EQUAL(throttler->current_active_token_count(), 2u);
}
TEST_F("dynamic throttler respects initial window size", DynamicThrottleFixture()) {
@@ -38,6 +40,7 @@ TEST_F("dynamic throttler respects initial window size", DynamicThrottleFixture(
EXPECT_FALSE(token2.valid());
EXPECT_EQUAL(f1._throttler->current_window_size(), 1u);
+ EXPECT_EQUAL(f1._throttler->current_active_token_count(), 1u);
}
TEST_F("blocking acquire returns immediately if slot available", DynamicThrottleFixture()) {
@@ -87,9 +90,13 @@ TEST_F("token destruction frees up throttle window slot", DynamicThrottleFixture
{
auto token = f1._throttler->try_acquire_one();
EXPECT_TRUE(token.valid());
+ EXPECT_EQUAL(f1._throttler->current_active_token_count(), 1u);
}
+ EXPECT_EQUAL(f1._throttler->current_active_token_count(), 0u);
+
auto token = f1._throttler->try_acquire_one();
EXPECT_TRUE(token.valid());
+ EXPECT_EQUAL(f1._throttler->current_active_token_count(), 1u);
}
TEST_F("token can be moved and reset", DynamicThrottleFixture()) {
diff --git a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp
index f5a1c117cc8..6e273d1a7ea 100644
--- a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp
+++ b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp
@@ -18,25 +18,34 @@ public:
{
}
~NoLimitsOperationThrottler() override {
- assert(_refs.load(std::memory_order_acquire) == 0u);
+ assert(_refs.load() == 0u);
}
Token blocking_acquire_one() noexcept override {
- ++_refs;
+ internal_ref_count_increase();
return Token(this, TokenCtorTag{});
}
Token blocking_acquire_one(vespalib::duration) noexcept override {
- ++_refs;
+ internal_ref_count_increase();
return Token(this, TokenCtorTag{});
}
Token try_acquire_one() noexcept override {
- ++_refs;
+ internal_ref_count_increase();
return Token(this, TokenCtorTag{});
}
uint32_t current_window_size() const noexcept override { return 0; }
+ uint32_t current_active_token_count() const noexcept override {
+ return _refs.load(std::memory_order_relaxed);
+ }
uint32_t waiting_threads() const noexcept override { return 0; }
void reconfigure_dynamic_throttling(const DynamicThrottleParams&) noexcept override { /* no-op */ }
private:
- void release_one() noexcept override { --_refs; }
+ void internal_ref_count_increase() noexcept {
+ // Relaxed semantics suffice, as there are no transitive memory visibility/ordering requirements.
+ _refs.fetch_add(1u, std::memory_order_relaxed);
+ }
+ void release_one() noexcept override {
+ _refs.fetch_sub(1u, std::memory_order_relaxed);
+ }
std::atomic<uint32_t> _refs;
};
@@ -261,6 +270,7 @@ public:
Token blocking_acquire_one(vespalib::duration timeout) noexcept override;
Token try_acquire_one() noexcept override;
uint32_t current_window_size() const noexcept override;
+ uint32_t current_active_token_count() const noexcept override;
uint32_t waiting_threads() const noexcept override;
void reconfigure_dynamic_throttling(const DynamicThrottleParams& params) noexcept override;
private:
@@ -372,6 +382,13 @@ DynamicOperationThrottler::current_window_size() const noexcept
}
uint32_t
+DynamicOperationThrottler::current_active_token_count() const noexcept
+{
+ std::unique_lock lock(_mutex);
+ return _pending_ops;
+}
+
+uint32_t
DynamicOperationThrottler::waiting_threads() const noexcept
{
std::unique_lock lock(_mutex);
diff --git a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h
index 030935339ed..b7913029c1e 100644
--- a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h
+++ b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h
@@ -65,7 +65,8 @@ public:
// May return 0, in which case the window size is unlimited.
[[nodiscard]] virtual uint32_t current_window_size() const noexcept = 0;
- // Exposed for unit testing only.
+ [[nodiscard]] virtual uint32_t current_active_token_count() const noexcept = 0;
+
[[nodiscard]] virtual uint32_t waiting_threads() const noexcept = 0;
struct DynamicThrottleParams {