From c0e949b12fdcdcb40f65d6898bc0c41689f0ff3f Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Tue, 1 Mar 2022 11:25:10 +0000 Subject: Use wait_until and a deadline instead of a timeout. This reduces the need to smaple the time significantly. --- .../vespa/storage/persistence/filestorage/filestorhandler.h | 12 ++++++++++-- .../storage/persistence/filestorage/filestorhandlerimpl.cpp | 12 +++++------- .../storage/persistence/filestorage/filestorhandlerimpl.h | 9 ++------- storage/src/vespa/storage/persistence/persistencethread.cpp | 7 +++++-- .../shared_operation_throttler_test.cpp | 10 ++++++---- .../src/vespa/vespalib/util/shared_operation_throttler.cpp | 8 ++++---- .../src/vespa/vespalib/util/shared_operation_throttler.h | 2 +- 7 files changed, 33 insertions(+), 27 deletions(-) diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index 5c243ea4af9..250bbe369c9 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -128,6 +128,7 @@ public: CLOSED }; + FileStorHandler() : _getNextMessageTimout(100ms) { } virtual ~FileStorHandler() = default; @@ -170,7 +171,12 @@ public: * * @param stripe The stripe to get messages for */ - virtual LockedMessage getNextMessage(uint32_t stripeId) = 0; + virtual LockedMessage getNextMessage(uint32_t stripeId, vespalib::steady_time timeout_end) = 0; + + /** Only used for testing, should be removed */ + LockedMessage getNextMessage(uint32_t stripeId) { + return getNextMessage(stripeId, vespalib::steady_clock::now() + _getNextMessageTimout); + } /** * Lock a bucket. By default, each file stor thread has the locks of all @@ -268,7 +274,7 @@ public: virtual uint32_t getQueueSize() const = 0; // Commands used by testing - virtual void setGetNextMessageTimeout(vespalib::duration timeout) = 0; + void setGetNextMessageTimeout(vespalib::duration timeout) { _getNextMessageTimout = timeout; } virtual std::string dumpQueue() const = 0; @@ -277,6 +283,8 @@ public: virtual vespalib::SharedOperationThrottler& operation_throttler() const noexcept = 0; virtual void set_throttle_apply_bucket_diff_ops(bool throttle_apply_bucket_diff) noexcept = 0; +private: + vespalib::duration _getNextMessageTimout; }; } // storage diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index b5de5a233cc..77617cecad3 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -5,7 +5,6 @@ #include "mergestatus.h" #include #include -#include #include #include #include @@ -55,7 +54,6 @@ FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripe _stripes(), _messageSender(sender), _bucketIdFactory(_component.getBucketIdFactory()), - _getNextMessageTimeout(100ms), _max_active_merges_per_stripe(per_stripe_merge_limit(numThreads, numStripes)), _paused(false), _throttle_apply_bucket_diff_ops(false), @@ -377,13 +375,13 @@ FileStorHandlerImpl::makeQueueTimeoutReply(api::StorageMessage& msg) } FileStorHandler::LockedMessage -FileStorHandlerImpl::getNextMessage(uint32_t stripeId) +FileStorHandlerImpl::getNextMessage(uint32_t stripeId, vespalib::steady_time timeout_end) { if (!tryHandlePause()) { return {}; // Still paused, return to allow tick. } - return getNextMessage(stripeId, _getNextMessageTimeout); + return _stripes[stripeId].getNextMessage(timeout_end); } std::shared_ptr @@ -919,7 +917,7 @@ FileStorHandlerImpl::Stripe::operation_type_should_be_throttled(api::MessageType } FileStorHandler::LockedMessage -FileStorHandlerImpl::Stripe::getNextMessage(vespalib::duration timeout) +FileStorHandlerImpl::Stripe::getNextMessage(vespalib::steady_time timeout_end) { std::unique_lock guard(*_lock); ThrottleToken throttle_token; @@ -955,12 +953,12 @@ FileStorHandlerImpl::Stripe::getNextMessage(vespalib::duration timeout) // Depending on whether we were blocked due to no usable ops in queue or throttling, // wait for either the queue or throttler to (hopefully) have some fresh stuff for us. if (!was_throttled) { - _cond->wait_for(guard, timeout); + _cond->wait_until(guard, timeout_end); } else { // Have to release lock before doing a blocking throttle token fetch, since it // prevents RPC threads from pushing onto the queue. guard.unlock(); - throttle_token = _owner.operation_throttler().blocking_acquire_one(timeout); + throttle_token = _owner.operation_throttler().blocking_acquire_one(timeout_end); guard.lock(); if (!throttle_token.valid()) { _metrics->timeouts_waiting_for_throttle_token.inc(); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index 1bc0ab87b1c..c18b51c5d10 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -132,7 +132,7 @@ public: std::shared_ptr lock(const document::Bucket & bucket, api::LockingRequirements lockReq); void failOperations(const document::Bucket & bucket, const api::ReturnCode & code); - FileStorHandler::LockedMessage getNextMessage(vespalib::duration timeout); + FileStorHandler::LockedMessage getNextMessage(vespalib::steady_time timeout_end); void dumpQueue(std::ostream & os) const; void dumpActiveHtml(std::ostream & os) const; void dumpQueueHtml(std::ostream & os) const; @@ -195,7 +195,6 @@ public: ServiceLayerComponentRegister&, std::unique_ptr); ~FileStorHandlerImpl() override; - void setGetNextMessageTimeout(vespalib::duration timeout) override { _getNextMessageTimeout = timeout; } void flush(bool killPendingMerges) override; void setDiskState(DiskState state) override; @@ -204,7 +203,7 @@ public: bool schedule(const std::shared_ptr&) override; ScheduleAsyncResult schedule_and_get_next_async_message(const std::shared_ptr& msg) override; - FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId) override; + FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId, vespalib::steady_time timeout_end) override; void remapQueueAfterJoin(const RemapInfo& source, RemapInfo& target) override; void remapQueueAfterSplit(const RemapInfo& source, RemapInfo& target1, RemapInfo& target2) override; @@ -270,7 +269,6 @@ private: const document::BucketIdFactory& _bucketIdFactory; mutable std::mutex _mergeStatesLock; std::map> _mergeStates; - vespalib::duration _getNextMessageTimeout; const uint32_t _max_active_merges_per_stripe; // Read concurrently by stripes. mutable std::mutex _pauseMonitor; mutable std::condition_variable _pauseCond; @@ -355,9 +353,6 @@ private: Stripe & stripe(const document::Bucket & bucket) { return _stripes[stripe_index(bucket)]; } - FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId, vespalib::duration timeout) { - return _stripes[stripeId].getNextMessage(timeout); - } ActiveOperationsStats get_active_operations_stats(bool reset_min_max) const override; }; diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index b89c60d4720..208481bde27 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -33,10 +33,13 @@ PersistenceThread::run(framework::ThreadHandle& thread) { LOG(debug, "Started persistence thread"); + vespalib::duration max_wait_time = vespalib::adjustTimeoutByDetectedHz(100ms); while (!thread.interrupted()) { - thread.registerTick(); + vespalib::steady_time now = vespalib::steady_clock::now(); + thread.registerTick(framework::UNKNOWN_CYCLE, now); - FileStorHandler::LockedMessage lock(_fileStorHandler.getNextMessage(_stripeId)); + vespalib::steady_time doom = now + max_wait_time; + FileStorHandler::LockedMessage lock(_fileStorHandler.getNextMessage(_stripeId, doom)); if (lock.lock) { _persistenceHandler.processLockedMessage(std::move(lock)); diff --git a/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp b/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp index eefc0ca72c0..d6946905236 100644 --- a/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp +++ b/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp @@ -4,6 +4,8 @@ #include #include +using vespalib::steady_clock; + namespace vespalib { using ThrottleToken = SharedOperationThrottler::Token; @@ -47,7 +49,7 @@ TEST_F("blocking acquire returns immediately if slot available", DynamicThrottle auto token = f1._throttler->blocking_acquire_one(); EXPECT_TRUE(token.valid()); token.reset(); - token = f1._throttler->blocking_acquire_one(600s); // Should never block. + token = f1._throttler->blocking_acquire_one(steady_clock::now() + 600s); // Should never block. EXPECT_TRUE(token.valid()); } @@ -70,11 +72,11 @@ TEST_F("blocking call woken up if throttle slot available", DynamicThrottleFixtu TEST_F("time-bounded blocking acquire waits for timeout", DynamicThrottleFixture()) { auto window_filling_token = f1._throttler->try_acquire_one(); - auto before = std::chrono::steady_clock::now(); + auto before = steady_clock::now(); // Will block for at least 1ms. Since no window slot will be available by that time, // an invalid token should be returned. - auto token = f1._throttler->blocking_acquire_one(1ms); - auto after = std::chrono::steady_clock::now(); + auto token = f1._throttler->blocking_acquire_one(before + 1ms); + auto after = steady_clock::now(); EXPECT_TRUE((after - before) >= 1ms); EXPECT_FALSE(token.valid()); } diff --git a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp index 6e273d1a7ea..e5d492dcf1c 100644 --- a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp +++ b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp @@ -24,7 +24,7 @@ public: internal_ref_count_increase(); return Token(this, TokenCtorTag{}); } - Token blocking_acquire_one(vespalib::duration) noexcept override { + Token blocking_acquire_one(vespalib::steady_time) noexcept override { internal_ref_count_increase(); return Token(this, TokenCtorTag{}); } @@ -267,7 +267,7 @@ public: ~DynamicOperationThrottler() override; Token blocking_acquire_one() noexcept override; - Token blocking_acquire_one(vespalib::duration timeout) noexcept override; + Token blocking_acquire_one(vespalib::steady_time timeout_end) noexcept override; Token try_acquire_one() noexcept override; uint32_t current_window_size() const noexcept override; uint32_t current_active_token_count() const noexcept override; @@ -334,12 +334,12 @@ DynamicOperationThrottler::blocking_acquire_one() noexcept } DynamicOperationThrottler::Token -DynamicOperationThrottler::blocking_acquire_one(vespalib::duration timeout) noexcept +DynamicOperationThrottler::blocking_acquire_one(vespalib::steady_time timeout_end) noexcept { std::unique_lock lock(_mutex); if (!has_spare_capacity_in_active_window()) { ++_waiting_threads; - const bool accepted = _cond.wait_for(lock, timeout, [&] { + const bool accepted = _cond.wait_until(lock, timeout_end, [&] { return has_spare_capacity_in_active_window(); }); --_waiting_threads; diff --git a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h index b7913029c1e..bb59b634e7c 100644 --- a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h +++ b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h @@ -56,7 +56,7 @@ public: // Attempt to acquire a valid throttling token, waiting up to `timeout` for one to be // available. If the timeout is exceeded without any tokens becoming available, an // invalid token will be returned. - [[nodiscard]] virtual Token blocking_acquire_one(vespalib::duration timeout) noexcept = 0; + [[nodiscard]] virtual Token blocking_acquire_one(vespalib::steady_time timeout_end) noexcept = 0; // Attempt to acquire a valid throttling token if one is immediately available. // An invalid token will be returned if none is available. Never blocks (other than // when contending for the internal throttler mutex). -- cgit v1.2.3 From 4796fb920b45b26f2505babe02de0c93942c3e73 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Tue, 1 Mar 2022 14:06:44 +0000 Subject: timeout_end -> deadline --- .../vespa/storage/persistence/filestorage/filestorhandler.h | 2 +- .../storage/persistence/filestorage/filestorhandlerimpl.cpp | 10 +++++----- .../storage/persistence/filestorage/filestorhandlerimpl.h | 4 ++-- storage/src/vespa/storage/persistence/persistencethread.cpp | 4 ++-- .../src/vespa/vespalib/util/shared_operation_throttler.cpp | 6 +++--- vespalib/src/vespa/vespalib/util/shared_operation_throttler.h | 4 ++-- 6 files changed, 15 insertions(+), 15 deletions(-) diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index 250bbe369c9..dfeab4c0ee1 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -171,7 +171,7 @@ public: * * @param stripe The stripe to get messages for */ - virtual LockedMessage getNextMessage(uint32_t stripeId, vespalib::steady_time timeout_end) = 0; + virtual LockedMessage getNextMessage(uint32_t stripeId, vespalib::steady_time deadline) = 0; /** Only used for testing, should be removed */ LockedMessage getNextMessage(uint32_t stripeId) { diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index 77617cecad3..b42699a0efe 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -375,13 +375,13 @@ FileStorHandlerImpl::makeQueueTimeoutReply(api::StorageMessage& msg) } FileStorHandler::LockedMessage -FileStorHandlerImpl::getNextMessage(uint32_t stripeId, vespalib::steady_time timeout_end) +FileStorHandlerImpl::getNextMessage(uint32_t stripeId, vespalib::steady_time deadline) { if (!tryHandlePause()) { return {}; // Still paused, return to allow tick. } - return _stripes[stripeId].getNextMessage(timeout_end); + return _stripes[stripeId].getNextMessage(deadline); } std::shared_ptr @@ -917,7 +917,7 @@ FileStorHandlerImpl::Stripe::operation_type_should_be_throttled(api::MessageType } FileStorHandler::LockedMessage -FileStorHandlerImpl::Stripe::getNextMessage(vespalib::steady_time timeout_end) +FileStorHandlerImpl::Stripe::getNextMessage(vespalib::steady_time deadline) { std::unique_lock guard(*_lock); ThrottleToken throttle_token; @@ -953,12 +953,12 @@ FileStorHandlerImpl::Stripe::getNextMessage(vespalib::steady_time timeout_end) // Depending on whether we were blocked due to no usable ops in queue or throttling, // wait for either the queue or throttler to (hopefully) have some fresh stuff for us. if (!was_throttled) { - _cond->wait_until(guard, timeout_end); + _cond->wait_until(guard, deadline); } else { // Have to release lock before doing a blocking throttle token fetch, since it // prevents RPC threads from pushing onto the queue. guard.unlock(); - throttle_token = _owner.operation_throttler().blocking_acquire_one(timeout_end); + throttle_token = _owner.operation_throttler().blocking_acquire_one(deadline); guard.lock(); if (!throttle_token.valid()) { _metrics->timeouts_waiting_for_throttle_token.inc(); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index c18b51c5d10..74830a9d599 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -132,7 +132,7 @@ public: std::shared_ptr lock(const document::Bucket & bucket, api::LockingRequirements lockReq); void failOperations(const document::Bucket & bucket, const api::ReturnCode & code); - FileStorHandler::LockedMessage getNextMessage(vespalib::steady_time timeout_end); + FileStorHandler::LockedMessage getNextMessage(vespalib::steady_time deadline); void dumpQueue(std::ostream & os) const; void dumpActiveHtml(std::ostream & os) const; void dumpQueueHtml(std::ostream & os) const; @@ -203,7 +203,7 @@ public: bool schedule(const std::shared_ptr&) override; ScheduleAsyncResult schedule_and_get_next_async_message(const std::shared_ptr& msg) override; - FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId, vespalib::steady_time timeout_end) override; + FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId, vespalib::steady_time deadline) override; void remapQueueAfterJoin(const RemapInfo& source, RemapInfo& target) override; void remapQueueAfterSplit(const RemapInfo& source, RemapInfo& target1, RemapInfo& target2) override; diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index 208481bde27..8e1fdb06ded 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -38,8 +38,8 @@ PersistenceThread::run(framework::ThreadHandle& thread) vespalib::steady_time now = vespalib::steady_clock::now(); thread.registerTick(framework::UNKNOWN_CYCLE, now); - vespalib::steady_time doom = now + max_wait_time; - FileStorHandler::LockedMessage lock(_fileStorHandler.getNextMessage(_stripeId, doom)); + vespalib::steady_time deadline = now + max_wait_time; + FileStorHandler::LockedMessage lock(_fileStorHandler.getNextMessage(_stripeId, deadline)); if (lock.lock) { _persistenceHandler.processLockedMessage(std::move(lock)); diff --git a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp index e5d492dcf1c..478d8c1b9e9 100644 --- a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp +++ b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp @@ -267,7 +267,7 @@ public: ~DynamicOperationThrottler() override; Token blocking_acquire_one() noexcept override; - Token blocking_acquire_one(vespalib::steady_time timeout_end) noexcept override; + Token blocking_acquire_one(vespalib::steady_time deadline) noexcept override; Token try_acquire_one() noexcept override; uint32_t current_window_size() const noexcept override; uint32_t current_active_token_count() const noexcept override; @@ -334,12 +334,12 @@ DynamicOperationThrottler::blocking_acquire_one() noexcept } DynamicOperationThrottler::Token -DynamicOperationThrottler::blocking_acquire_one(vespalib::steady_time timeout_end) noexcept +DynamicOperationThrottler::blocking_acquire_one(vespalib::steady_time deadline) noexcept { std::unique_lock lock(_mutex); if (!has_spare_capacity_in_active_window()) { ++_waiting_threads; - const bool accepted = _cond.wait_until(lock, timeout_end, [&] { + const bool accepted = _cond.wait_until(lock, deadline, [&] { return has_spare_capacity_in_active_window(); }); --_waiting_threads; diff --git a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h index bb59b634e7c..95d6d361cb6 100644 --- a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h +++ b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h @@ -54,9 +54,9 @@ public: // Acquire a valid throttling token, uninterruptedly blocking until one can be obtained. [[nodiscard]] virtual Token blocking_acquire_one() noexcept = 0; // Attempt to acquire a valid throttling token, waiting up to `timeout` for one to be - // available. If the timeout is exceeded without any tokens becoming available, an + // available. If the deadline is reached without any tokens becoming available, an // invalid token will be returned. - [[nodiscard]] virtual Token blocking_acquire_one(vespalib::steady_time timeout_end) noexcept = 0; + [[nodiscard]] virtual Token blocking_acquire_one(vespalib::steady_time deadline) noexcept = 0; // Attempt to acquire a valid throttling token if one is immediately available. // An invalid token will be returned if none is available. Never blocks (other than // when contending for the internal throttler mutex). -- cgit v1.2.3