diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-10-25 19:42:41 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-10-25 19:42:41 +0200 |
commit | 37d050d026ae1cdbcb1ba2dd4848107fa8d6fd61 (patch) | |
tree | 3cb25d04708ac3c2158c17ed0117bf883a8caf9d /storage/src/tests/storageserver/mergethrottlertest.cpp | |
parent | cb6a8b7ccc971f4264fd3e0cb1917ba94e75552f (diff) | |
parent | 5b943fc1a120a2548cad10568fbf7e8819cbad3e (diff) |
Merge pull request #19725 from vespa-engine/vekterli/prioritize-forwarded-merges-in-throttler-queuev7.489.25
Prioritize forwarded merges in MergeThrottler queue
Diffstat (limited to 'storage/src/tests/storageserver/mergethrottlertest.cpp')
-rw-r--r-- | storage/src/tests/storageserver/mergethrottlertest.cpp | 32 |
1 files changed, 23 insertions, 9 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index 5c192942521..e8f8e425af4 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -147,6 +147,14 @@ struct MergeThrottlerTest : Test { void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count); void receive_chained_merge_with_full_queue(bool disable_queue_limits); + + std::shared_ptr<api::MergeBucketCommand> peek_throttler_queue_top(size_t throttler_idx) { + auto& queue = _throttlers[throttler_idx]->getMergeQueue(); + assert(!queue.empty()); + auto merge = std::dynamic_pointer_cast<api::MergeBucketCommand>(queue.begin()->_msg); + assert(merge); + return merge; + } }; MergeThrottlerTest::MergeThrottlerTest() = default; @@ -789,7 +797,7 @@ TEST_F(MergeThrottlerTest, forward_queued_merge) { waitUntilMergeQueueIs(*_throttlers[0], 3, _messageWaitTime); // Merge queue state should not be touched by worker thread now - auto nextMerge = _throttlers[0]->getMergeQueue().begin()->_msg; + auto nextMerge = peek_throttler_queue_top(0); auto fwd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET); @@ -818,13 +826,12 @@ TEST_F(MergeThrottlerTest, forward_queued_merge) { fwd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET); ASSERT_EQ(static_cast<const MergeBucketCommand&>(*fwd).getBucketId(), - static_cast<const MergeBucketCommand&>(*nextMerge).getBucketId()); + nextMerge->getBucketId()); - ASSERT_TRUE(static_cast<const MergeBucketCommand&>(*fwd).getNodes() - == static_cast<const MergeBucketCommand&>(*nextMerge).getNodes()); + ASSERT_TRUE(static_cast<const MergeBucketCommand&>(*fwd).getNodes() == nextMerge->getNodes()); // Ensure forwarded merge has a higher priority than the next queued one - EXPECT_LT(fwd->getPriority(), _throttlers[0]->getMergeQueue().begin()->_msg->getPriority()); + EXPECT_LT(fwd->getPriority(), peek_throttler_queue_top(0)->getPriority()); EXPECT_EQ(uint64_t(1), _throttlers[0]->getMetrics().chaining.ok.getValue()); } @@ -863,10 +870,8 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) { waitUntilMergeQueueIs(throttler, 4, _messageWaitTime); // Merge queue state should not be touched by worker thread now - auto nextMerge = throttler.getMergeQueue().begin()->_msg; - - ASSERT_EQ(BucketId(32, 0x1337), - dynamic_cast<const MergeBucketCommand&>(*nextMerge).getBucketId()); + auto nextMerge = peek_throttler_queue_top(1); + ASSERT_EQ(BucketId(32, 0x1337), nextMerge->getBucketId()); auto fwd = topLink.getAndRemoveMessage(MessageType::MERGEBUCKET); @@ -1235,6 +1240,15 @@ TEST_F(MergeThrottlerTest, forwarded_merges_busy_bounced_if_queue_is_full_and_ch EXPECT_EQ(ReturnCode::BUSY, static_cast<MergeBucketReply&>(*reply).getResult().getResult()); } +TEST_F(MergeThrottlerTest, forwarded_merge_has_higher_pri_when_chain_limits_disabled) { + ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true)); + size_t max_enqueued = _throttlers[1]->getMaxQueueSize(); + waitUntilMergeQueueIs(*_throttlers[1], max_enqueued + 1, _messageWaitTime); + + auto highest_pri_merge = peek_throttler_queue_top(1); + EXPECT_FALSE(highest_pri_merge->getChain().empty()); // Should be the forwarded merge +} + TEST_F(MergeThrottlerTest, broken_cycle) { std::vector<MergeBucketCommand::Node> nodes; nodes.push_back(1); |