diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-02-24 15:07:43 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-02-24 15:15:02 +0000 |
commit | 7364304c8a32b4a744335c95c72e036da080ec22 (patch) | |
tree | 90bda5625ad5b74fa46b4762cd367f93d2a5706c /storage | |
parent | ccf690fbe40e769161c6bf15bbd0d2f87ad7cefa (diff) |
Make it possible to configure what aspects of merges should be throttled
Add live config for choosing whether merges should be throttled
on a per-feed operation (`MergeHandler`) level, or on an `ApplyBucketDiff`
persistence thread level.
This is intended to be a temporary feature while we do experiments, so
some liberties are taken with regards to how holes are punched in the
various abstraction layers.
Diffstat (limited to 'storage')
8 files changed, 52 insertions, 9 deletions
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index 66dc7126058..5c243ea4af9 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -275,6 +275,8 @@ public: virtual ActiveOperationsStats get_active_operations_stats(bool reset_min_max) const = 0; virtual vespalib::SharedOperationThrottler& operation_throttler() const noexcept = 0; + + virtual void set_throttle_apply_bucket_diff_ops(bool throttle_apply_bucket_diff) noexcept = 0; }; } // storage diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index 10df36579cd..b5de5a233cc 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -58,6 +58,7 @@ FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripe _getNextMessageTimeout(100ms), _max_active_merges_per_stripe(per_stripe_merge_limit(numThreads, numStripes)), _paused(false), + _throttle_apply_bucket_diff_ops(false), _last_active_operations_stats() { assert(numStripes > 0); @@ -125,7 +126,7 @@ FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket, const api: std::lock_guard mlock(_mergeStatesLock); auto it = _mergeStates.find(bucket); if (it == _mergeStates.end()) { - if (code != 0) { + if (code != nullptr) { LOG(debug, "Merge state not present at the time of clear. " "Could not fail merge of bucket %s with code %s.", bucket.toString().c_str(), code->toString().c_str()); @@ -135,7 +136,7 @@ FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket, const api: } return; } - if (code != 0) { + if (code != nullptr) { std::shared_ptr<MergeStatus> statusPtr(it->second); assert(statusPtr.get()); MergeStatus& status(*statusPtr); @@ -890,10 +891,8 @@ FileStorHandlerImpl::Stripe::Stripe(const FileStorHandlerImpl & owner, MessageSe _active_operations_stats() {} -namespace { - bool -operation_type_should_be_throttled(api::MessageType::Id type_id) noexcept +FileStorHandlerImpl::Stripe::operation_type_should_be_throttled(api::MessageType::Id type_id) const noexcept { // Note: SetBucketState is intentionally _not_ included in this set, even though it's // dispatched async. The rationale behind this is that SetBucketState is very cheap @@ -911,13 +910,14 @@ operation_type_should_be_throttled(api::MessageType::Id type_id) noexcept case api::MessageType::CREATEBUCKET_ID: case api::MessageType::DELETEBUCKET_ID: return true; + case api::MessageType::APPLYBUCKETDIFF_ID: + case api::MessageType::APPLYBUCKETDIFF_REPLY_ID: + return _owner.throttle_apply_bucket_diff_ops(); default: return false; } } -} - FileStorHandler::LockedMessage FileStorHandlerImpl::Stripe::getNextMessage(vespalib::duration timeout) { diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index 698f52359f5..1bc0ab87b1c 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -144,6 +144,7 @@ public: private: bool hasActive(monitor_guard & monitor, const AbortBucketOperationsCommand& cmd) const; FileStorHandler::LockedMessage get_next_async_message(monitor_guard& guard); + [[nodiscard]] bool operation_type_should_be_throttled(api::MessageType::Id type_id) const noexcept; // Precondition: the bucket used by `iter`s operation is not locked in a way that conflicts // with its locking requirements. @@ -247,6 +248,12 @@ public: return *_operation_throttler; } + void set_throttle_apply_bucket_diff_ops(bool throttle_apply_bucket_diff) noexcept override { + // Relaxed is fine, worst case from temporarily observing a stale value is that + // an ApplyBucketDiff message is (or isn't) throttled at a high level. + _throttle_apply_bucket_diff_ops.store(throttle_apply_bucket_diff, std::memory_order_relaxed); + } + // Implements ResumeGuard::Callback void resume() override; @@ -268,6 +275,7 @@ private: mutable std::mutex _pauseMonitor; mutable std::condition_variable _pauseCond; std::atomic<bool> _paused; + std::atomic<bool> _throttle_apply_bucket_diff_ops; std::optional<ActiveOperationsStats> _last_active_operations_stats; // Returns the index in the targets array we are sending to, or -1 if none of them match. @@ -287,6 +295,10 @@ private: */ bool isPaused() const { return _paused.load(std::memory_order_relaxed); } + [[nodiscard]] bool throttle_apply_bucket_diff_ops() const noexcept { + return _throttle_apply_bucket_diff_ops.load(std::memory_order_relaxed); + } + /** * Return whether msg has timed out based on waitTime and the message's * specified timeout. diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index e71b818b1ea..f4aff96b53c 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -228,6 +228,7 @@ FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config) _use_async_message_handling_on_schedule = config->useAsyncMessageHandlingOnSchedule; _host_info_reporter.set_noise_level(config->resourceUsageReporterNoiseLevel); + const bool throttle_merge_feed_ops = config->asyncOperationThrottler.throttleIndividualMergeFeedOps; if (!liveUpdate) { _config = std::move(config); @@ -254,6 +255,14 @@ FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config) auto updated_dyn_throttle_params = dynamic_throttle_params_from_config(*config, _threads.size()); _filestorHandler->operation_throttler().reconfigure_dynamic_throttling(updated_dyn_throttle_params); } + // TODO remove once desired dynamic throttling behavior is set in stone + { + _filestorHandler->set_throttle_apply_bucket_diff_ops(!throttle_merge_feed_ops); + std::lock_guard guard(_lock); + for (auto& ph : _persistenceHandlers) { + ph->set_throttle_merge_feed_ops(throttle_merge_feed_ops); + } + } } void diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 4a5362f3d8d..78f53de46b3 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -36,7 +36,8 @@ MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi, _monitored_ref_count(std::make_unique<MonitoredRefCount>()), _maxChunkSize(maxChunkSize), _commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize), - _executor(executor) + _executor(executor), + _throttle_merge_feed_ops(true) { } @@ -514,7 +515,8 @@ MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results spi::Context& context, const document::DocumentTypeRepo& repo) const { - auto throttle_token = _operation_throttler.blocking_acquire_one(); + auto throttle_token = throttle_merge_feed_ops() ? _operation_throttler.blocking_acquire_one() + : vespalib::SharedOperationThrottler::Token(); spi::Timestamp timestamp(e._entry._timestamp); if (!(e._entry._flags & (DELETED | DELETED_IN_PLACE))) { // Regular put entry diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h index ee6eed63eb5..93fb7efc8d0 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.h +++ b/storage/src/vespa/storage/persistence/mergehandler.h @@ -81,6 +81,15 @@ public: void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&, MessageTrackerUP) const; void drain_async_writes(); + // Thread safe, as it's set during live reconfig from the main filestor manager. + void set_throttle_merge_feed_ops(bool throttle) noexcept { + _throttle_merge_feed_ops.store(throttle, std::memory_order_relaxed); + } + + [[nodiscard]] bool throttle_merge_feed_ops() const noexcept { + return _throttle_merge_feed_ops.load(std::memory_order_relaxed); + } + private: using DocEntryList = std::vector<std::unique_ptr<spi::DocEntry>>; const framework::Clock &_clock; @@ -92,6 +101,7 @@ private: const uint32_t _maxChunkSize; const uint32_t _commonMergeChainOptimalizationMinimumSize; vespalib::ISequencedTaskExecutor& _executor; + std::atomic<bool> _throttle_merge_feed_ops; MessageTrackerUP handleGetBucketDiffStage2(api::GetBucketDiffCommand&, MessageTrackerUP) const; /** Returns a reply if merge is complete */ diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp index 41df1e8a075..8d71cc9308b 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.cpp +++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp @@ -181,4 +181,10 @@ PersistenceHandler::processLockedMessage(FileStorHandler::LockedMessage lock) co } } +void +PersistenceHandler::set_throttle_merge_feed_ops(bool throttle) noexcept +{ + _mergeHandler.set_throttle_merge_feed_ops(throttle); +} + } diff --git a/storage/src/vespa/storage/persistence/persistencehandler.h b/storage/src/vespa/storage/persistence/persistencehandler.h index a92c2dc78ca..c2df52e2fd6 100644 --- a/storage/src/vespa/storage/persistence/persistencehandler.h +++ b/storage/src/vespa/storage/persistence/persistencehandler.h @@ -35,6 +35,8 @@ public: const AsyncHandler & asyncHandler() const { return _asyncHandler; } const SplitJoinHandler & splitjoinHandler() const { return _splitJoinHandler; } const SimpleMessageHandler & simpleMessageHandler() const { return _simpleHandler; } + + void set_throttle_merge_feed_ops(bool throttle) noexcept; private: // Message handling functions MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker) const; |