aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-10-25 19:42:41 +0200
committerGitHub <noreply@github.com>2021-10-25 19:42:41 +0200
commit37d050d026ae1cdbcb1ba2dd4848107fa8d6fd61 (patch)
tree3cb25d04708ac3c2158c17ed0117bf883a8caf9d
parentcb6a8b7ccc971f4264fd3e0cb1917ba94e75552f (diff)
parent5b943fc1a120a2548cad10568fbf7e8819cbad3e (diff)
Merge pull request #19725 from vespa-engine/vekterli/prioritize-forwarded-merges-in-throttler-queuev7.489.25
Prioritize forwarded merges in MergeThrottler queue
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp32
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.cpp3
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.h22
3 files changed, 38 insertions, 19 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 5c192942521..e8f8e425af4 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -147,6 +147,14 @@ struct MergeThrottlerTest : Test {
void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count);
void receive_chained_merge_with_full_queue(bool disable_queue_limits);
+
+ std::shared_ptr<api::MergeBucketCommand> peek_throttler_queue_top(size_t throttler_idx) {
+ auto& queue = _throttlers[throttler_idx]->getMergeQueue();
+ assert(!queue.empty());
+ auto merge = std::dynamic_pointer_cast<api::MergeBucketCommand>(queue.begin()->_msg);
+ assert(merge);
+ return merge;
+ }
};
MergeThrottlerTest::MergeThrottlerTest() = default;
@@ -789,7 +797,7 @@ TEST_F(MergeThrottlerTest, forward_queued_merge) {
waitUntilMergeQueueIs(*_throttlers[0], 3, _messageWaitTime);
// Merge queue state should not be touched by worker thread now
- auto nextMerge = _throttlers[0]->getMergeQueue().begin()->_msg;
+ auto nextMerge = peek_throttler_queue_top(0);
auto fwd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET);
@@ -818,13 +826,12 @@ TEST_F(MergeThrottlerTest, forward_queued_merge) {
fwd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET);
ASSERT_EQ(static_cast<const MergeBucketCommand&>(*fwd).getBucketId(),
- static_cast<const MergeBucketCommand&>(*nextMerge).getBucketId());
+ nextMerge->getBucketId());
- ASSERT_TRUE(static_cast<const MergeBucketCommand&>(*fwd).getNodes()
- == static_cast<const MergeBucketCommand&>(*nextMerge).getNodes());
+ ASSERT_TRUE(static_cast<const MergeBucketCommand&>(*fwd).getNodes() == nextMerge->getNodes());
// Ensure forwarded merge has a higher priority than the next queued one
- EXPECT_LT(fwd->getPriority(), _throttlers[0]->getMergeQueue().begin()->_msg->getPriority());
+ EXPECT_LT(fwd->getPriority(), peek_throttler_queue_top(0)->getPriority());
EXPECT_EQ(uint64_t(1), _throttlers[0]->getMetrics().chaining.ok.getValue());
}
@@ -863,10 +870,8 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) {
waitUntilMergeQueueIs(throttler, 4, _messageWaitTime);
// Merge queue state should not be touched by worker thread now
- auto nextMerge = throttler.getMergeQueue().begin()->_msg;
-
- ASSERT_EQ(BucketId(32, 0x1337),
- dynamic_cast<const MergeBucketCommand&>(*nextMerge).getBucketId());
+ auto nextMerge = peek_throttler_queue_top(1);
+ ASSERT_EQ(BucketId(32, 0x1337), nextMerge->getBucketId());
auto fwd = topLink.getAndRemoveMessage(MessageType::MERGEBUCKET);
@@ -1235,6 +1240,15 @@ TEST_F(MergeThrottlerTest, forwarded_merges_busy_bounced_if_queue_is_full_and_ch
EXPECT_EQ(ReturnCode::BUSY, static_cast<MergeBucketReply&>(*reply).getResult().getResult());
}
+TEST_F(MergeThrottlerTest, forwarded_merge_has_higher_pri_when_chain_limits_disabled) {
+ ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true));
+ size_t max_enqueued = _throttlers[1]->getMaxQueueSize();
+ waitUntilMergeQueueIs(*_throttlers[1], max_enqueued + 1, _messageWaitTime);
+
+ auto highest_pri_merge = peek_throttler_queue_top(1);
+ EXPECT_FALSE(highest_pri_merge->getChain().empty()); // Should be the forwarded merge
+}
+
TEST_F(MergeThrottlerTest, broken_cycle) {
std::vector<MergeBucketCommand::Node> nodes;
nodes.push_back(1);
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index a17c77f6ca4..05e50492206 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -395,7 +395,8 @@ MergeThrottler::enqueueMerge(
if (!validateNewMerge(mergeCmd, nodeSeq, msgGuard)) {
return;
}
- _queue.emplace(msg, _queueSequence++);
+ const bool is_forwarded_merge = _disable_queue_limits_for_chained_merges && !mergeCmd.getChain().empty();
+ _queue.emplace(msg, _queueSequence++, is_forwarded_merge);
_metrics->queueSize.set(_queue.size());
}
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index 997477a4b70..da301172a3a 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -80,21 +80,25 @@ private:
MessageType _msg;
metrics::MetricTimer _startTimer;
uint64_t _sequence;
-
- StablePriorityOrderingWrapper(const MessageType& msg, uint64_t sequence)
- : _msg(msg), _startTimer(), _sequence(sequence)
+ bool _is_forwarded_merge;
+
+ StablePriorityOrderingWrapper(const MessageType& msg,
+ uint64_t sequence,
+ bool is_forwarded_merge) noexcept
+ : _msg(msg),
+ _startTimer(),
+ _sequence(sequence),
+ _is_forwarded_merge(is_forwarded_merge)
{
}
- bool operator==(const StablePriorityOrderingWrapper& other) const {
- return (*_msg == *other._msg
- && _sequence == other._sequence);
- }
-
- bool operator<(const StablePriorityOrderingWrapper& other) const {
+ bool operator<(const StablePriorityOrderingWrapper& other) const noexcept {
if (_msg->getPriority() < other._msg->getPriority()) {
return true;
}
+ if (_is_forwarded_merge != other._is_forwarded_merge) {
+ return _is_forwarded_merge; // Forwarded merges sort before non-forwarded merges.
+ }
return (_sequence < other._sequence);
}
};