summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2022-03-01 13:15:29 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2022-03-01 13:45:39 +0000
commit85864e3528c32576d2d2bf70fb6732a0d465750d (patch)
tree5f8cf4ac8241370693134e2ee65a7f4f12c0d2f0
parentc8efa8ed2003e9032adad3b4b3acf912800f7247 (diff)
Make reconfiguring dynamic vs unlimited throttling not require restarting
Instead of having one abstract throttler created from bootstrap config, explicitly create one dynamic and one unlimited throttler and allow for atomically switching between the two based on received config. The `MergeHandler` component will now always fetch the current throttler from the `FileStorHandler` instead of caching it at construction time. This commit removes the `restart` annotation on the existing throttler type config enums.
-rw-r--r--configdefinitions/src/vespa/stor-filestor.def4
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.h4
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp30
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h18
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp30
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp3
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.h2
7 files changed, 54 insertions, 37 deletions
diff --git a/configdefinitions/src/vespa/stor-filestor.def b/configdefinitions/src/vespa/stor-filestor.def
index e54b503ed93..531805d3039 100644
--- a/configdefinitions/src/vespa/stor-filestor.def
+++ b/configdefinitions/src/vespa/stor-filestor.def
@@ -80,7 +80,7 @@ resource_usage_reporter_noise_level double default=0.001
## - DYNAMIC uses DynamicThrottlePolicy under the hood and will block if the window
## is full (if a blocking throttler API call is invoked).
##
-async_operation_throttler.type enum { UNLIMITED, DYNAMIC } default=UNLIMITED restart
+async_operation_throttler.type enum { UNLIMITED, DYNAMIC } default=UNLIMITED
## Internal throttler tuning parameters that only apply when type == DYNAMIC:
async_operation_throttler.window_size_increment int default=20
async_operation_throttler.window_size_decrement_factor double default=1.2
@@ -104,7 +104,7 @@ async_operation_throttler.throttle_individual_merge_feed_ops bool default=true
## is full (if a blocking throttler API call is invoked).
##
## TODO deprecate in favor of the async_operation_throttler struct instead.
-async_operation_throttler_type enum { UNLIMITED, DYNAMIC } default=UNLIMITED restart
+async_operation_throttler_type enum { UNLIMITED, DYNAMIC } default=UNLIMITED
## Specifies the extent the throttling window is increased by when the async throttle
## policy has decided that more concurrent operations are desirable. Also affects the
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index 5c243ea4af9..cf0c3e68a69 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -276,6 +276,10 @@ public:
virtual vespalib::SharedOperationThrottler& operation_throttler() const noexcept = 0;
+ virtual void reconfigure_dynamic_throttler(const vespalib::SharedOperationThrottler::DynamicThrottleParams& params) = 0;
+
+ virtual void use_dynamic_operation_throttling(bool use_dynamic) noexcept = 0;
+
virtual void set_throttle_apply_bucket_diff_ops(bool throttle_apply_bucket_diff) noexcept = 0;
};
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index b5de5a233cc..de785c16cdf 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -40,18 +40,20 @@ uint32_t per_stripe_merge_limit(uint32_t num_threads, uint32_t num_stripes) noex
FileStorHandlerImpl::FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics,
ServiceLayerComponentRegister& compReg)
- : FileStorHandlerImpl(1, 1, sender, metrics, compReg, vespalib::SharedOperationThrottler::make_unlimited_throttler())
+ : FileStorHandlerImpl(1, 1, sender, metrics, compReg, vespalib::SharedOperationThrottler::DynamicThrottleParams())
{
}
FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender& sender,
FileStorMetrics& metrics,
ServiceLayerComponentRegister& compReg,
- std::unique_ptr<vespalib::SharedOperationThrottler> operation_throttler)
+ const vespalib::SharedOperationThrottler::DynamicThrottleParams& dyn_throttle_params)
: _component(compReg, "filestorhandlerimpl"),
_state(FileStorHandler::AVAILABLE),
_metrics(nullptr),
- _operation_throttler(std::move(operation_throttler)),
+ _dynamic_operation_throttler(vespalib::SharedOperationThrottler::make_dynamic_throttler(dyn_throttle_params)),
+ _unlimited_operation_throttler(vespalib::SharedOperationThrottler::make_unlimited_throttler()),
+ _active_throttler(_unlimited_operation_throttler.get()), // Will be set by FileStorManager
_stripes(),
_messageSender(sender),
_bucketIdFactory(_component.getBucketIdFactory()),
@@ -251,6 +253,22 @@ FileStorHandlerImpl::schedule_and_get_next_async_message(const std::shared_ptr<a
return {};
}
+void
+FileStorHandlerImpl::reconfigure_dynamic_throttler(const vespalib::SharedOperationThrottler::DynamicThrottleParams& params)
+{
+ _dynamic_operation_throttler->reconfigure_dynamic_throttling(params);
+}
+
+void
+FileStorHandlerImpl::use_dynamic_operation_throttling(bool use_dynamic) noexcept
+{
+ // Use release semantics instead of relaxed to ensure transitive visibility even in
+ // non-persistence threads that try to invoke the throttler (i.e. RPC threads).
+ _active_throttler.store(use_dynamic ? _dynamic_operation_throttler.get()
+ : _unlimited_operation_throttler.get(),
+ std::memory_order_release);
+}
+
bool
FileStorHandlerImpl::messageMayBeAborted(const api::StorageMessage& msg)
{
@@ -333,9 +351,9 @@ FileStorHandlerImpl::updateMetrics(const MetricLockGuard &)
std::lock_guard lockGuard(_mergeStatesLock);
_metrics->pendingMerges.addValue(_mergeStates.size());
_metrics->queueSize.addValue(getQueueSize());
- _metrics->throttle_window_size.addValue(_operation_throttler->current_window_size());
- _metrics->throttle_waiting_threads.addValue(_operation_throttler->waiting_threads());
- _metrics->throttle_active_tokens.addValue(_operation_throttler->current_active_token_count());
+ _metrics->throttle_window_size.addValue(operation_throttler().current_window_size());
+ _metrics->throttle_waiting_threads.addValue(operation_throttler().waiting_threads());
+ _metrics->throttle_active_tokens.addValue(operation_throttler().current_active_token_count());
for (const auto & stripe : _metrics->stripes) {
const auto & m = stripe->averageQueueWaitingTime;
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index 1bc0ab87b1c..9e245e9eddc 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -192,7 +192,8 @@ public:
FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics,
ServiceLayerComponentRegister& compReg);
FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender&, FileStorMetrics&,
- ServiceLayerComponentRegister&, std::unique_ptr<vespalib::SharedOperationThrottler>);
+ ServiceLayerComponentRegister&,
+ const vespalib::SharedOperationThrottler::DynamicThrottleParams& dyn_throttle_params);
~FileStorHandlerImpl() override;
void setGetNextMessageTimeout(vespalib::duration timeout) override { _getNextMessageTimeout = timeout; }
@@ -245,9 +246,18 @@ public:
void abortQueuedOperations(const AbortBucketOperationsCommand& cmd) override;
vespalib::SharedOperationThrottler& operation_throttler() const noexcept override {
- return *_operation_throttler;
+ // It would be reasonable to assume that this could be a relaxed load since the set
+ // of possible throttlers is static and all _persistence_ thread creation is sequenced
+ // after throttler creation. But since the throttler may be invoked by RPC threads
+ // created in another context, use acquire semantics to ensure transitive visibility.
+ // TODO remove need for atomics once the throttler testing dust settles
+ return *_active_throttler.load(std::memory_order_acquire);
}
+ void reconfigure_dynamic_throttler(const vespalib::SharedOperationThrottler::DynamicThrottleParams& params) override;
+
+ void use_dynamic_operation_throttling(bool use_dynamic) noexcept override;
+
void set_throttle_apply_bucket_diff_ops(bool throttle_apply_bucket_diff) noexcept override {
// Relaxed is fine, worst case from temporarily observing a stale value is that
// an ApplyBucketDiff message is (or isn't) throttled at a high level.
@@ -264,7 +274,9 @@ private:
ServiceLayerComponent _component;
std::atomic<DiskState> _state;
FileStorDiskMetrics * _metrics;
- std::unique_ptr<vespalib::SharedOperationThrottler> _operation_throttler;
+ std::unique_ptr<vespalib::SharedOperationThrottler> _dynamic_operation_throttler;
+ std::unique_ptr<vespalib::SharedOperationThrottler> _unlimited_operation_throttler;
+ std::atomic<vespalib::SharedOperationThrottler*> _active_throttler;
std::vector<Stripe> _stripes;
MessageSender& _messageSender;
const document::BucketIdFactory& _bucketIdFactory;
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index f4aff96b53c..09bd842c308 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -164,20 +164,6 @@ dynamic_throttle_params_from_config(const StorFilestorConfig& config, uint32_t n
return params;
}
-std::unique_ptr<vespalib::SharedOperationThrottler>
-make_operation_throttler_from_config(const StorFilestorConfig& config, uint32_t num_threads)
-{
- // TODO only use struct config field instead once config model is updated
- const bool use_dynamic_throttling = ((config.asyncOperationThrottlerType == StorFilestorConfig::AsyncOperationThrottlerType::DYNAMIC) ||
- (config.asyncOperationThrottler.type == StorFilestorConfig::AsyncOperationThrottler::Type::DYNAMIC));
- if (use_dynamic_throttling) {
- auto dyn_params = dynamic_throttle_params_from_config(config, num_threads);
- return vespalib::SharedOperationThrottler::make_dynamic_throttler(dyn_params);
- } else {
- return vespalib::SharedOperationThrottler::make_unlimited_throttler();
- }
-}
-
#ifdef __PIC__
#define TLS_LINKAGE __attribute__((visibility("hidden"), tls_model("initial-exec")))
#else
@@ -216,18 +202,17 @@ FileStorManager::getThreadLocalHandler() {
}
return *_G_threadLocalHandler;
}
-/**
- * If live configuration, assuming storageserver makes sure no messages are
- * incoming during reconfiguration
- */
+
void
FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config)
{
// If true, this is not the first configure.
- bool liveUpdate = ! _threads.empty();
+ const bool liveUpdate = ! _threads.empty();
_use_async_message_handling_on_schedule = config->useAsyncMessageHandlingOnSchedule;
_host_info_reporter.set_noise_level(config->resourceUsageReporterNoiseLevel);
+ const bool use_dynamic_throttling = ((config->asyncOperationThrottlerType == StorFilestorConfig::AsyncOperationThrottlerType::DYNAMIC) ||
+ (config->asyncOperationThrottler.type == StorFilestorConfig::AsyncOperationThrottler::Type::DYNAMIC));
const bool throttle_merge_feed_ops = config->asyncOperationThrottler.throttleIndividualMergeFeedOps;
if (!liveUpdate) {
@@ -235,10 +220,10 @@ FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config)
uint32_t numThreads = std::max(1, _config->numThreads);
uint32_t numStripes = std::max(1u, numThreads / 2);
_metrics->initDiskMetrics(numStripes, computeAllPossibleHandlerThreads(*_config));
- auto operation_throttler = make_operation_throttler_from_config(*_config, numThreads);
+ auto dyn_params = dynamic_throttle_params_from_config(*_config, numThreads);
_filestorHandler = std::make_unique<FileStorHandlerImpl>(numThreads, numStripes, *this, *_metrics,
- _compReg, std::move(operation_throttler));
+ _compReg, dyn_params);
uint32_t numResponseThreads = computeNumResponseThreads(_config->numResponseThreads);
_sequencedExecutor = vespalib::SequencedTaskExecutor::create(CpuUsage::wrap(response_executor, CpuUsage::Category::WRITE),
numResponseThreads, 10000,
@@ -253,10 +238,11 @@ FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config)
} else {
assert(_filestorHandler);
auto updated_dyn_throttle_params = dynamic_throttle_params_from_config(*config, _threads.size());
- _filestorHandler->operation_throttler().reconfigure_dynamic_throttling(updated_dyn_throttle_params);
+ _filestorHandler->reconfigure_dynamic_throttler(updated_dyn_throttle_params);
}
// TODO remove once desired dynamic throttling behavior is set in stone
{
+ _filestorHandler->use_dynamic_operation_throttling(use_dynamic_throttling);
_filestorHandler->set_throttle_apply_bucket_diff_ops(!throttle_merge_feed_ops);
std::lock_guard guard(_lock);
for (auto& ph : _persistenceHandlers) {
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 78f53de46b3..8287fe27509 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -32,7 +32,6 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
_cluster_context(cluster_context),
_env(env),
_spi(spi),
- _operation_throttler(_env._fileStorHandler.operation_throttler()),
_monitored_ref_count(std::make_unique<MonitoredRefCount>()),
_maxChunkSize(maxChunkSize),
_commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize),
@@ -515,7 +514,7 @@ MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results
spi::Context& context,
const document::DocumentTypeRepo& repo) const
{
- auto throttle_token = throttle_merge_feed_ops() ? _operation_throttler.blocking_acquire_one()
+ auto throttle_token = throttle_merge_feed_ops() ? _env._fileStorHandler.operation_throttler().blocking_acquire_one()
: vespalib::SharedOperationThrottler::Token();
spi::Timestamp timestamp(e._entry._timestamp);
if (!(e._entry._flags & (DELETED | DELETED_IN_PLACE))) {
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index 93fb7efc8d0..1ed2fa878bc 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -24,7 +24,6 @@
namespace vespalib {
class ISequencedTaskExecutor;
-class SharedOperationThrottler;
}
namespace storage {
@@ -96,7 +95,6 @@ private:
const ClusterContext &_cluster_context;
PersistenceUtil &_env;
spi::PersistenceProvider &_spi;
- vespalib::SharedOperationThrottler& _operation_throttler;
std::unique_ptr<vespalib::MonitoredRefCount> _monitored_ref_count;
const uint32_t _maxChunkSize;
const uint32_t _commonMergeChainOptimalizationMinimumSize;