diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-10-25 19:42:41 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-10-25 19:42:41 +0200 |
commit | 37d050d026ae1cdbcb1ba2dd4848107fa8d6fd61 (patch) | |
tree | 3cb25d04708ac3c2158c17ed0117bf883a8caf9d | |
parent | cb6a8b7ccc971f4264fd3e0cb1917ba94e75552f (diff) | |
parent | 5b943fc1a120a2548cad10568fbf7e8819cbad3e (diff) |
Merge pull request #19725 from vespa-engine/vekterli/prioritize-forwarded-merges-in-throttler-queuev7.489.25
Prioritize forwarded merges in MergeThrottler queue
3 files changed, 38 insertions, 19 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index 5c192942521..e8f8e425af4 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -147,6 +147,14 @@ struct MergeThrottlerTest : Test { void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count); void receive_chained_merge_with_full_queue(bool disable_queue_limits); + + std::shared_ptr<api::MergeBucketCommand> peek_throttler_queue_top(size_t throttler_idx) { + auto& queue = _throttlers[throttler_idx]->getMergeQueue(); + assert(!queue.empty()); + auto merge = std::dynamic_pointer_cast<api::MergeBucketCommand>(queue.begin()->_msg); + assert(merge); + return merge; + } }; MergeThrottlerTest::MergeThrottlerTest() = default; @@ -789,7 +797,7 @@ TEST_F(MergeThrottlerTest, forward_queued_merge) { waitUntilMergeQueueIs(*_throttlers[0], 3, _messageWaitTime); // Merge queue state should not be touched by worker thread now - auto nextMerge = _throttlers[0]->getMergeQueue().begin()->_msg; + auto nextMerge = peek_throttler_queue_top(0); auto fwd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET); @@ -818,13 +826,12 @@ TEST_F(MergeThrottlerTest, forward_queued_merge) { fwd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET); ASSERT_EQ(static_cast<const MergeBucketCommand&>(*fwd).getBucketId(), - static_cast<const MergeBucketCommand&>(*nextMerge).getBucketId()); + nextMerge->getBucketId()); - ASSERT_TRUE(static_cast<const MergeBucketCommand&>(*fwd).getNodes() - == static_cast<const MergeBucketCommand&>(*nextMerge).getNodes()); + ASSERT_TRUE(static_cast<const MergeBucketCommand&>(*fwd).getNodes() == nextMerge->getNodes()); // Ensure forwarded merge has a higher priority than the next queued one - EXPECT_LT(fwd->getPriority(), _throttlers[0]->getMergeQueue().begin()->_msg->getPriority()); + EXPECT_LT(fwd->getPriority(), peek_throttler_queue_top(0)->getPriority()); EXPECT_EQ(uint64_t(1), _throttlers[0]->getMetrics().chaining.ok.getValue()); } @@ -863,10 +870,8 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) { waitUntilMergeQueueIs(throttler, 4, _messageWaitTime); // Merge queue state should not be touched by worker thread now - auto nextMerge = throttler.getMergeQueue().begin()->_msg; - - ASSERT_EQ(BucketId(32, 0x1337), - dynamic_cast<const MergeBucketCommand&>(*nextMerge).getBucketId()); + auto nextMerge = peek_throttler_queue_top(1); + ASSERT_EQ(BucketId(32, 0x1337), nextMerge->getBucketId()); auto fwd = topLink.getAndRemoveMessage(MessageType::MERGEBUCKET); @@ -1235,6 +1240,15 @@ TEST_F(MergeThrottlerTest, forwarded_merges_busy_bounced_if_queue_is_full_and_ch EXPECT_EQ(ReturnCode::BUSY, static_cast<MergeBucketReply&>(*reply).getResult().getResult()); } +TEST_F(MergeThrottlerTest, forwarded_merge_has_higher_pri_when_chain_limits_disabled) { + ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true)); + size_t max_enqueued = _throttlers[1]->getMaxQueueSize(); + waitUntilMergeQueueIs(*_throttlers[1], max_enqueued + 1, _messageWaitTime); + + auto highest_pri_merge = peek_throttler_queue_top(1); + EXPECT_FALSE(highest_pri_merge->getChain().empty()); // Should be the forwarded merge +} + TEST_F(MergeThrottlerTest, broken_cycle) { std::vector<MergeBucketCommand::Node> nodes; nodes.push_back(1); diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp index a17c77f6ca4..05e50492206 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp +++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp @@ -395,7 +395,8 @@ MergeThrottler::enqueueMerge( if (!validateNewMerge(mergeCmd, nodeSeq, msgGuard)) { return; } - _queue.emplace(msg, _queueSequence++); + const bool is_forwarded_merge = _disable_queue_limits_for_chained_merges && !mergeCmd.getChain().empty(); + _queue.emplace(msg, _queueSequence++, is_forwarded_merge); _metrics->queueSize.set(_queue.size()); } diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h index 997477a4b70..da301172a3a 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.h +++ b/storage/src/vespa/storage/storageserver/mergethrottler.h @@ -80,21 +80,25 @@ private: MessageType _msg; metrics::MetricTimer _startTimer; uint64_t _sequence; - - StablePriorityOrderingWrapper(const MessageType& msg, uint64_t sequence) - : _msg(msg), _startTimer(), _sequence(sequence) + bool _is_forwarded_merge; + + StablePriorityOrderingWrapper(const MessageType& msg, + uint64_t sequence, + bool is_forwarded_merge) noexcept + : _msg(msg), + _startTimer(), + _sequence(sequence), + _is_forwarded_merge(is_forwarded_merge) { } - bool operator==(const StablePriorityOrderingWrapper& other) const { - return (*_msg == *other._msg - && _sequence == other._sequence); - } - - bool operator<(const StablePriorityOrderingWrapper& other) const { + bool operator<(const StablePriorityOrderingWrapper& other) const noexcept { if (_msg->getPriority() < other._msg->getPriority()) { return true; } + if (_is_forwarded_merge != other._is_forwarded_merge) { + return _is_forwarded_merge; // Forwarded merges sort before non-forwarded merges. + } return (_sequence < other._sequence); } }; |