summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--configdefinitions/src/vespa/stor-filestor.def4
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.h4
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp30
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h18
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp30
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp3
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.h2
7 files changed, 54 insertions, 37 deletions
diff --git a/configdefinitions/src/vespa/stor-filestor.def b/configdefinitions/src/vespa/stor-filestor.def
index e54b503ed93..531805d3039 100644
--- a/configdefinitions/src/vespa/stor-filestor.def
+++ b/configdefinitions/src/vespa/stor-filestor.def
@@ -80,7 +80,7 @@ resource_usage_reporter_noise_level double default=0.001
## - DYNAMIC uses DynamicThrottlePolicy under the hood and will block if the window
## is full (if a blocking throttler API call is invoked).
##
-async_operation_throttler.type enum { UNLIMITED, DYNAMIC } default=UNLIMITED restart
+async_operation_throttler.type enum { UNLIMITED, DYNAMIC } default=UNLIMITED
## Internal throttler tuning parameters that only apply when type == DYNAMIC:
async_operation_throttler.window_size_increment int default=20
async_operation_throttler.window_size_decrement_factor double default=1.2
@@ -104,7 +104,7 @@ async_operation_throttler.throttle_individual_merge_feed_ops bool default=true
## is full (if a blocking throttler API call is invoked).
##
## TODO deprecate in favor of the async_operation_throttler struct instead.
-async_operation_throttler_type enum { UNLIMITED, DYNAMIC } default=UNLIMITED restart
+async_operation_throttler_type enum { UNLIMITED, DYNAMIC } default=UNLIMITED
## Specifies the extent the throttling window is increased by when the async throttle
## policy has decided that more concurrent operations are desirable. Also affects the
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index 5c243ea4af9..cf0c3e68a69 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -276,6 +276,10 @@ public:
virtual vespalib::SharedOperationThrottler& operation_throttler() const noexcept = 0;
+ virtual void reconfigure_dynamic_throttler(const vespalib::SharedOperationThrottler::DynamicThrottleParams& params) = 0;
+
+ virtual void use_dynamic_operation_throttling(bool use_dynamic) noexcept = 0;
+
virtual void set_throttle_apply_bucket_diff_ops(bool throttle_apply_bucket_diff) noexcept = 0;
};
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index b5de5a233cc..de785c16cdf 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -40,18 +40,20 @@ uint32_t per_stripe_merge_limit(uint32_t num_threads, uint32_t num_stripes) noex
FileStorHandlerImpl::FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics,
ServiceLayerComponentRegister& compReg)
- : FileStorHandlerImpl(1, 1, sender, metrics, compReg, vespalib::SharedOperationThrottler::make_unlimited_throttler())
+ : FileStorHandlerImpl(1, 1, sender, metrics, compReg, vespalib::SharedOperationThrottler::DynamicThrottleParams())
{
}
FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender& sender,
FileStorMetrics& metrics,
ServiceLayerComponentRegister& compReg,
- std::unique_ptr<vespalib::SharedOperationThrottler> operation_throttler)
+ const vespalib::SharedOperationThrottler::DynamicThrottleParams& dyn_throttle_params)
: _component(compReg, "filestorhandlerimpl"),
_state(FileStorHandler::AVAILABLE),
_metrics(nullptr),
- _operation_throttler(std::move(operation_throttler)),
+ _dynamic_operation_throttler(vespalib::SharedOperationThrottler::make_dynamic_throttler(dyn_throttle_params)),
+ _unlimited_operation_throttler(vespalib::SharedOperationThrottler::make_unlimited_throttler()),
+ _active_throttler(_unlimited_operation_throttler.get()), // Will be set by FileStorManager
_stripes(),
_messageSender(sender),
_bucketIdFactory(_component.getBucketIdFactory()),
@@ -251,6 +253,22 @@ FileStorHandlerImpl::schedule_and_get_next_async_message(const std::shared_ptr<a
return {};
}
+void
+FileStorHandlerImpl::reconfigure_dynamic_throttler(const vespalib::SharedOperationThrottler::DynamicThrottleParams& params)
+{
+ _dynamic_operation_throttler->reconfigure_dynamic_throttling(params);
+}
+
+void
+FileStorHandlerImpl::use_dynamic_operation_throttling(bool use_dynamic) noexcept
+{
+ // Use release semantics instead of relaxed to ensure transitive visibility even in
+ // non-persistence threads that try to invoke the throttler (i.e. RPC threads).
+ _active_throttler.store(use_dynamic ? _dynamic_operation_throttler.get()
+ : _unlimited_operation_throttler.get(),
+ std::memory_order_release);
+}
+
bool
FileStorHandlerImpl::messageMayBeAborted(const api::StorageMessage& msg)
{
@@ -333,9 +351,9 @@ FileStorHandlerImpl::updateMetrics(const MetricLockGuard &)
std::lock_guard lockGuard(_mergeStatesLock);
_metrics->pendingMerges.addValue(_mergeStates.size());
_metrics->queueSize.addValue(getQueueSize());
- _metrics->throttle_window_size.addValue(_operation_throttler->current_window_size());
- _metrics->throttle_waiting_threads.addValue(_operation_throttler->waiting_threads());
- _metrics->throttle_active_tokens.addValue(_operation_throttler->current_active_token_count());
+ _metrics->throttle_window_size.addValue(operation_throttler().current_window_size());
+ _metrics->throttle_waiting_threads.addValue(operation_throttler().waiting_threads());
+ _metrics->throttle_active_tokens.addValue(operation_throttler().current_active_token_count());
for (const auto & stripe : _metrics->stripes) {
const auto & m = stripe->averageQueueWaitingTime;
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index 1bc0ab87b1c..9e245e9eddc 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -192,7 +192,8 @@ public:
FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics,
ServiceLayerComponentRegister& compReg);
FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender&, FileStorMetrics&,
- ServiceLayerComponentRegister&, std::unique_ptr<vespalib::SharedOperationThrottler>);
+ ServiceLayerComponentRegister&,
+ const vespalib::SharedOperationThrottler::DynamicThrottleParams& dyn_throttle_params);
~FileStorHandlerImpl() override;
void setGetNextMessageTimeout(vespalib::duration timeout) override { _getNextMessageTimeout = timeout; }
@@ -245,9 +246,18 @@ public:
void abortQueuedOperations(const AbortBucketOperationsCommand& cmd) override;
vespalib::SharedOperationThrottler& operation_throttler() const noexcept override {
- return *_operation_throttler;
+ // It would be reasonable to assume that this could be a relaxed load since the set
+ // of possible throttlers is static and all _persistence_ thread creation is sequenced
+ // after throttler creation. But since the throttler may be invoked by RPC threads
+ // created in another context, use acquire semantics to ensure transitive visibility.
+ // TODO remove need for atomics once the throttler testing dust settles
+ return *_active_throttler.load(std::memory_order_acquire);
}
+ void reconfigure_dynamic_throttler(const vespalib::SharedOperationThrottler::DynamicThrottleParams& params) override;
+
+ void use_dynamic_operation_throttling(bool use_dynamic) noexcept override;
+
void set_throttle_apply_bucket_diff_ops(bool throttle_apply_bucket_diff) noexcept override {
// Relaxed is fine, worst case from temporarily observing a stale value is that
// an ApplyBucketDiff message is (or isn't) throttled at a high level.
@@ -264,7 +274,9 @@ private:
ServiceLayerComponent _component;
std::atomic<DiskState> _state;
FileStorDiskMetrics * _metrics;
- std::unique_ptr<vespalib::SharedOperationThrottler> _operation_throttler;
+ std::unique_ptr<vespalib::SharedOperationThrottler> _dynamic_operation_throttler;
+ std::unique_ptr<vespalib::SharedOperationThrottler> _unlimited_operation_throttler;
+ std::atomic<vespalib::SharedOperationThrottler*> _active_throttler;
std::vector<Stripe> _stripes;
MessageSender& _messageSender;
const document::BucketIdFactory& _bucketIdFactory;
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index f4aff96b53c..09bd842c308 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -164,20 +164,6 @@ dynamic_throttle_params_from_config(const StorFilestorConfig& config, uint32_t n
return params;
}
-std::unique_ptr<vespalib::SharedOperationThrottler>
-make_operation_throttler_from_config(const StorFilestorConfig& config, uint32_t num_threads)
-{
- // TODO only use struct config field instead once config model is updated
- const bool use_dynamic_throttling = ((config.asyncOperationThrottlerType == StorFilestorConfig::AsyncOperationThrottlerType::DYNAMIC) ||
- (config.asyncOperationThrottler.type == StorFilestorConfig::AsyncOperationThrottler::Type::DYNAMIC));
- if (use_dynamic_throttling) {
- auto dyn_params = dynamic_throttle_params_from_config(config, num_threads);
- return vespalib::SharedOperationThrottler::make_dynamic_throttler(dyn_params);
- } else {
- return vespalib::SharedOperationThrottler::make_unlimited_throttler();
- }
-}
-
#ifdef __PIC__
#define TLS_LINKAGE __attribute__((visibility("hidden"), tls_model("initial-exec")))
#else
@@ -216,18 +202,17 @@ FileStorManager::getThreadLocalHandler() {
}
return *_G_threadLocalHandler;
}
-/**
- * If live configuration, assuming storageserver makes sure no messages are
- * incoming during reconfiguration
- */
+
void
FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config)
{
// If true, this is not the first configure.
- bool liveUpdate = ! _threads.empty();
+ const bool liveUpdate = ! _threads.empty();
_use_async_message_handling_on_schedule = config->useAsyncMessageHandlingOnSchedule;
_host_info_reporter.set_noise_level(config->resourceUsageReporterNoiseLevel);
+ const bool use_dynamic_throttling = ((config->asyncOperationThrottlerType == StorFilestorConfig::AsyncOperationThrottlerType::DYNAMIC) ||
+ (config->asyncOperationThrottler.type == StorFilestorConfig::AsyncOperationThrottler::Type::DYNAMIC));
const bool throttle_merge_feed_ops = config->asyncOperationThrottler.throttleIndividualMergeFeedOps;
if (!liveUpdate) {
@@ -235,10 +220,10 @@ FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config)
uint32_t numThreads = std::max(1, _config->numThreads);
uint32_t numStripes = std::max(1u, numThreads / 2);
_metrics->initDiskMetrics(numStripes, computeAllPossibleHandlerThreads(*_config));
- auto operation_throttler = make_operation_throttler_from_config(*_config, numThreads);
+ auto dyn_params = dynamic_throttle_params_from_config(*_config, numThreads);
_filestorHandler = std::make_unique<FileStorHandlerImpl>(numThreads, numStripes, *this, *_metrics,
- _compReg, std::move(operation_throttler));
+ _compReg, dyn_params);
uint32_t numResponseThreads = computeNumResponseThreads(_config->numResponseThreads);
_sequencedExecutor = vespalib::SequencedTaskExecutor::create(CpuUsage::wrap(response_executor, CpuUsage::Category::WRITE),
numResponseThreads, 10000,
@@ -253,10 +238,11 @@ FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config)
} else {
assert(_filestorHandler);
auto updated_dyn_throttle_params = dynamic_throttle_params_from_config(*config, _threads.size());
- _filestorHandler->operation_throttler().reconfigure_dynamic_throttling(updated_dyn_throttle_params);
+ _filestorHandler->reconfigure_dynamic_throttler(updated_dyn_throttle_params);
}
// TODO remove once desired dynamic throttling behavior is set in stone
{
+ _filestorHandler->use_dynamic_operation_throttling(use_dynamic_throttling);
_filestorHandler->set_throttle_apply_bucket_diff_ops(!throttle_merge_feed_ops);
std::lock_guard guard(_lock);
for (auto& ph : _persistenceHandlers) {
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 78f53de46b3..8287fe27509 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -32,7 +32,6 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
_cluster_context(cluster_context),
_env(env),
_spi(spi),
- _operation_throttler(_env._fileStorHandler.operation_throttler()),
_monitored_ref_count(std::make_unique<MonitoredRefCount>()),
_maxChunkSize(maxChunkSize),
_commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize),
@@ -515,7 +514,7 @@ MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results
spi::Context& context,
const document::DocumentTypeRepo& repo) const
{
- auto throttle_token = throttle_merge_feed_ops() ? _operation_throttler.blocking_acquire_one()
+ auto throttle_token = throttle_merge_feed_ops() ? _env._fileStorHandler.operation_throttler().blocking_acquire_one()
: vespalib::SharedOperationThrottler::Token();
spi::Timestamp timestamp(e._entry._timestamp);
if (!(e._entry._flags & (DELETED | DELETED_IN_PLACE))) {
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index 93fb7efc8d0..1ed2fa878bc 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -24,7 +24,6 @@
namespace vespalib {
class ISequencedTaskExecutor;
-class SharedOperationThrottler;
}
namespace storage {
@@ -96,7 +95,6 @@ private:
const ClusterContext &_cluster_context;
PersistenceUtil &_env;
spi::PersistenceProvider &_spi;
- vespalib::SharedOperationThrottler& _operation_throttler;
std::unique_ptr<vespalib::MonitoredRefCount> _monitored_ref_count;
const uint32_t _maxChunkSize;
const uint32_t _commonMergeChainOptimalizationMinimumSize;