summaryrefslogtreecommitdiffstats
path: root/vespalib
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2022-01-21 09:35:12 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2022-01-21 12:34:35 +0000
commit5ced8942ecd76da46fa2d6c70a19cc9d302a1110 (patch)
tree57f5a546b30f3b7ed2b6ccd1d0620885e728e289 /vespalib
parent690d90ceb13797e1ff7876d4e9fc24efbec5f57b (diff)
Replace storage operation throttler with vespalib implementation
Also move the remaining throttler unit tests to vespalib.
Diffstat (limited to 'vespalib')
-rw-r--r--vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp123
1 files changed, 115 insertions, 8 deletions
diff --git a/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp b/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp
index 61cd2b5ef44..d9b6ae7f908 100644
--- a/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp
+++ b/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp
@@ -1,20 +1,127 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/util/shared_operation_throttler.h>
#include <vespa/vespalib/testkit/test_kit.h>
+#include <vespa/vespalib/util/barrier.h>
+#include <thread>
namespace vespalib {
+using ThrottleToken = SharedOperationThrottler::Token;
+
+struct DynamicThrottleFixture {
+ std::unique_ptr<SharedOperationThrottler> _throttler;
+
+ DynamicThrottleFixture() {
+ SharedOperationThrottler::DynamicThrottleParams params;
+ params.window_size_increment = 1;
+ params.min_window_size = 1;
+ _throttler = SharedOperationThrottler::make_dynamic_throttler(params);
+ }
+};
+
+TEST("unlimited throttler does not throttle") {
+ // We technically can't test that the unlimited throttler _never_ throttles, but at
+ // least check that it doesn't throttle _twice_, and then induce from this ;)
+ auto throttler = SharedOperationThrottler::make_unlimited_throttler();
+ auto token1 = throttler->try_acquire_one();
+ EXPECT_TRUE(token1.valid());
+ auto token2 = throttler->blocking_acquire_one();
+ EXPECT_TRUE(token2.valid());
+ // Window size should be zero (i.e. unlimited) for unlimited throttler
+ EXPECT_EQUAL(throttler->current_window_size(), 0u);
+}
+
+TEST_F("dynamic throttler respects initial window size", DynamicThrottleFixture()) {
+ auto token1 = f1._throttler->try_acquire_one();
+ EXPECT_TRUE(token1.valid());
+ auto token2 = f1._throttler->try_acquire_one();
+ EXPECT_FALSE(token2.valid());
+
+ EXPECT_EQUAL(f1._throttler->current_window_size(), 1u);
+}
+
+TEST_F("blocking acquire returns immediately if slot available", DynamicThrottleFixture()) {
+ auto token = f1._throttler->blocking_acquire_one();
+ EXPECT_TRUE(token.valid());
+ token.reset();
+ token = f1._throttler->blocking_acquire_one(600s); // Should never block.
+ EXPECT_TRUE(token.valid());
+}
+
+TEST_F("blocking call woken up if throttle slot available", DynamicThrottleFixture()) {
+ vespalib::Barrier barrier(2);
+ std::thread t([&] {
+ auto token = f1._throttler->try_acquire_one();
+ assert(token.valid());
+ barrier.await();
+ while (f1._throttler->waiting_threads() != 1) {
+ std::this_thread::sleep_for(100us);
+ }
+ // Implicit token release at thread scope exit
+ });
+ barrier.await();
+ auto token = f1._throttler->blocking_acquire_one();
+ EXPECT_TRUE(token.valid());
+ t.join();
+}
+
+TEST_F("time-bounded blocking acquire waits for timeout", DynamicThrottleFixture()) {
+ auto window_filling_token = f1._throttler->try_acquire_one();
+ auto before = std::chrono::steady_clock::now();
+ // Will block for at least 1ms. Since no window slot will be available by that time,
+ // an invalid token should be returned.
+ auto token = f1._throttler->blocking_acquire_one(1ms);
+ auto after = std::chrono::steady_clock::now();
+ EXPECT_TRUE((after - before) >= 1ms);
+ EXPECT_FALSE(token.valid());
+}
+
+TEST("default constructed token is invalid") {
+ ThrottleToken token;
+ EXPECT_FALSE(token.valid());
+ token.reset(); // no-op
+ EXPECT_FALSE(token.valid());
+}
+
+TEST_F("token destruction frees up throttle window slot", DynamicThrottleFixture()) {
+ {
+ auto token = f1._throttler->try_acquire_one();
+ EXPECT_TRUE(token.valid());
+ }
+ auto token = f1._throttler->try_acquire_one();
+ EXPECT_TRUE(token.valid());
+}
+
+TEST_F("token can be moved and reset", DynamicThrottleFixture()) {
+ auto token1 = f1._throttler->try_acquire_one();
+ auto token2 = std::move(token1); // move ctor
+ EXPECT_TRUE(token2.valid());
+ EXPECT_FALSE(token1.valid());
+ ThrottleToken token3;
+ token3 = std::move(token2); // move assignment op
+ EXPECT_TRUE(token3.valid());
+ EXPECT_FALSE(token2.valid());
+
+ // Trying to fetch new token should not succeed due to active token and win size of 1
+ token1 = f1._throttler->try_acquire_one();
+ EXPECT_FALSE(token1.valid());
+ // Resetting the token should free up the slot in the window
+ token3.reset();
+ token1 = f1._throttler->try_acquire_one();
+ EXPECT_TRUE(token1.valid());
+}
+
// Note on test semantics: these tests are adapted from a subset of the MessageBus
// throttling tests. Some tests have been simplified due to no longer having access
// to the low-level DynamicThrottlePolicy API.
-struct Fixture {
+struct WindowFixture {
uint64_t _milli_time;
std::unique_ptr<SharedOperationThrottler> _throttler;
- Fixture(uint32_t window_size_increment = 5,
- uint32_t min_window_size = 20,
- uint32_t max_window_size = INT_MAX)
+ WindowFixture(uint32_t window_size_increment = 5,
+ uint32_t min_window_size = 20,
+ uint32_t max_window_size = INT_MAX)
: _milli_time(0),
_throttler()
{
@@ -57,7 +164,7 @@ struct Fixture {
}
};
-TEST_F("window size changes dynamically based on throughput", Fixture()) {
+TEST_F("window size changes dynamically based on throughput", WindowFixture()) {
uint32_t window_size = f1.attempt_converge_on_stable_window_size(100);
ASSERT_TRUE(window_size >= 90 && window_size <= 105);
@@ -74,7 +181,7 @@ TEST_F("window size changes dynamically based on throughput", Fixture()) {
ASSERT_TRUE(window_size >= 90 && window_size <= 115);
}
-TEST_F("window size is reset after idle time period", Fixture(5, 1)) {
+TEST_F("window size is reset after idle time period", WindowFixture(5, 1)) {
double window_size = f1.attempt_converge_on_stable_window_size(100);
ASSERT_TRUE(window_size >= 90 && window_size <= 110);
@@ -88,12 +195,12 @@ TEST_F("window size is reset after idle time period", Fixture(5, 1)) {
EXPECT_EQUAL(tokens.size(), 1u); // Reduced to minimum window size
}
-TEST_F("minimum window size is respected", Fixture(5, 150, INT_MAX)) {
+TEST_F("minimum window size is respected", WindowFixture(5, 150, INT_MAX)) {
double window_size = f1.attempt_converge_on_stable_window_size(200);
ASSERT_TRUE(window_size >= 150 && window_size <= 210);
}
-TEST_F("maximum window size is respected", Fixture(5, 1, 50)) {
+TEST_F("maximum window size is respected", WindowFixture(5, 1, 50)) {
double window_size = f1.attempt_converge_on_stable_window_size(100);
ASSERT_TRUE(window_size >= 40 && window_size <= 50);
}