diff options
7 files changed, 54 insertions, 37 deletions
diff --git a/configdefinitions/src/vespa/stor-filestor.def b/configdefinitions/src/vespa/stor-filestor.def index e54b503ed93..531805d3039 100644 --- a/configdefinitions/src/vespa/stor-filestor.def +++ b/configdefinitions/src/vespa/stor-filestor.def @@ -80,7 +80,7 @@ resource_usage_reporter_noise_level double default=0.001 ## - DYNAMIC uses DynamicThrottlePolicy under the hood and will block if the window ## is full (if a blocking throttler API call is invoked). ## -async_operation_throttler.type enum { UNLIMITED, DYNAMIC } default=UNLIMITED restart +async_operation_throttler.type enum { UNLIMITED, DYNAMIC } default=UNLIMITED ## Internal throttler tuning parameters that only apply when type == DYNAMIC: async_operation_throttler.window_size_increment int default=20 async_operation_throttler.window_size_decrement_factor double default=1.2 @@ -104,7 +104,7 @@ async_operation_throttler.throttle_individual_merge_feed_ops bool default=true ## is full (if a blocking throttler API call is invoked). ## ## TODO deprecate in favor of the async_operation_throttler struct instead. -async_operation_throttler_type enum { UNLIMITED, DYNAMIC } default=UNLIMITED restart +async_operation_throttler_type enum { UNLIMITED, DYNAMIC } default=UNLIMITED ## Specifies the extent the throttling window is increased by when the async throttle ## policy has decided that more concurrent operations are desirable. Also affects the diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index 5c243ea4af9..cf0c3e68a69 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -276,6 +276,10 @@ public: virtual vespalib::SharedOperationThrottler& operation_throttler() const noexcept = 0; + virtual void reconfigure_dynamic_throttler(const vespalib::SharedOperationThrottler::DynamicThrottleParams& params) = 0; + + virtual void use_dynamic_operation_throttling(bool use_dynamic) noexcept = 0; + virtual void set_throttle_apply_bucket_diff_ops(bool throttle_apply_bucket_diff) noexcept = 0; }; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index b5de5a233cc..de785c16cdf 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -40,18 +40,20 @@ uint32_t per_stripe_merge_limit(uint32_t num_threads, uint32_t num_stripes) noex FileStorHandlerImpl::FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics, ServiceLayerComponentRegister& compReg) - : FileStorHandlerImpl(1, 1, sender, metrics, compReg, vespalib::SharedOperationThrottler::make_unlimited_throttler()) + : FileStorHandlerImpl(1, 1, sender, metrics, compReg, vespalib::SharedOperationThrottler::DynamicThrottleParams()) { } FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender& sender, FileStorMetrics& metrics, ServiceLayerComponentRegister& compReg, - std::unique_ptr<vespalib::SharedOperationThrottler> operation_throttler) + const vespalib::SharedOperationThrottler::DynamicThrottleParams& dyn_throttle_params) : _component(compReg, "filestorhandlerimpl"), _state(FileStorHandler::AVAILABLE), _metrics(nullptr), - _operation_throttler(std::move(operation_throttler)), + _dynamic_operation_throttler(vespalib::SharedOperationThrottler::make_dynamic_throttler(dyn_throttle_params)), + _unlimited_operation_throttler(vespalib::SharedOperationThrottler::make_unlimited_throttler()), + _active_throttler(_unlimited_operation_throttler.get()), // Will be set by FileStorManager _stripes(), _messageSender(sender), _bucketIdFactory(_component.getBucketIdFactory()), @@ -251,6 +253,22 @@ FileStorHandlerImpl::schedule_and_get_next_async_message(const std::shared_ptr<a return {}; } +void +FileStorHandlerImpl::reconfigure_dynamic_throttler(const vespalib::SharedOperationThrottler::DynamicThrottleParams& params) +{ + _dynamic_operation_throttler->reconfigure_dynamic_throttling(params); +} + +void +FileStorHandlerImpl::use_dynamic_operation_throttling(bool use_dynamic) noexcept +{ + // Use release semantics instead of relaxed to ensure transitive visibility even in + // non-persistence threads that try to invoke the throttler (i.e. RPC threads). + _active_throttler.store(use_dynamic ? _dynamic_operation_throttler.get() + : _unlimited_operation_throttler.get(), + std::memory_order_release); +} + bool FileStorHandlerImpl::messageMayBeAborted(const api::StorageMessage& msg) { @@ -333,9 +351,9 @@ FileStorHandlerImpl::updateMetrics(const MetricLockGuard &) std::lock_guard lockGuard(_mergeStatesLock); _metrics->pendingMerges.addValue(_mergeStates.size()); _metrics->queueSize.addValue(getQueueSize()); - _metrics->throttle_window_size.addValue(_operation_throttler->current_window_size()); - _metrics->throttle_waiting_threads.addValue(_operation_throttler->waiting_threads()); - _metrics->throttle_active_tokens.addValue(_operation_throttler->current_active_token_count()); + _metrics->throttle_window_size.addValue(operation_throttler().current_window_size()); + _metrics->throttle_waiting_threads.addValue(operation_throttler().waiting_threads()); + _metrics->throttle_active_tokens.addValue(operation_throttler().current_active_token_count()); for (const auto & stripe : _metrics->stripes) { const auto & m = stripe->averageQueueWaitingTime; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index 1bc0ab87b1c..9e245e9eddc 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -192,7 +192,8 @@ public: FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics, ServiceLayerComponentRegister& compReg); FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender&, FileStorMetrics&, - ServiceLayerComponentRegister&, std::unique_ptr<vespalib::SharedOperationThrottler>); + ServiceLayerComponentRegister&, + const vespalib::SharedOperationThrottler::DynamicThrottleParams& dyn_throttle_params); ~FileStorHandlerImpl() override; void setGetNextMessageTimeout(vespalib::duration timeout) override { _getNextMessageTimeout = timeout; } @@ -245,9 +246,18 @@ public: void abortQueuedOperations(const AbortBucketOperationsCommand& cmd) override; vespalib::SharedOperationThrottler& operation_throttler() const noexcept override { - return *_operation_throttler; + // It would be reasonable to assume that this could be a relaxed load since the set + // of possible throttlers is static and all _persistence_ thread creation is sequenced + // after throttler creation. But since the throttler may be invoked by RPC threads + // created in another context, use acquire semantics to ensure transitive visibility. + // TODO remove need for atomics once the throttler testing dust settles + return *_active_throttler.load(std::memory_order_acquire); } + void reconfigure_dynamic_throttler(const vespalib::SharedOperationThrottler::DynamicThrottleParams& params) override; + + void use_dynamic_operation_throttling(bool use_dynamic) noexcept override; + void set_throttle_apply_bucket_diff_ops(bool throttle_apply_bucket_diff) noexcept override { // Relaxed is fine, worst case from temporarily observing a stale value is that // an ApplyBucketDiff message is (or isn't) throttled at a high level. @@ -264,7 +274,9 @@ private: ServiceLayerComponent _component; std::atomic<DiskState> _state; FileStorDiskMetrics * _metrics; - std::unique_ptr<vespalib::SharedOperationThrottler> _operation_throttler; + std::unique_ptr<vespalib::SharedOperationThrottler> _dynamic_operation_throttler; + std::unique_ptr<vespalib::SharedOperationThrottler> _unlimited_operation_throttler; + std::atomic<vespalib::SharedOperationThrottler*> _active_throttler; std::vector<Stripe> _stripes; MessageSender& _messageSender; const document::BucketIdFactory& _bucketIdFactory; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index f4aff96b53c..09bd842c308 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -164,20 +164,6 @@ dynamic_throttle_params_from_config(const StorFilestorConfig& config, uint32_t n return params; } -std::unique_ptr<vespalib::SharedOperationThrottler> -make_operation_throttler_from_config(const StorFilestorConfig& config, uint32_t num_threads) -{ - // TODO only use struct config field instead once config model is updated - const bool use_dynamic_throttling = ((config.asyncOperationThrottlerType == StorFilestorConfig::AsyncOperationThrottlerType::DYNAMIC) || - (config.asyncOperationThrottler.type == StorFilestorConfig::AsyncOperationThrottler::Type::DYNAMIC)); - if (use_dynamic_throttling) { - auto dyn_params = dynamic_throttle_params_from_config(config, num_threads); - return vespalib::SharedOperationThrottler::make_dynamic_throttler(dyn_params); - } else { - return vespalib::SharedOperationThrottler::make_unlimited_throttler(); - } -} - #ifdef __PIC__ #define TLS_LINKAGE __attribute__((visibility("hidden"), tls_model("initial-exec"))) #else @@ -216,18 +202,17 @@ FileStorManager::getThreadLocalHandler() { } return *_G_threadLocalHandler; } -/** - * If live configuration, assuming storageserver makes sure no messages are - * incoming during reconfiguration - */ + void FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config) { // If true, this is not the first configure. - bool liveUpdate = ! _threads.empty(); + const bool liveUpdate = ! _threads.empty(); _use_async_message_handling_on_schedule = config->useAsyncMessageHandlingOnSchedule; _host_info_reporter.set_noise_level(config->resourceUsageReporterNoiseLevel); + const bool use_dynamic_throttling = ((config->asyncOperationThrottlerType == StorFilestorConfig::AsyncOperationThrottlerType::DYNAMIC) || + (config->asyncOperationThrottler.type == StorFilestorConfig::AsyncOperationThrottler::Type::DYNAMIC)); const bool throttle_merge_feed_ops = config->asyncOperationThrottler.throttleIndividualMergeFeedOps; if (!liveUpdate) { @@ -235,10 +220,10 @@ FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config) uint32_t numThreads = std::max(1, _config->numThreads); uint32_t numStripes = std::max(1u, numThreads / 2); _metrics->initDiskMetrics(numStripes, computeAllPossibleHandlerThreads(*_config)); - auto operation_throttler = make_operation_throttler_from_config(*_config, numThreads); + auto dyn_params = dynamic_throttle_params_from_config(*_config, numThreads); _filestorHandler = std::make_unique<FileStorHandlerImpl>(numThreads, numStripes, *this, *_metrics, - _compReg, std::move(operation_throttler)); + _compReg, dyn_params); uint32_t numResponseThreads = computeNumResponseThreads(_config->numResponseThreads); _sequencedExecutor = vespalib::SequencedTaskExecutor::create(CpuUsage::wrap(response_executor, CpuUsage::Category::WRITE), numResponseThreads, 10000, @@ -253,10 +238,11 @@ FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config) } else { assert(_filestorHandler); auto updated_dyn_throttle_params = dynamic_throttle_params_from_config(*config, _threads.size()); - _filestorHandler->operation_throttler().reconfigure_dynamic_throttling(updated_dyn_throttle_params); + _filestorHandler->reconfigure_dynamic_throttler(updated_dyn_throttle_params); } // TODO remove once desired dynamic throttling behavior is set in stone { + _filestorHandler->use_dynamic_operation_throttling(use_dynamic_throttling); _filestorHandler->set_throttle_apply_bucket_diff_ops(!throttle_merge_feed_ops); std::lock_guard guard(_lock); for (auto& ph : _persistenceHandlers) { diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 78f53de46b3..8287fe27509 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -32,7 +32,6 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, _cluster_context(cluster_context), _env(env), _spi(spi), - _operation_throttler(_env._fileStorHandler.operation_throttler()), _monitored_ref_count(std::make_unique<MonitoredRefCount>()), _maxChunkSize(maxChunkSize), _commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize), @@ -515,7 +514,7 @@ MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results spi::Context& context, const document::DocumentTypeRepo& repo) const { - auto throttle_token = throttle_merge_feed_ops() ? _operation_throttler.blocking_acquire_one() + auto throttle_token = throttle_merge_feed_ops() ? _env._fileStorHandler.operation_throttler().blocking_acquire_one() : vespalib::SharedOperationThrottler::Token(); spi::Timestamp timestamp(e._entry._timestamp); if (!(e._entry._flags & (DELETED | DELETED_IN_PLACE))) { diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index 93fb7efc8d0..1ed2fa878bc 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -24,7 +24,6 @@ namespace vespalib { class ISequencedTaskExecutor; -class SharedOperationThrottler; } namespace storage { @@ -96,7 +95,6 @@ private: const ClusterContext &_cluster_context; PersistenceUtil &_env; spi::PersistenceProvider &_spi; - vespalib::SharedOperationThrottler& _operation_throttler; std::unique_ptr<vespalib::MonitoredRefCount> _monitored_ref_count; const uint32_t _maxChunkSize; const uint32_t _commonMergeChainOptimalizationMinimumSize; |