aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2021-10-25 15:44:54 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2021-10-25 15:44:54 +0000
commit5b943fc1a120a2548cad10568fbf7e8819cbad3e (patch)
treeae80155f6459a694b5a8adee4a3b8495121b48ac
parent23f6c9faddfbffa44f9dffbd792e1342cec9d64b (diff)
Prioritize forwarded merges in MergeThrottler queue
Rationale: merges that already are part of an active merge window are taking up logical resources on one or more nodes in the cluster and we should prefer completing these before starting new merges queued from distributors.
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp32
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.cpp3
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.h22
3 files changed, 38 insertions, 19 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 5c192942521..e8f8e425af4 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -147,6 +147,14 @@ struct MergeThrottlerTest : Test {
void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count);
void receive_chained_merge_with_full_queue(bool disable_queue_limits);
+
+ std::shared_ptr<api::MergeBucketCommand> peek_throttler_queue_top(size_t throttler_idx) {
+ auto& queue = _throttlers[throttler_idx]->getMergeQueue();
+ assert(!queue.empty());
+ auto merge = std::dynamic_pointer_cast<api::MergeBucketCommand>(queue.begin()->_msg);
+ assert(merge);
+ return merge;
+ }
};
MergeThrottlerTest::MergeThrottlerTest() = default;
@@ -789,7 +797,7 @@ TEST_F(MergeThrottlerTest, forward_queued_merge) {
waitUntilMergeQueueIs(*_throttlers[0], 3, _messageWaitTime);
// Merge queue state should not be touched by worker thread now
- auto nextMerge = _throttlers[0]->getMergeQueue().begin()->_msg;
+ auto nextMerge = peek_throttler_queue_top(0);
auto fwd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET);
@@ -818,13 +826,12 @@ TEST_F(MergeThrottlerTest, forward_queued_merge) {
fwd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET);
ASSERT_EQ(static_cast<const MergeBucketCommand&>(*fwd).getBucketId(),
- static_cast<const MergeBucketCommand&>(*nextMerge).getBucketId());
+ nextMerge->getBucketId());
- ASSERT_TRUE(static_cast<const MergeBucketCommand&>(*fwd).getNodes()
- == static_cast<const MergeBucketCommand&>(*nextMerge).getNodes());
+ ASSERT_TRUE(static_cast<const MergeBucketCommand&>(*fwd).getNodes() == nextMerge->getNodes());
// Ensure forwarded merge has a higher priority than the next queued one
- EXPECT_LT(fwd->getPriority(), _throttlers[0]->getMergeQueue().begin()->_msg->getPriority());
+ EXPECT_LT(fwd->getPriority(), peek_throttler_queue_top(0)->getPriority());
EXPECT_EQ(uint64_t(1), _throttlers[0]->getMetrics().chaining.ok.getValue());
}
@@ -863,10 +870,8 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) {
waitUntilMergeQueueIs(throttler, 4, _messageWaitTime);
// Merge queue state should not be touched by worker thread now
- auto nextMerge = throttler.getMergeQueue().begin()->_msg;
-
- ASSERT_EQ(BucketId(32, 0x1337),
- dynamic_cast<const MergeBucketCommand&>(*nextMerge).getBucketId());
+ auto nextMerge = peek_throttler_queue_top(1);
+ ASSERT_EQ(BucketId(32, 0x1337), nextMerge->getBucketId());
auto fwd = topLink.getAndRemoveMessage(MessageType::MERGEBUCKET);
@@ -1235,6 +1240,15 @@ TEST_F(MergeThrottlerTest, forwarded_merges_busy_bounced_if_queue_is_full_and_ch
EXPECT_EQ(ReturnCode::BUSY, static_cast<MergeBucketReply&>(*reply).getResult().getResult());
}
+TEST_F(MergeThrottlerTest, forwarded_merge_has_higher_pri_when_chain_limits_disabled) {
+ ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true));
+ size_t max_enqueued = _throttlers[1]->getMaxQueueSize();
+ waitUntilMergeQueueIs(*_throttlers[1], max_enqueued + 1, _messageWaitTime);
+
+ auto highest_pri_merge = peek_throttler_queue_top(1);
+ EXPECT_FALSE(highest_pri_merge->getChain().empty()); // Should be the forwarded merge
+}
+
TEST_F(MergeThrottlerTest, broken_cycle) {
std::vector<MergeBucketCommand::Node> nodes;
nodes.push_back(1);
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index a17c77f6ca4..05e50492206 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -395,7 +395,8 @@ MergeThrottler::enqueueMerge(
if (!validateNewMerge(mergeCmd, nodeSeq, msgGuard)) {
return;
}
- _queue.emplace(msg, _queueSequence++);
+ const bool is_forwarded_merge = _disable_queue_limits_for_chained_merges && !mergeCmd.getChain().empty();
+ _queue.emplace(msg, _queueSequence++, is_forwarded_merge);
_metrics->queueSize.set(_queue.size());
}
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index 997477a4b70..da301172a3a 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -80,21 +80,25 @@ private:
MessageType _msg;
metrics::MetricTimer _startTimer;
uint64_t _sequence;
-
- StablePriorityOrderingWrapper(const MessageType& msg, uint64_t sequence)
- : _msg(msg), _startTimer(), _sequence(sequence)
+ bool _is_forwarded_merge;
+
+ StablePriorityOrderingWrapper(const MessageType& msg,
+ uint64_t sequence,
+ bool is_forwarded_merge) noexcept
+ : _msg(msg),
+ _startTimer(),
+ _sequence(sequence),
+ _is_forwarded_merge(is_forwarded_merge)
{
}
- bool operator==(const StablePriorityOrderingWrapper& other) const {
- return (*_msg == *other._msg
- && _sequence == other._sequence);
- }
-
- bool operator<(const StablePriorityOrderingWrapper& other) const {
+ bool operator<(const StablePriorityOrderingWrapper& other) const noexcept {
if (_msg->getPriority() < other._msg->getPriority()) {
return true;
}
+ if (_is_forwarded_merge != other._is_forwarded_merge) {
+ return _is_forwarded_merge; // Forwarded merges sort before non-forwarded merges.
+ }
return (_sequence < other._sequence);
}
};