diff options
Diffstat (limited to 'storage/src/tests')
-rw-r--r-- | storage/src/tests/storageserver/mergethrottlertest.cpp | 32 |
1 files changed, 23 insertions, 9 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index 5c192942521..e8f8e425af4 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -147,6 +147,14 @@ struct MergeThrottlerTest : Test { void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count); void receive_chained_merge_with_full_queue(bool disable_queue_limits); + + std::shared_ptr<api::MergeBucketCommand> peek_throttler_queue_top(size_t throttler_idx) { + auto& queue = _throttlers[throttler_idx]->getMergeQueue(); + assert(!queue.empty()); + auto merge = std::dynamic_pointer_cast<api::MergeBucketCommand>(queue.begin()->_msg); + assert(merge); + return merge; + } }; MergeThrottlerTest::MergeThrottlerTest() = default; @@ -789,7 +797,7 @@ TEST_F(MergeThrottlerTest, forward_queued_merge) { waitUntilMergeQueueIs(*_throttlers[0], 3, _messageWaitTime); // Merge queue state should not be touched by worker thread now - auto nextMerge = _throttlers[0]->getMergeQueue().begin()->_msg; + auto nextMerge = peek_throttler_queue_top(0); auto fwd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET); @@ -818,13 +826,12 @@ TEST_F(MergeThrottlerTest, forward_queued_merge) { fwd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET); ASSERT_EQ(static_cast<const MergeBucketCommand&>(*fwd).getBucketId(), - static_cast<const MergeBucketCommand&>(*nextMerge).getBucketId()); + nextMerge->getBucketId()); - ASSERT_TRUE(static_cast<const MergeBucketCommand&>(*fwd).getNodes() - == static_cast<const MergeBucketCommand&>(*nextMerge).getNodes()); + ASSERT_TRUE(static_cast<const MergeBucketCommand&>(*fwd).getNodes() == nextMerge->getNodes()); // Ensure forwarded merge has a higher priority than the next queued one - EXPECT_LT(fwd->getPriority(), _throttlers[0]->getMergeQueue().begin()->_msg->getPriority()); + EXPECT_LT(fwd->getPriority(), peek_throttler_queue_top(0)->getPriority()); EXPECT_EQ(uint64_t(1), _throttlers[0]->getMetrics().chaining.ok.getValue()); } @@ -863,10 +870,8 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) { waitUntilMergeQueueIs(throttler, 4, _messageWaitTime); // Merge queue state should not be touched by worker thread now - auto nextMerge = throttler.getMergeQueue().begin()->_msg; - - ASSERT_EQ(BucketId(32, 0x1337), - dynamic_cast<const MergeBucketCommand&>(*nextMerge).getBucketId()); + auto nextMerge = peek_throttler_queue_top(1); + ASSERT_EQ(BucketId(32, 0x1337), nextMerge->getBucketId()); auto fwd = topLink.getAndRemoveMessage(MessageType::MERGEBUCKET); @@ -1235,6 +1240,15 @@ TEST_F(MergeThrottlerTest, forwarded_merges_busy_bounced_if_queue_is_full_and_ch EXPECT_EQ(ReturnCode::BUSY, static_cast<MergeBucketReply&>(*reply).getResult().getResult()); } +TEST_F(MergeThrottlerTest, forwarded_merge_has_higher_pri_when_chain_limits_disabled) { + ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true)); + size_t max_enqueued = _throttlers[1]->getMaxQueueSize(); + waitUntilMergeQueueIs(*_throttlers[1], max_enqueued + 1, _messageWaitTime); + + auto highest_pri_merge = peek_throttler_queue_top(1); + EXPECT_FALSE(highest_pri_merge->getChain().empty()); // Should be the forwarded merge +} + TEST_F(MergeThrottlerTest, broken_cycle) { std::vector<MergeBucketCommand::Node> nodes; nodes.push_back(1); |