From 5b943fc1a120a2548cad10568fbf7e8819cbad3e Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Mon, 25 Oct 2021 15:44:54 +0000 Subject: Prioritize forwarded merges in MergeThrottler queue Rationale: merges that already are part of an active merge window are taking up logical resources on one or more nodes in the cluster and we should prefer completing these before starting new merges queued from distributors. --- .../src/tests/storageserver/mergethrottlertest.cpp | 32 ++++++++++++++++------ .../vespa/storage/storageserver/mergethrottler.cpp | 3 +- .../vespa/storage/storageserver/mergethrottler.h | 22 +++++++++------ 3 files changed, 38 insertions(+), 19 deletions(-) diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index 5c192942521..e8f8e425af4 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -147,6 +147,14 @@ struct MergeThrottlerTest : Test { void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count); void receive_chained_merge_with_full_queue(bool disable_queue_limits); + + std::shared_ptr peek_throttler_queue_top(size_t throttler_idx) { + auto& queue = _throttlers[throttler_idx]->getMergeQueue(); + assert(!queue.empty()); + auto merge = std::dynamic_pointer_cast(queue.begin()->_msg); + assert(merge); + return merge; + } }; MergeThrottlerTest::MergeThrottlerTest() = default; @@ -789,7 +797,7 @@ TEST_F(MergeThrottlerTest, forward_queued_merge) { waitUntilMergeQueueIs(*_throttlers[0], 3, _messageWaitTime); // Merge queue state should not be touched by worker thread now - auto nextMerge = _throttlers[0]->getMergeQueue().begin()->_msg; + auto nextMerge = peek_throttler_queue_top(0); auto fwd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET); @@ -818,13 +826,12 @@ TEST_F(MergeThrottlerTest, forward_queued_merge) { fwd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET); ASSERT_EQ(static_cast(*fwd).getBucketId(), - static_cast(*nextMerge).getBucketId()); + nextMerge->getBucketId()); - ASSERT_TRUE(static_cast(*fwd).getNodes() - == static_cast(*nextMerge).getNodes()); + ASSERT_TRUE(static_cast(*fwd).getNodes() == nextMerge->getNodes()); // Ensure forwarded merge has a higher priority than the next queued one - EXPECT_LT(fwd->getPriority(), _throttlers[0]->getMergeQueue().begin()->_msg->getPriority()); + EXPECT_LT(fwd->getPriority(), peek_throttler_queue_top(0)->getPriority()); EXPECT_EQ(uint64_t(1), _throttlers[0]->getMetrics().chaining.ok.getValue()); } @@ -863,10 +870,8 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) { waitUntilMergeQueueIs(throttler, 4, _messageWaitTime); // Merge queue state should not be touched by worker thread now - auto nextMerge = throttler.getMergeQueue().begin()->_msg; - - ASSERT_EQ(BucketId(32, 0x1337), - dynamic_cast(*nextMerge).getBucketId()); + auto nextMerge = peek_throttler_queue_top(1); + ASSERT_EQ(BucketId(32, 0x1337), nextMerge->getBucketId()); auto fwd = topLink.getAndRemoveMessage(MessageType::MERGEBUCKET); @@ -1235,6 +1240,15 @@ TEST_F(MergeThrottlerTest, forwarded_merges_busy_bounced_if_queue_is_full_and_ch EXPECT_EQ(ReturnCode::BUSY, static_cast(*reply).getResult().getResult()); } +TEST_F(MergeThrottlerTest, forwarded_merge_has_higher_pri_when_chain_limits_disabled) { + ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true)); + size_t max_enqueued = _throttlers[1]->getMaxQueueSize(); + waitUntilMergeQueueIs(*_throttlers[1], max_enqueued + 1, _messageWaitTime); + + auto highest_pri_merge = peek_throttler_queue_top(1); + EXPECT_FALSE(highest_pri_merge->getChain().empty()); // Should be the forwarded merge +} + TEST_F(MergeThrottlerTest, broken_cycle) { std::vector nodes; nodes.push_back(1); diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp index a17c77f6ca4..05e50492206 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp +++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp @@ -395,7 +395,8 @@ MergeThrottler::enqueueMerge( if (!validateNewMerge(mergeCmd, nodeSeq, msgGuard)) { return; } - _queue.emplace(msg, _queueSequence++); + const bool is_forwarded_merge = _disable_queue_limits_for_chained_merges && !mergeCmd.getChain().empty(); + _queue.emplace(msg, _queueSequence++, is_forwarded_merge); _metrics->queueSize.set(_queue.size()); } diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h index 997477a4b70..da301172a3a 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.h +++ b/storage/src/vespa/storage/storageserver/mergethrottler.h @@ -80,21 +80,25 @@ private: MessageType _msg; metrics::MetricTimer _startTimer; uint64_t _sequence; - - StablePriorityOrderingWrapper(const MessageType& msg, uint64_t sequence) - : _msg(msg), _startTimer(), _sequence(sequence) + bool _is_forwarded_merge; + + StablePriorityOrderingWrapper(const MessageType& msg, + uint64_t sequence, + bool is_forwarded_merge) noexcept + : _msg(msg), + _startTimer(), + _sequence(sequence), + _is_forwarded_merge(is_forwarded_merge) { } - bool operator==(const StablePriorityOrderingWrapper& other) const { - return (*_msg == *other._msg - && _sequence == other._sequence); - } - - bool operator<(const StablePriorityOrderingWrapper& other) const { + bool operator<(const StablePriorityOrderingWrapper& other) const noexcept { if (_msg->getPriority() < other._msg->getPriority()) { return true; } + if (_is_forwarded_merge != other._is_forwarded_merge) { + return _is_forwarded_merge; // Forwarded merges sort before non-forwarded merges. + } return (_sequence < other._sequence); } }; -- cgit v1.2.3