summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-03-01 21:00:34 +0100
committerGitHub <noreply@github.com>2022-03-01 21:00:34 +0100
commit470b59dd5a402414601e37bb7ece9984d42b8653 (patch)
treeafd7fae7e71cfb099e850aaf76133270d566c597
parent2090ed6965c0c5b627a656ac272703326a75e1fb (diff)
parent4796fb920b45b26f2505babe02de0c93942c3e73 (diff)
Merge pull request #21477 from vespa-engine/balder/use-wait_until-to-reduce-need-for-sampling-time
Use wait_until and a deadline instead of a timeout.
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.h12
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp12
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h9
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.cpp7
-rw-r--r--vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp10
-rw-r--r--vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp8
-rw-r--r--vespalib/src/vespa/vespalib/util/shared_operation_throttler.h4
7 files changed, 34 insertions, 28 deletions
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index cf0c3e68a69..6d8cdc16743 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -128,6 +128,7 @@ public:
CLOSED
};
+ FileStorHandler() : _getNextMessageTimout(100ms) { }
virtual ~FileStorHandler() = default;
@@ -170,7 +171,12 @@ public:
*
* @param stripe The stripe to get messages for
*/
- virtual LockedMessage getNextMessage(uint32_t stripeId) = 0;
+ virtual LockedMessage getNextMessage(uint32_t stripeId, vespalib::steady_time deadline) = 0;
+
+ /** Only used for testing, should be removed */
+ LockedMessage getNextMessage(uint32_t stripeId) {
+ return getNextMessage(stripeId, vespalib::steady_clock::now() + _getNextMessageTimout);
+ }
/**
* Lock a bucket. By default, each file stor thread has the locks of all
@@ -268,7 +274,7 @@ public:
virtual uint32_t getQueueSize() const = 0;
// Commands used by testing
- virtual void setGetNextMessageTimeout(vespalib::duration timeout) = 0;
+ void setGetNextMessageTimeout(vespalib::duration timeout) { _getNextMessageTimout = timeout; }
virtual std::string dumpQueue() const = 0;
@@ -281,6 +287,8 @@ public:
virtual void use_dynamic_operation_throttling(bool use_dynamic) noexcept = 0;
virtual void set_throttle_apply_bucket_diff_ops(bool throttle_apply_bucket_diff) noexcept = 0;
+private:
+ vespalib::duration _getNextMessageTimout;
};
} // storage
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index de785c16cdf..c44ae305fa2 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -5,7 +5,6 @@
#include "mergestatus.h"
#include <vespa/storageapi/message/bucketsplitting.h>
#include <vespa/storageapi/message/persistence.h>
-#include <vespa/storageapi/message/removelocation.h>
#include <vespa/storage/bucketdb/storbucketdb.h>
#include <vespa/storage/common/bucketmessages.h>
#include <vespa/storage/common/statusmessages.h>
@@ -57,7 +56,6 @@ FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripe
_stripes(),
_messageSender(sender),
_bucketIdFactory(_component.getBucketIdFactory()),
- _getNextMessageTimeout(100ms),
_max_active_merges_per_stripe(per_stripe_merge_limit(numThreads, numStripes)),
_paused(false),
_throttle_apply_bucket_diff_ops(false),
@@ -395,13 +393,13 @@ FileStorHandlerImpl::makeQueueTimeoutReply(api::StorageMessage& msg)
}
FileStorHandler::LockedMessage
-FileStorHandlerImpl::getNextMessage(uint32_t stripeId)
+FileStorHandlerImpl::getNextMessage(uint32_t stripeId, vespalib::steady_time deadline)
{
if (!tryHandlePause()) {
return {}; // Still paused, return to allow tick.
}
- return getNextMessage(stripeId, _getNextMessageTimeout);
+ return _stripes[stripeId].getNextMessage(deadline);
}
std::shared_ptr<FileStorHandler::BucketLockInterface>
@@ -937,7 +935,7 @@ FileStorHandlerImpl::Stripe::operation_type_should_be_throttled(api::MessageType
}
FileStorHandler::LockedMessage
-FileStorHandlerImpl::Stripe::getNextMessage(vespalib::duration timeout)
+FileStorHandlerImpl::Stripe::getNextMessage(vespalib::steady_time deadline)
{
std::unique_lock guard(*_lock);
ThrottleToken throttle_token;
@@ -973,12 +971,12 @@ FileStorHandlerImpl::Stripe::getNextMessage(vespalib::duration timeout)
// Depending on whether we were blocked due to no usable ops in queue or throttling,
// wait for either the queue or throttler to (hopefully) have some fresh stuff for us.
if (!was_throttled) {
- _cond->wait_for(guard, timeout);
+ _cond->wait_until(guard, deadline);
} else {
// Have to release lock before doing a blocking throttle token fetch, since it
// prevents RPC threads from pushing onto the queue.
guard.unlock();
- throttle_token = _owner.operation_throttler().blocking_acquire_one(timeout);
+ throttle_token = _owner.operation_throttler().blocking_acquire_one(deadline);
guard.lock();
if (!throttle_token.valid()) {
_metrics->timeouts_waiting_for_throttle_token.inc();
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index 9e245e9eddc..dbef1d06dad 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -132,7 +132,7 @@ public:
std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket, api::LockingRequirements lockReq);
void failOperations(const document::Bucket & bucket, const api::ReturnCode & code);
- FileStorHandler::LockedMessage getNextMessage(vespalib::duration timeout);
+ FileStorHandler::LockedMessage getNextMessage(vespalib::steady_time deadline);
void dumpQueue(std::ostream & os) const;
void dumpActiveHtml(std::ostream & os) const;
void dumpQueueHtml(std::ostream & os) const;
@@ -196,7 +196,6 @@ public:
const vespalib::SharedOperationThrottler::DynamicThrottleParams& dyn_throttle_params);
~FileStorHandlerImpl() override;
- void setGetNextMessageTimeout(vespalib::duration timeout) override { _getNextMessageTimeout = timeout; }
void flush(bool killPendingMerges) override;
void setDiskState(DiskState state) override;
@@ -205,7 +204,7 @@ public:
bool schedule(const std::shared_ptr<api::StorageMessage>&) override;
ScheduleAsyncResult schedule_and_get_next_async_message(const std::shared_ptr<api::StorageMessage>& msg) override;
- FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId) override;
+ FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId, vespalib::steady_time deadline) override;
void remapQueueAfterJoin(const RemapInfo& source, RemapInfo& target) override;
void remapQueueAfterSplit(const RemapInfo& source, RemapInfo& target1, RemapInfo& target2) override;
@@ -282,7 +281,6 @@ private:
const document::BucketIdFactory& _bucketIdFactory;
mutable std::mutex _mergeStatesLock;
std::map<document::Bucket, std::shared_ptr<MergeStatus>> _mergeStates;
- vespalib::duration _getNextMessageTimeout;
const uint32_t _max_active_merges_per_stripe; // Read concurrently by stripes.
mutable std::mutex _pauseMonitor;
mutable std::condition_variable _pauseCond;
@@ -367,9 +365,6 @@ private:
Stripe & stripe(const document::Bucket & bucket) {
return _stripes[stripe_index(bucket)];
}
- FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId, vespalib::duration timeout) {
- return _stripes[stripeId].getNextMessage(timeout);
- }
ActiveOperationsStats get_active_operations_stats(bool reset_min_max) const override;
};
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index b89c60d4720..8e1fdb06ded 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -33,10 +33,13 @@ PersistenceThread::run(framework::ThreadHandle& thread)
{
LOG(debug, "Started persistence thread");
+ vespalib::duration max_wait_time = vespalib::adjustTimeoutByDetectedHz(100ms);
while (!thread.interrupted()) {
- thread.registerTick();
+ vespalib::steady_time now = vespalib::steady_clock::now();
+ thread.registerTick(framework::UNKNOWN_CYCLE, now);
- FileStorHandler::LockedMessage lock(_fileStorHandler.getNextMessage(_stripeId));
+ vespalib::steady_time deadline = now + max_wait_time;
+ FileStorHandler::LockedMessage lock(_fileStorHandler.getNextMessage(_stripeId, deadline));
if (lock.lock) {
_persistenceHandler.processLockedMessage(std::move(lock));
diff --git a/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp b/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp
index eefc0ca72c0..d6946905236 100644
--- a/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp
+++ b/vespalib/src/tests/shared_operation_throttler/shared_operation_throttler_test.cpp
@@ -4,6 +4,8 @@
#include <vespa/vespalib/util/barrier.h>
#include <thread>
+using vespalib::steady_clock;
+
namespace vespalib {
using ThrottleToken = SharedOperationThrottler::Token;
@@ -47,7 +49,7 @@ TEST_F("blocking acquire returns immediately if slot available", DynamicThrottle
auto token = f1._throttler->blocking_acquire_one();
EXPECT_TRUE(token.valid());
token.reset();
- token = f1._throttler->blocking_acquire_one(600s); // Should never block.
+ token = f1._throttler->blocking_acquire_one(steady_clock::now() + 600s); // Should never block.
EXPECT_TRUE(token.valid());
}
@@ -70,11 +72,11 @@ TEST_F("blocking call woken up if throttle slot available", DynamicThrottleFixtu
TEST_F("time-bounded blocking acquire waits for timeout", DynamicThrottleFixture()) {
auto window_filling_token = f1._throttler->try_acquire_one();
- auto before = std::chrono::steady_clock::now();
+ auto before = steady_clock::now();
// Will block for at least 1ms. Since no window slot will be available by that time,
// an invalid token should be returned.
- auto token = f1._throttler->blocking_acquire_one(1ms);
- auto after = std::chrono::steady_clock::now();
+ auto token = f1._throttler->blocking_acquire_one(before + 1ms);
+ auto after = steady_clock::now();
EXPECT_TRUE((after - before) >= 1ms);
EXPECT_FALSE(token.valid());
}
diff --git a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp
index 6e273d1a7ea..478d8c1b9e9 100644
--- a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp
+++ b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.cpp
@@ -24,7 +24,7 @@ public:
internal_ref_count_increase();
return Token(this, TokenCtorTag{});
}
- Token blocking_acquire_one(vespalib::duration) noexcept override {
+ Token blocking_acquire_one(vespalib::steady_time) noexcept override {
internal_ref_count_increase();
return Token(this, TokenCtorTag{});
}
@@ -267,7 +267,7 @@ public:
~DynamicOperationThrottler() override;
Token blocking_acquire_one() noexcept override;
- Token blocking_acquire_one(vespalib::duration timeout) noexcept override;
+ Token blocking_acquire_one(vespalib::steady_time deadline) noexcept override;
Token try_acquire_one() noexcept override;
uint32_t current_window_size() const noexcept override;
uint32_t current_active_token_count() const noexcept override;
@@ -334,12 +334,12 @@ DynamicOperationThrottler::blocking_acquire_one() noexcept
}
DynamicOperationThrottler::Token
-DynamicOperationThrottler::blocking_acquire_one(vespalib::duration timeout) noexcept
+DynamicOperationThrottler::blocking_acquire_one(vespalib::steady_time deadline) noexcept
{
std::unique_lock lock(_mutex);
if (!has_spare_capacity_in_active_window()) {
++_waiting_threads;
- const bool accepted = _cond.wait_for(lock, timeout, [&] {
+ const bool accepted = _cond.wait_until(lock, deadline, [&] {
return has_spare_capacity_in_active_window();
});
--_waiting_threads;
diff --git a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h
index b7913029c1e..95d6d361cb6 100644
--- a/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h
+++ b/vespalib/src/vespa/vespalib/util/shared_operation_throttler.h
@@ -54,9 +54,9 @@ public:
// Acquire a valid throttling token, uninterruptedly blocking until one can be obtained.
[[nodiscard]] virtual Token blocking_acquire_one() noexcept = 0;
// Attempt to acquire a valid throttling token, waiting up to `timeout` for one to be
- // available. If the timeout is exceeded without any tokens becoming available, an
+ // available. If the deadline is reached without any tokens becoming available, an
// invalid token will be returned.
- [[nodiscard]] virtual Token blocking_acquire_one(vespalib::duration timeout) noexcept = 0;
+ [[nodiscard]] virtual Token blocking_acquire_one(vespalib::steady_time deadline) noexcept = 0;
// Attempt to acquire a valid throttling token if one is immediately available.
// An invalid token will be returned if none is available. Never blocks (other than
// when contending for the internal throttler mutex).