aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/tests/storageserver/mergethrottlertest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/tests/storageserver/mergethrottlertest.cpp')
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp32
1 files changed, 23 insertions, 9 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 5c192942521..e8f8e425af4 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -147,6 +147,14 @@ struct MergeThrottlerTest : Test {
void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count);
void receive_chained_merge_with_full_queue(bool disable_queue_limits);
+
+ std::shared_ptr<api::MergeBucketCommand> peek_throttler_queue_top(size_t throttler_idx) {
+ auto& queue = _throttlers[throttler_idx]->getMergeQueue();
+ assert(!queue.empty());
+ auto merge = std::dynamic_pointer_cast<api::MergeBucketCommand>(queue.begin()->_msg);
+ assert(merge);
+ return merge;
+ }
};
MergeThrottlerTest::MergeThrottlerTest() = default;
@@ -789,7 +797,7 @@ TEST_F(MergeThrottlerTest, forward_queued_merge) {
waitUntilMergeQueueIs(*_throttlers[0], 3, _messageWaitTime);
// Merge queue state should not be touched by worker thread now
- auto nextMerge = _throttlers[0]->getMergeQueue().begin()->_msg;
+ auto nextMerge = peek_throttler_queue_top(0);
auto fwd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET);
@@ -818,13 +826,12 @@ TEST_F(MergeThrottlerTest, forward_queued_merge) {
fwd = _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET);
ASSERT_EQ(static_cast<const MergeBucketCommand&>(*fwd).getBucketId(),
- static_cast<const MergeBucketCommand&>(*nextMerge).getBucketId());
+ nextMerge->getBucketId());
- ASSERT_TRUE(static_cast<const MergeBucketCommand&>(*fwd).getNodes()
- == static_cast<const MergeBucketCommand&>(*nextMerge).getNodes());
+ ASSERT_TRUE(static_cast<const MergeBucketCommand&>(*fwd).getNodes() == nextMerge->getNodes());
// Ensure forwarded merge has a higher priority than the next queued one
- EXPECT_LT(fwd->getPriority(), _throttlers[0]->getMergeQueue().begin()->_msg->getPriority());
+ EXPECT_LT(fwd->getPriority(), peek_throttler_queue_top(0)->getPriority());
EXPECT_EQ(uint64_t(1), _throttlers[0]->getMetrics().chaining.ok.getValue());
}
@@ -863,10 +870,8 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) {
waitUntilMergeQueueIs(throttler, 4, _messageWaitTime);
// Merge queue state should not be touched by worker thread now
- auto nextMerge = throttler.getMergeQueue().begin()->_msg;
-
- ASSERT_EQ(BucketId(32, 0x1337),
- dynamic_cast<const MergeBucketCommand&>(*nextMerge).getBucketId());
+ auto nextMerge = peek_throttler_queue_top(1);
+ ASSERT_EQ(BucketId(32, 0x1337), nextMerge->getBucketId());
auto fwd = topLink.getAndRemoveMessage(MessageType::MERGEBUCKET);
@@ -1235,6 +1240,15 @@ TEST_F(MergeThrottlerTest, forwarded_merges_busy_bounced_if_queue_is_full_and_ch
EXPECT_EQ(ReturnCode::BUSY, static_cast<MergeBucketReply&>(*reply).getResult().getResult());
}
+TEST_F(MergeThrottlerTest, forwarded_merge_has_higher_pri_when_chain_limits_disabled) {
+ ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true));
+ size_t max_enqueued = _throttlers[1]->getMaxQueueSize();
+ waitUntilMergeQueueIs(*_throttlers[1], max_enqueued + 1, _messageWaitTime);
+
+ auto highest_pri_merge = peek_throttler_queue_top(1);
+ EXPECT_FALSE(highest_pri_merge->getChain().empty()); // Should be the forwarded merge
+}
+
TEST_F(MergeThrottlerTest, broken_cycle) {
std::vector<MergeBucketCommand::Node> nodes;
nodes.push_back(1);