diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-03-01 21:00:34 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-01 21:00:34 +0100 |
commit | 470b59dd5a402414601e37bb7ece9984d42b8653 (patch) | |
tree | afd7fae7e71cfb099e850aaf76133270d566c597 /storage | |
parent | 2090ed6965c0c5b627a656ac272703326a75e1fb (diff) | |
parent | 4796fb920b45b26f2505babe02de0c93942c3e73 (diff) |
Merge pull request #21477 from vespa-engine/balder/use-wait_until-to-reduce-need-for-sampling-time
Use wait_until and a deadline instead of a timeout.
Diffstat (limited to 'storage')
4 files changed, 22 insertions, 18 deletions
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index cf0c3e68a69..6d8cdc16743 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -128,6 +128,7 @@ public: CLOSED }; + FileStorHandler() : _getNextMessageTimout(100ms) { } virtual ~FileStorHandler() = default; @@ -170,7 +171,12 @@ public: * * @param stripe The stripe to get messages for */ - virtual LockedMessage getNextMessage(uint32_t stripeId) = 0; + virtual LockedMessage getNextMessage(uint32_t stripeId, vespalib::steady_time deadline) = 0; + + /** Only used for testing, should be removed */ + LockedMessage getNextMessage(uint32_t stripeId) { + return getNextMessage(stripeId, vespalib::steady_clock::now() + _getNextMessageTimout); + } /** * Lock a bucket. By default, each file stor thread has the locks of all @@ -268,7 +274,7 @@ public: virtual uint32_t getQueueSize() const = 0; // Commands used by testing - virtual void setGetNextMessageTimeout(vespalib::duration timeout) = 0; + void setGetNextMessageTimeout(vespalib::duration timeout) { _getNextMessageTimout = timeout; } virtual std::string dumpQueue() const = 0; @@ -281,6 +287,8 @@ public: virtual void use_dynamic_operation_throttling(bool use_dynamic) noexcept = 0; virtual void set_throttle_apply_bucket_diff_ops(bool throttle_apply_bucket_diff) noexcept = 0; +private: + vespalib::duration _getNextMessageTimout; }; } // storage diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index de785c16cdf..c44ae305fa2 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -5,7 +5,6 @@ #include "mergestatus.h" #include <vespa/storageapi/message/bucketsplitting.h> #include <vespa/storageapi/message/persistence.h> -#include <vespa/storageapi/message/removelocation.h> #include <vespa/storage/bucketdb/storbucketdb.h> #include <vespa/storage/common/bucketmessages.h> #include <vespa/storage/common/statusmessages.h> @@ -57,7 +56,6 @@ FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripe _stripes(), _messageSender(sender), _bucketIdFactory(_component.getBucketIdFactory()), - _getNextMessageTimeout(100ms), _max_active_merges_per_stripe(per_stripe_merge_limit(numThreads, numStripes)), _paused(false), _throttle_apply_bucket_diff_ops(false), @@ -395,13 +393,13 @@ FileStorHandlerImpl::makeQueueTimeoutReply(api::StorageMessage& msg) } FileStorHandler::LockedMessage -FileStorHandlerImpl::getNextMessage(uint32_t stripeId) +FileStorHandlerImpl::getNextMessage(uint32_t stripeId, vespalib::steady_time deadline) { if (!tryHandlePause()) { return {}; // Still paused, return to allow tick. } - return getNextMessage(stripeId, _getNextMessageTimeout); + return _stripes[stripeId].getNextMessage(deadline); } std::shared_ptr<FileStorHandler::BucketLockInterface> @@ -937,7 +935,7 @@ FileStorHandlerImpl::Stripe::operation_type_should_be_throttled(api::MessageType } FileStorHandler::LockedMessage -FileStorHandlerImpl::Stripe::getNextMessage(vespalib::duration timeout) +FileStorHandlerImpl::Stripe::getNextMessage(vespalib::steady_time deadline) { std::unique_lock guard(*_lock); ThrottleToken throttle_token; @@ -973,12 +971,12 @@ FileStorHandlerImpl::Stripe::getNextMessage(vespalib::duration timeout) // Depending on whether we were blocked due to no usable ops in queue or throttling, // wait for either the queue or throttler to (hopefully) have some fresh stuff for us. if (!was_throttled) { - _cond->wait_for(guard, timeout); + _cond->wait_until(guard, deadline); } else { // Have to release lock before doing a blocking throttle token fetch, since it // prevents RPC threads from pushing onto the queue. guard.unlock(); - throttle_token = _owner.operation_throttler().blocking_acquire_one(timeout); + throttle_token = _owner.operation_throttler().blocking_acquire_one(deadline); guard.lock(); if (!throttle_token.valid()) { _metrics->timeouts_waiting_for_throttle_token.inc(); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index 9e245e9eddc..dbef1d06dad 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -132,7 +132,7 @@ public: std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket, api::LockingRequirements lockReq); void failOperations(const document::Bucket & bucket, const api::ReturnCode & code); - FileStorHandler::LockedMessage getNextMessage(vespalib::duration timeout); + FileStorHandler::LockedMessage getNextMessage(vespalib::steady_time deadline); void dumpQueue(std::ostream & os) const; void dumpActiveHtml(std::ostream & os) const; void dumpQueueHtml(std::ostream & os) const; @@ -196,7 +196,6 @@ public: const vespalib::SharedOperationThrottler::DynamicThrottleParams& dyn_throttle_params); ~FileStorHandlerImpl() override; - void setGetNextMessageTimeout(vespalib::duration timeout) override { _getNextMessageTimeout = timeout; } void flush(bool killPendingMerges) override; void setDiskState(DiskState state) override; @@ -205,7 +204,7 @@ public: bool schedule(const std::shared_ptr<api::StorageMessage>&) override; ScheduleAsyncResult schedule_and_get_next_async_message(const std::shared_ptr<api::StorageMessage>& msg) override; - FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId) override; + FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId, vespalib::steady_time deadline) override; void remapQueueAfterJoin(const RemapInfo& source, RemapInfo& target) override; void remapQueueAfterSplit(const RemapInfo& source, RemapInfo& target1, RemapInfo& target2) override; @@ -282,7 +281,6 @@ private: const document::BucketIdFactory& _bucketIdFactory; mutable std::mutex _mergeStatesLock; std::map<document::Bucket, std::shared_ptr<MergeStatus>> _mergeStates; - vespalib::duration _getNextMessageTimeout; const uint32_t _max_active_merges_per_stripe; // Read concurrently by stripes. mutable std::mutex _pauseMonitor; mutable std::condition_variable _pauseCond; @@ -367,9 +365,6 @@ private: Stripe & stripe(const document::Bucket & bucket) { return _stripes[stripe_index(bucket)]; } - FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId, vespalib::duration timeout) { - return _stripes[stripeId].getNextMessage(timeout); - } ActiveOperationsStats get_active_operations_stats(bool reset_min_max) const override; }; diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index b89c60d4720..8e1fdb06ded 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -33,10 +33,13 @@ PersistenceThread::run(framework::ThreadHandle& thread) { LOG(debug, "Started persistence thread"); + vespalib::duration max_wait_time = vespalib::adjustTimeoutByDetectedHz(100ms); while (!thread.interrupted()) { - thread.registerTick(); + vespalib::steady_time now = vespalib::steady_clock::now(); + thread.registerTick(framework::UNKNOWN_CYCLE, now); - FileStorHandler::LockedMessage lock(_fileStorHandler.getNextMessage(_stripeId)); + vespalib::steady_time deadline = now + max_wait_time; + FileStorHandler::LockedMessage lock(_fileStorHandler.getNextMessage(_stripeId, deadline)); if (lock.lock) { _persistenceHandler.processLockedMessage(std::move(lock)); |