aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2022-01-21 09:35:12 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2022-01-21 12:34:35 +0000
commit5ced8942ecd76da46fa2d6c70a19cc9d302a1110 (patch)
tree57f5a546b30f3b7ed2b6ccd1d0620885e728e289
parent690d90ceb13797e1ff7876d4e9fc24efbec5f57b (diff)
Replace storage operation throttler with vespalib implementation
Also move the remaining throttler unit tests to vespalib.
-rw-r--r--storage/src/tests/persistence/CMakeLists.txt1
-rw-r--r--storage/src/tests/persistence/shared_operation_throttler_test.cpp116
-rw-r--r--storage/src/vespa/storage/persistence/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp2
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h4
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.h6
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp8
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h8
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp9
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.h8
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.cpp6
-rw-r--r--storage/src/vespa/storage/persistence/persistenceutil.h6
-rw-r--r--storage/src/vespa/storage/persistence/shared_operation_throttler.cpp201
-rw-r--r--storage/src/vespa/storage/persistence/shared_operation_throttler.h66
-rw-r--r--vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp123
15 files changed, 148 insertions, 417 deletions
diff --git a/storage/src/tests/persistence/CMakeLists.txt b/storage/src/tests/persistence/CMakeLists.txt
index fb8120210c1..7b165e11b66 100644
--- a/storage/src/tests/persistence/CMakeLists.txt
+++ b/storage/src/tests/persistence/CMakeLists.txt
@@ -12,7 +12,6 @@ vespa_add_executable(storage_persistence_gtest_runner_app TEST
persistencethread_splittest.cpp
processalltest.cpp
provider_error_wrapper_test.cpp
- shared_operation_throttler_test.cpp
splitbitdetectortest.cpp
testandsettest.cpp
gtest_runner.cpp
diff --git a/storage/src/tests/persistence/shared_operation_throttler_test.cpp b/storage/src/tests/persistence/shared_operation_throttler_test.cpp
deleted file mode 100644
index 0ad380937c7..00000000000
--- a/storage/src/tests/persistence/shared_operation_throttler_test.cpp
+++ /dev/null
@@ -1,116 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/storage/persistence/shared_operation_throttler.h>
-#include <vespa/vespalib/gtest/gtest.h>
-#include <vespa/vespalib/util/barrier.h>
-#include <chrono>
-#include <thread>
-
-using namespace ::testing;
-
-namespace storage {
-
-using ThrottleToken = SharedOperationThrottler::Token;
-
-TEST(SharedOperationThrottlerTest, unlimited_throttler_does_not_throttle) {
- // We technically can't test that the unlimited throttler _never_ throttles, but at
- // least check that it doesn't throttle _twice_, and then induce from this ;)
- auto throttler = SharedOperationThrottler::make_unlimited_throttler();
- auto token1 = throttler->try_acquire_one();
- EXPECT_TRUE(token1.valid());
- auto token2 = throttler->blocking_acquire_one();
- EXPECT_TRUE(token2.valid());
- // Window size should be zero (i.e. unlimited) for unlimited throttler
- EXPECT_EQ(throttler->current_window_size(), 0);
-}
-
-TEST(SharedOperationThrottlerTest, dynamic_throttler_respects_initial_window_size) {
- auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
- auto token1 = throttler->try_acquire_one();
- EXPECT_TRUE(token1.valid());
- auto token2 = throttler->try_acquire_one();
- EXPECT_FALSE(token2.valid());
-
- EXPECT_EQ(throttler->current_window_size(), 1);
-}
-
-TEST(SharedOperationThrottlerTest, blocking_acquire_returns_immediately_if_slot_available) {
- auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
- auto token = throttler->blocking_acquire_one();
- EXPECT_TRUE(token.valid());
- token.reset();
- token = throttler->blocking_acquire_one(600s); // Should never block.
- EXPECT_TRUE(token.valid());
-}
-
-TEST(SharedOperationThrottlerTest, blocking_call_woken_up_if_throttle_slot_available) {
- auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
- vespalib::Barrier barrier(2);
- std::thread t([&] {
- auto token = throttler->try_acquire_one();
- assert(token.valid());
- barrier.await();
- while (throttler->waiting_threads() != 1) {
- std::this_thread::sleep_for(100us);
- }
- // Implicit token release at thread scope exit
- });
- barrier.await();
- auto token = throttler->blocking_acquire_one();
- EXPECT_TRUE(token.valid());
- t.join();
-}
-
-TEST(SharedOperationThrottlerTest, time_bounded_blocking_acquire_waits_for_timeout) {
- auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
- auto window_filling_token = throttler->try_acquire_one();
- auto before = std::chrono::steady_clock::now();
- // Will block for at least 1ms. Since no window slot will be available by that time,
- // an invalid token should be returned.
- auto token = throttler->blocking_acquire_one(1ms);
- auto after = std::chrono::steady_clock::now();
- EXPECT_TRUE((after - before) >= 1ms);
- EXPECT_FALSE(token.valid());
-}
-
-TEST(SharedOperationThrottlerTest, default_constructed_token_is_invalid) {
- ThrottleToken token;
- EXPECT_FALSE(token.valid());
- token.reset(); // no-op
- EXPECT_FALSE(token.valid());
-}
-
-TEST(SharedOperationThrottlerTest, token_destruction_frees_up_throttle_window_slot) {
- auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
- {
- auto token = throttler->try_acquire_one();
- EXPECT_TRUE(token.valid());
- }
- auto token = throttler->try_acquire_one();
- EXPECT_TRUE(token.valid());
-}
-
-TEST(SharedOperationThrottlerTest, token_can_be_moved_and_reset) {
- auto throttler = SharedOperationThrottler::make_dynamic_throttler(1);
- auto token1 = throttler->try_acquire_one();
- auto token2 = std::move(token1); // move ctor
- EXPECT_TRUE(token2.valid());
- EXPECT_FALSE(token1.valid());
- ThrottleToken token3;
- token3 = std::move(token2); // move assignment op
- EXPECT_TRUE(token3.valid());
- EXPECT_FALSE(token2.valid());
-
- // Trying to fetch new token should not succeed due to active token and win size of 1
- token1 = throttler->try_acquire_one();
- EXPECT_FALSE(token1.valid());
- // Resetting the token should free up the slot in the window
- token3.reset();
- token1 = throttler->try_acquire_one();
- EXPECT_TRUE(token1.valid());
-}
-
-// TODO ideally we'd test that the dynamic throttler has a window size that is actually
-// dynamic, but the backing DynamicThrottlePolicy implementation is a black box so
-// it's not trivial to know how to do this reliably.
-
-}
diff --git a/storage/src/vespa/storage/persistence/CMakeLists.txt b/storage/src/vespa/storage/persistence/CMakeLists.txt
index 5e068236026..c737d2bed28 100644
--- a/storage/src/vespa/storage/persistence/CMakeLists.txt
+++ b/storage/src/vespa/storage/persistence/CMakeLists.txt
@@ -14,7 +14,6 @@ vespa_add_library(storage_spersistence OBJECT
persistenceutil.cpp
processallhandler.cpp
provider_error_wrapper.cpp
- shared_operation_throttler.cpp
simplemessagehandler.cpp
splitbitdetector.cpp
splitjoinhandler.cpp
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp
index 73ccc7f6085..69d35253aa6 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.cpp
@@ -9,7 +9,7 @@ namespace storage {
ApplyBucketDiffEntryComplete::ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state,
document::DocumentId doc_id,
- SharedOperationThrottler::Token throttle_token,
+ ThrottleToken throttle_token,
const char *op,
const framework::Clock& clock,
metrics::DoubleAverageMetric& latency_metric)
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h
index 8478cab4c17..a78fbe38ae5 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_entry_complete.h
@@ -22,14 +22,14 @@ class ApplyBucketDiffEntryComplete : public spi::OperationComplete
const spi::ResultHandler* _result_handler;
std::shared_ptr<ApplyBucketDiffState> _state;
document::DocumentId _doc_id;
- SharedOperationThrottler::Token _throttle_token;
+ ThrottleToken _throttle_token;
const char* _op;
framework::MilliSecTimer _start_time;
metrics::DoubleAverageMetric& _latency_metric;
public:
ApplyBucketDiffEntryComplete(std::shared_ptr<ApplyBucketDiffState> state,
document::DocumentId doc_id,
- SharedOperationThrottler::Token throttle_token,
+ ThrottleToken throttle_token,
const char *op, const framework::Clock& clock,
metrics::DoubleAverageMetric& latency_metric);
~ApplyBucketDiffEntryComplete() override;
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index 6f740ce2c28..66dc7126058 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -78,7 +78,7 @@ public:
struct LockedMessage {
std::shared_ptr<BucketLockInterface> lock;
std::shared_ptr<api::StorageMessage> msg;
- SharedOperationThrottler::Token throttle_token;
+ ThrottleToken throttle_token;
LockedMessage() noexcept = default;
LockedMessage(std::shared_ptr<BucketLockInterface> lock_,
@@ -89,7 +89,7 @@ public:
{}
LockedMessage(std::shared_ptr<BucketLockInterface> lock_,
std::shared_ptr<api::StorageMessage> msg_,
- SharedOperationThrottler::Token token) noexcept
+ ThrottleToken token) noexcept
: lock(std::move(lock_)),
msg(std::move(msg_)),
throttle_token(std::move(token))
@@ -274,7 +274,7 @@ public:
virtual ActiveOperationsStats get_active_operations_stats(bool reset_min_max) const = 0;
- virtual SharedOperationThrottler& operation_throttler() const noexcept = 0;
+ virtual vespalib::SharedOperationThrottler& operation_throttler() const noexcept = 0;
};
} // storage
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index 2ccbc7a85ef..f7d4f750884 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -40,14 +40,14 @@ uint32_t per_stripe_merge_limit(uint32_t num_threads, uint32_t num_stripes) noex
FileStorHandlerImpl::FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics,
ServiceLayerComponentRegister& compReg)
- : FileStorHandlerImpl(1, 1, sender, metrics, compReg, SharedOperationThrottler::make_unlimited_throttler())
+ : FileStorHandlerImpl(1, 1, sender, metrics, compReg, vespalib::SharedOperationThrottler::make_unlimited_throttler())
{
}
FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender& sender,
FileStorMetrics& metrics,
ServiceLayerComponentRegister& compReg,
- std::unique_ptr<SharedOperationThrottler> operation_throttler)
+ std::unique_ptr<vespalib::SharedOperationThrottler> operation_throttler)
: _component(compReg, "filestorhandlerimpl"),
_state(FileStorHandler::AVAILABLE),
_metrics(nullptr),
@@ -920,7 +920,7 @@ FileStorHandler::LockedMessage
FileStorHandlerImpl::Stripe::getNextMessage(vespalib::duration timeout)
{
std::unique_lock guard(*_lock);
- SharedOperationThrottler::Token throttle_token;
+ ThrottleToken throttle_token;
// Try to grab a message+lock, immediately retrying once after a wait
// if none can be found and then exiting if the same is the case on the
// second attempt. This is key to allowing the run loop to register
@@ -997,7 +997,7 @@ FileStorHandlerImpl::Stripe::get_next_async_message(monitor_guard& guard)
FileStorHandler::LockedMessage
FileStorHandlerImpl::Stripe::getMessage(monitor_guard & guard, PriorityIdx & idx, PriorityIdx::iterator iter,
- SharedOperationThrottler::Token throttle_token)
+ ThrottleToken throttle_token)
{
std::chrono::milliseconds waitTime(uint64_t(iter->_timer.stop(_metrics->averageQueueWaitingTime)));
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index c4b85ac596c..698f52359f5 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -149,7 +149,7 @@ public:
// with its locking requirements.
FileStorHandler::LockedMessage getMessage(monitor_guard & guard, PriorityIdx & idx,
PriorityIdx::iterator iter,
- SharedOperationThrottler::Token throttle_token);
+ ThrottleToken throttle_token);
using LockedBuckets = vespalib::hash_map<document::Bucket, MultiLockEntry, document::Bucket::hash>;
const FileStorHandlerImpl &_owner;
MessageSender &_messageSender;
@@ -191,7 +191,7 @@ public:
FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics,
ServiceLayerComponentRegister& compReg);
FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender&, FileStorMetrics&,
- ServiceLayerComponentRegister&, std::unique_ptr<SharedOperationThrottler>);
+ ServiceLayerComponentRegister&, std::unique_ptr<vespalib::SharedOperationThrottler>);
~FileStorHandlerImpl() override;
void setGetNextMessageTimeout(vespalib::duration timeout) override { _getNextMessageTimeout = timeout; }
@@ -243,7 +243,7 @@ public:
ResumeGuard pause() override;
void abortQueuedOperations(const AbortBucketOperationsCommand& cmd) override;
- SharedOperationThrottler& operation_throttler() const noexcept override {
+ vespalib::SharedOperationThrottler& operation_throttler() const noexcept override {
return *_operation_throttler;
}
@@ -257,7 +257,7 @@ private:
ServiceLayerComponent _component;
std::atomic<DiskState> _state;
FileStorDiskMetrics * _metrics;
- std::unique_ptr<SharedOperationThrottler> _operation_throttler;
+ std::unique_ptr<vespalib::SharedOperationThrottler> _operation_throttler;
std::vector<Stripe> _stripes;
MessageSender& _messageSender;
const document::BucketIdFactory& _bucketIdFactory;
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 530d96dfe3f..03b16e75297 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -143,16 +143,19 @@ selectSequencer(StorFilestorConfig::ResponseSequencerType sequencerType) {
}
}
-std::unique_ptr<SharedOperationThrottler>
+std::unique_ptr<vespalib::SharedOperationThrottler>
make_operation_throttler_from_config(const StorFilestorConfig& config, size_t num_threads)
{
const bool use_dynamic_throttling = (config.asyncOperationThrottlerType == StorFilestorConfig::AsyncOperationThrottlerType::DYNAMIC);
if (use_dynamic_throttling) {
auto config_win_size_incr = std::max(config.asyncOperationDynamicThrottlingWindowIncrement, 1);
auto win_size_increment = std::max(static_cast<size_t>(config_win_size_incr), num_threads);
- return SharedOperationThrottler::make_dynamic_throttler(win_size_increment);
+ vespalib::SharedOperationThrottler::DynamicThrottleParams params;
+ params.window_size_increment = win_size_increment;
+ params.min_window_size = win_size_increment;
+ return vespalib::SharedOperationThrottler::make_dynamic_throttler(params);
} else {
- return SharedOperationThrottler::make_unlimited_throttler();
+ return vespalib::SharedOperationThrottler::make_unlimited_throttler();
}
}
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index e1c821aab48..ee6eed63eb5 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -22,7 +22,10 @@
#include <vespa/vespalib/util/monitored_refcount.h>
#include <atomic>
-namespace vespalib { class ISequencedTaskExecutor; }
+namespace vespalib {
+class ISequencedTaskExecutor;
+class SharedOperationThrottler;
+}
namespace storage {
@@ -34,7 +37,6 @@ namespace spi {
class PersistenceUtil;
class ApplyBucketDiffState;
class MergeStatus;
-class SharedOperationThrottler;
class MergeHandler : public Types,
public MergeBucketInfoSyncer {
@@ -85,7 +87,7 @@ private:
const ClusterContext &_cluster_context;
PersistenceUtil &_env;
spi::PersistenceProvider &_spi;
- SharedOperationThrottler& _operation_throttler;
+ vespalib::SharedOperationThrottler& _operation_throttler;
std::unique_ptr<vespalib::MonitoredRefCount> _monitored_ref_count;
const uint32_t _maxChunkSize;
const uint32_t _commonMergeChainOptimalizationMinimumSize;
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp
index 65eab99b8fb..2781cc61b83 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.cpp
+++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp
@@ -32,7 +32,7 @@ MessageTracker::MessageTracker(const framework::MilliSecTimer & timer,
MessageSender & replySender,
FileStorHandler::BucketLockInterface::SP bucketLock,
api::StorageMessage::SP msg,
- SharedOperationThrottler::Token throttle_token)
+ ThrottleToken throttle_token)
: MessageTracker(timer, env, replySender, true, std::move(bucketLock), std::move(msg), std::move(throttle_token))
{}
MessageTracker::MessageTracker(const framework::MilliSecTimer & timer,
@@ -41,7 +41,7 @@ MessageTracker::MessageTracker(const framework::MilliSecTimer & timer,
bool updateBucketInfo,
FileStorHandler::BucketLockInterface::SP bucketLock,
api::StorageMessage::SP msg,
- SharedOperationThrottler::Token throttle_token)
+ ThrottleToken throttle_token)
: _sendReply(true),
_updateBucketInfo(updateBucketInfo && hasBucketInfo(msg->getType().getId())),
_bucketLock(std::move(bucketLock)),
@@ -60,7 +60,7 @@ MessageTracker::createForTesting(const framework::MilliSecTimer & timer, Persist
FileStorHandler::BucketLockInterface::SP bucketLock, api::StorageMessage::SP msg)
{
return MessageTracker::UP(new MessageTracker(timer, env, replySender, false, std::move(bucketLock),
- std::move(msg), SharedOperationThrottler::Token()));
+ std::move(msg), ThrottleToken()));
}
void
diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h
index 588cbef2170..4130a276239 100644
--- a/storage/src/vespa/storage/persistence/persistenceutil.h
+++ b/storage/src/vespa/storage/persistence/persistenceutil.h
@@ -31,7 +31,7 @@ public:
MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender,
FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg,
- SharedOperationThrottler::Token throttle_token);
+ ThrottleToken throttle_token);
~MessageTracker();
@@ -93,7 +93,7 @@ public:
private:
MessageTracker(const framework::MilliSecTimer & timer, const PersistenceUtil & env, MessageSender & replySender, bool updateBucketInfo,
FileStorHandler::BucketLockInterface::SP bucketLock, std::shared_ptr<api::StorageMessage> msg,
- SharedOperationThrottler::Token throttle_token);
+ ThrottleToken throttle_token);
[[nodiscard]] bool count_result_as_failure() const noexcept;
@@ -101,7 +101,7 @@ private:
bool _updateBucketInfo;
FileStorHandler::BucketLockInterface::SP _bucketLock;
std::shared_ptr<api::StorageMessage> _msg;
- SharedOperationThrottler::Token _throttle_token;
+ ThrottleToken _throttle_token;
spi::Context _context;
const PersistenceUtil &_env;
MessageSender &_replySender;
diff --git a/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp b/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp
deleted file mode 100644
index 7db1a0ccdbb..00000000000
--- a/storage/src/vespa/storage/persistence/shared_operation_throttler.cpp
+++ /dev/null
@@ -1,201 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "shared_operation_throttler.h"
-#include <vespa/messagebus/dynamicthrottlepolicy.h>
-#include <vespa/storage/common/dummy_mbus_messages.h>
-#include <condition_variable>
-#include <cassert>
-#include <mutex>
-
-namespace storage {
-
-namespace {
-
-class NoLimitsOperationThrottler final : public SharedOperationThrottler {
-public:
- ~NoLimitsOperationThrottler() override = default;
- Token blocking_acquire_one() noexcept override {
- return Token(this, TokenCtorTag{});
- }
- Token blocking_acquire_one(vespalib::duration) noexcept override {
- return Token(this, TokenCtorTag{});
- }
- Token try_acquire_one() noexcept override {
- return Token(this, TokenCtorTag{});
- }
- uint32_t current_window_size() const noexcept override { return 0; }
- uint32_t waiting_threads() const noexcept override { return 0; }
-private:
- void release_one() noexcept override { /* no-op */ }
-};
-
-class DynamicOperationThrottler final : public SharedOperationThrottler {
- mutable std::mutex _mutex;
- std::condition_variable _cond;
- mbus::DynamicThrottlePolicy _throttle_policy;
- uint32_t _pending_ops;
- uint32_t _waiting_threads;
-public:
- explicit DynamicOperationThrottler(uint32_t min_size_and_window_increment);
- ~DynamicOperationThrottler() override;
-
- Token blocking_acquire_one() noexcept override;
- Token blocking_acquire_one(vespalib::duration timeout) noexcept override;
- Token try_acquire_one() noexcept override;
- uint32_t current_window_size() const noexcept override;
- uint32_t waiting_threads() const noexcept override;
-private:
- void release_one() noexcept override;
- // Non-const since actually checking the send window of a dynamic throttler might change
- // it if enough time has passed.
- [[nodiscard]] bool has_spare_capacity_in_active_window() noexcept;
- void add_one_to_active_window_size();
- void subtract_one_from_active_window_size();
-};
-
-DynamicOperationThrottler::DynamicOperationThrottler(uint32_t min_size_and_window_increment)
- : _mutex(),
- _cond(),
- _throttle_policy(static_cast<double>(min_size_and_window_increment)),
- _pending_ops(0),
- _waiting_threads(0)
-{
- _throttle_policy.setWindowSizeDecrementFactor(1.2);
- _throttle_policy.setWindowSizeBackOff(0.95);
-}
-
-DynamicOperationThrottler::~DynamicOperationThrottler() = default;
-
-bool
-DynamicOperationThrottler::has_spare_capacity_in_active_window() noexcept
-{
- DummyMbusRequest dummy_request;
- return _throttle_policy.canSend(dummy_request, _pending_ops);
-}
-
-void
-DynamicOperationThrottler::add_one_to_active_window_size()
-{
- DummyMbusRequest dummy_request;
- _throttle_policy.processMessage(dummy_request);
- ++_pending_ops;
-}
-
-void
-DynamicOperationThrottler::subtract_one_from_active_window_size()
-{
- DummyMbusReply dummy_reply;
- _throttle_policy.processReply(dummy_reply);
- assert(_pending_ops > 0);
- --_pending_ops;
-}
-
-DynamicOperationThrottler::Token
-DynamicOperationThrottler::blocking_acquire_one() noexcept
-{
- std::unique_lock lock(_mutex);
- if (!has_spare_capacity_in_active_window()) {
- ++_waiting_threads;
- _cond.wait(lock, [&] {
- return has_spare_capacity_in_active_window();
- });
- --_waiting_threads;
- }
- add_one_to_active_window_size();
- return Token(this, TokenCtorTag{});
-}
-
-DynamicOperationThrottler::Token
-DynamicOperationThrottler::blocking_acquire_one(vespalib::duration timeout) noexcept
-{
- std::unique_lock lock(_mutex);
- if (!has_spare_capacity_in_active_window()) {
- ++_waiting_threads;
- const bool accepted = _cond.wait_for(lock, timeout, [&] {
- return has_spare_capacity_in_active_window();
- });
- --_waiting_threads;
- if (!accepted) {
- return Token();
- }
- }
- add_one_to_active_window_size();
- return Token(this, TokenCtorTag{});
-}
-
-DynamicOperationThrottler::Token
-DynamicOperationThrottler::try_acquire_one() noexcept
-{
- std::unique_lock lock(_mutex);
- if (!has_spare_capacity_in_active_window()) {
- return Token();
- }
- add_one_to_active_window_size();
- return Token(this, TokenCtorTag{});
-}
-
-void
-DynamicOperationThrottler::release_one() noexcept
-{
- std::unique_lock lock(_mutex);
- subtract_one_from_active_window_size();
- // Only wake up a waiting thread if doing so would possibly result in success.
- if ((_waiting_threads > 0) && has_spare_capacity_in_active_window()) {
- lock.unlock();
- _cond.notify_one();
- }
-}
-
-uint32_t
-DynamicOperationThrottler::current_window_size() const noexcept
-{
- std::unique_lock lock(_mutex);
- return _throttle_policy.getMaxPendingCount(); // Actually returns current window size
-}
-
-uint32_t
-DynamicOperationThrottler::waiting_threads() const noexcept
-{
- std::unique_lock lock(_mutex);
- return _waiting_threads;
-}
-
-}
-
-std::unique_ptr<SharedOperationThrottler>
-SharedOperationThrottler::make_unlimited_throttler()
-{
- return std::make_unique<NoLimitsOperationThrottler>();
-}
-
-std::unique_ptr<SharedOperationThrottler>
-SharedOperationThrottler::make_dynamic_throttler(uint32_t min_size_and_window_increment)
-{
- return std::make_unique<DynamicOperationThrottler>(min_size_and_window_increment);
-}
-
-DynamicOperationThrottler::Token::~Token()
-{
- if (_throttler) {
- _throttler->release_one();
- }
-}
-
-void
-DynamicOperationThrottler::Token::reset() noexcept
-{
- if (_throttler) {
- _throttler->release_one();
- _throttler = nullptr;
- }
-}
-
-DynamicOperationThrottler::Token&
-DynamicOperationThrottler::Token::operator=(Token&& rhs) noexcept
-{
- reset();
- _throttler = rhs._throttler;
- rhs._throttler = nullptr;
- return *this;
-}
-
-}
diff --git a/storage/src/vespa/storage/persistence/shared_operation_throttler.h b/storage/src/vespa/storage/persistence/shared_operation_throttler.h
index 4ee8d017c05..b829f077bcb 100644
--- a/storage/src/vespa/storage/persistence/shared_operation_throttler.h
+++ b/storage/src/vespa/storage/persistence/shared_operation_throttler.h
@@ -1,72 +1,10 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include <vespa/vespalib/util/time.h>
-#include <memory>
-#include <optional>
+#include <vespa/vespalib/util/shared_operation_throttler.h>
namespace storage {
-/**
- * Operation throttler that is intended to provide global throttling of
- * async operations across all persistence stripe threads. A throttler
- * wraps a logical max pending window size of in-flight operations. Depending
- * on the throttler implementation, the window size may expand and shrink
- * dynamically. Exactly how and when this happens is unspecified.
- *
- * Offers both polling and (timed, non-timed) blocking calls for acquiring
- * a throttle token. If the returned token is valid, the caller may proceed
- * to invoke the asynchronous operation.
- *
- * The window slot taken up by a valid throttle token is implicitly freed up
- * when the token is destroyed.
- *
- * All operations on the throttler are thread safe.
- */
-class SharedOperationThrottler {
-protected:
- struct TokenCtorTag {}; // Make available to subclasses for token construction.
-public:
- class Token {
- SharedOperationThrottler* _throttler;
- public:
- constexpr Token(SharedOperationThrottler* throttler, TokenCtorTag) noexcept : _throttler(throttler) {}
- constexpr Token() noexcept : _throttler(nullptr) {}
- constexpr Token(Token&& rhs) noexcept
- : _throttler(rhs._throttler)
- {
- rhs._throttler = nullptr;
- }
- Token& operator=(Token&& rhs) noexcept;
- ~Token();
-
- Token(const Token&) = delete;
- Token& operator=(const Token&) = delete;
-
- [[nodiscard]] constexpr bool valid() const noexcept { return (_throttler != nullptr); }
- void reset() noexcept;
- };
-
- virtual ~SharedOperationThrottler() = default;
-
- // All methods are thread safe
- [[nodiscard]] virtual Token blocking_acquire_one() noexcept = 0;
- [[nodiscard]] virtual Token blocking_acquire_one(vespalib::duration timeout) noexcept = 0;
- [[nodiscard]] virtual Token try_acquire_one() noexcept = 0;
-
- // May return 0, in which case the window size is unlimited.
- [[nodiscard]] virtual uint32_t current_window_size() const noexcept = 0;
-
- // Exposed for unit testing only.
- [[nodiscard]] virtual uint32_t waiting_threads() const noexcept = 0;
-
- // Creates a throttler that does exactly zero throttling (but also has zero overhead and locking)
- static std::unique_ptr<SharedOperationThrottler> make_unlimited_throttler();
- // Creates a throttler that uses a MessageBus DynamicThrottlePolicy under the hood
- static std::unique_ptr<SharedOperationThrottler> make_dynamic_throttler(uint32_t min_size_and_window_increment);
-private:
- // Exclusively called from a valid Token. Thread safe.
- virtual void release_one() noexcept = 0;
-};
+using ThrottleToken = vespalib::SharedOperationThrottler::Token;
}
diff --git a/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp b/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp
index 61cd2b5ef44..d9b6ae7f908 100644
--- a/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp
+++ b/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp
@@ -1,20 +1,127 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/util/shared_operation_throttler.h>
#include <vespa/vespalib/testkit/test_kit.h>
+#include <vespa/vespalib/util/barrier.h>
+#include <thread>
namespace vespalib {
+using ThrottleToken = SharedOperationThrottler::Token;
+
+struct DynamicThrottleFixture {
+ std::unique_ptr<SharedOperationThrottler> _throttler;
+
+ DynamicThrottleFixture() {
+ SharedOperationThrottler::DynamicThrottleParams params;
+ params.window_size_increment = 1;
+ params.min_window_size = 1;
+ _throttler = SharedOperationThrottler::make_dynamic_throttler(params);
+ }
+};
+
+TEST("unlimited throttler does not throttle") {
+ // We technically can't test that the unlimited throttler _never_ throttles, but at
+ // least check that it doesn't throttle _twice_, and then induce from this ;)
+ auto throttler = SharedOperationThrottler::make_unlimited_throttler();
+ auto token1 = throttler->try_acquire_one();
+ EXPECT_TRUE(token1.valid());
+ auto token2 = throttler->blocking_acquire_one();
+ EXPECT_TRUE(token2.valid());
+ // Window size should be zero (i.e. unlimited) for unlimited throttler
+ EXPECT_EQUAL(throttler->current_window_size(), 0u);
+}
+
+TEST_F("dynamic throttler respects initial window size", DynamicThrottleFixture()) {
+ auto token1 = f1._throttler->try_acquire_one();
+ EXPECT_TRUE(token1.valid());
+ auto token2 = f1._throttler->try_acquire_one();
+ EXPECT_FALSE(token2.valid());
+
+ EXPECT_EQUAL(f1._throttler->current_window_size(), 1u);
+}
+
+TEST_F("blocking acquire returns immediately if slot available", DynamicThrottleFixture()) {
+ auto token = f1._throttler->blocking_acquire_one();
+ EXPECT_TRUE(token.valid());
+ token.reset();
+ token = f1._throttler->blocking_acquire_one(600s); // Should never block.
+ EXPECT_TRUE(token.valid());
+}
+
+TEST_F("blocking call woken up if throttle slot available", DynamicThrottleFixture()) {
+ vespalib::Barrier barrier(2);
+ std::thread t([&] {
+ auto token = f1._throttler->try_acquire_one();
+ assert(token.valid());
+ barrier.await();
+ while (f1._throttler->waiting_threads() != 1) {
+ std::this_thread::sleep_for(100us);
+ }
+ // Implicit token release at thread scope exit
+ });
+ barrier.await();
+ auto token = f1._throttler->blocking_acquire_one();
+ EXPECT_TRUE(token.valid());
+ t.join();
+}
+
+TEST_F("time-bounded blocking acquire waits for timeout", DynamicThrottleFixture()) {
+ auto window_filling_token = f1._throttler->try_acquire_one();
+ auto before = std::chrono::steady_clock::now();
+ // Will block for at least 1ms. Since no window slot will be available by that time,
+ // an invalid token should be returned.
+ auto token = f1._throttler->blocking_acquire_one(1ms);
+ auto after = std::chrono::steady_clock::now();
+ EXPECT_TRUE((after - before) >= 1ms);
+ EXPECT_FALSE(token.valid());
+}
+
+TEST("default constructed token is invalid") {
+ ThrottleToken token;
+ EXPECT_FALSE(token.valid());
+ token.reset(); // no-op
+ EXPECT_FALSE(token.valid());
+}
+
+TEST_F("token destruction frees up throttle window slot", DynamicThrottleFixture()) {
+ {
+ auto token = f1._throttler->try_acquire_one();
+ EXPECT_TRUE(token.valid());
+ }
+ auto token = f1._throttler->try_acquire_one();
+ EXPECT_TRUE(token.valid());
+}
+
+TEST_F("token can be moved and reset", DynamicThrottleFixture()) {
+ auto token1 = f1._throttler->try_acquire_one();
+ auto token2 = std::move(token1); // move ctor
+ EXPECT_TRUE(token2.valid());
+ EXPECT_FALSE(token1.valid());
+ ThrottleToken token3;
+ token3 = std::move(token2); // move assignment op
+ EXPECT_TRUE(token3.valid());
+ EXPECT_FALSE(token2.valid());
+
+ // Trying to fetch new token should not succeed due to active token and win size of 1
+ token1 = f1._throttler->try_acquire_one();
+ EXPECT_FALSE(token1.valid());
+ // Resetting the token should free up the slot in the window
+ token3.reset();
+ token1 = f1._throttler->try_acquire_one();
+ EXPECT_TRUE(token1.valid());
+}
+
// Note on test semantics: these tests are adapted from a subset of the MessageBus
// throttling tests. Some tests have been simplified due to no longer having access
// to the low-level DynamicThrottlePolicy API.
-struct Fixture {
+struct WindowFixture {
uint64_t _milli_time;
std::unique_ptr<SharedOperationThrottler> _throttler;
- Fixture(uint32_t window_size_increment = 5,
- uint32_t min_window_size = 20,
- uint32_t max_window_size = INT_MAX)
+ WindowFixture(uint32_t window_size_increment = 5,
+ uint32_t min_window_size = 20,
+ uint32_t max_window_size = INT_MAX)
: _milli_time(0),
_throttler()
{
@@ -57,7 +164,7 @@ struct Fixture {
}
};
-TEST_F("window size changes dynamically based on throughput", Fixture()) {
+TEST_F("window size changes dynamically based on throughput", WindowFixture()) {
uint32_t window_size = f1.attempt_converge_on_stable_window_size(100);
ASSERT_TRUE(window_size >= 90 && window_size <= 105);
@@ -74,7 +181,7 @@ TEST_F("window size changes dynamically based on throughput", Fixture()) {
ASSERT_TRUE(window_size >= 90 && window_size <= 115);
}
-TEST_F("window size is reset after idle time period", Fixture(5, 1)) {
+TEST_F("window size is reset after idle time period", WindowFixture(5, 1)) {
double window_size = f1.attempt_converge_on_stable_window_size(100);
ASSERT_TRUE(window_size >= 90 && window_size <= 110);
@@ -88,12 +195,12 @@ TEST_F("window size is reset after idle time period", Fixture(5, 1)) {
EXPECT_EQUAL(tokens.size(), 1u); // Reduced to minimum window size
}
-TEST_F("minimum window size is respected", Fixture(5, 150, INT_MAX)) {
+TEST_F("minimum window size is respected", WindowFixture(5, 150, INT_MAX)) {
double window_size = f1.attempt_converge_on_stable_window_size(200);
ASSERT_TRUE(window_size >= 150 && window_size <= 210);
}
-TEST_F("maximum window size is respected", Fixture(5, 1, 50)) {
+TEST_F("maximum window size is respected", WindowFixture(5, 1, 50)) {
double window_size = f1.attempt_converge_on_stable_window_size(100);
ASSERT_TRUE(window_size >= 40 && window_size <= 50);
}