aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/tests/storageserver/mergethrottlertest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/tests/storageserver/mergethrottlertest.cpp')
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp21
1 files changed, 19 insertions, 2 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 7325840dbb0..fdf13a4cf14 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -146,6 +146,7 @@ struct MergeThrottlerTest : Test {
api::ReturnCode::Result expectedResultCode);
void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count);
+ void receive_chained_merge_with_full_queue(bool disable_queue_limits);
};
MergeThrottlerTest::MergeThrottlerTest() = default;
@@ -1263,8 +1264,11 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_dist
EXPECT_EQ(1, _throttlers[0]->getMetrics().local.failures.busy.getValue());
}
-TEST_F(MergeThrottlerTest, forwarded_merges_not_busy_bounced_even_if_queue_is_full) {
+void
+MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_limits)
+{
// Note: uses node with index 1 to not be the first node in chain
+ _throttlers[1]->set_disable_queue_limits_for_chained_merges(disable_queue_limits);
size_t max_pending = _throttlers[1]->getThrottlePolicy().getMaxPendingCount();
size_t max_enqueued = _throttlers[1]->getMaxQueueSize();
for (std::size_t i = 0; i < max_pending + max_enqueued; ++i) {
@@ -1280,7 +1284,7 @@ TEST_F(MergeThrottlerTest, forwarded_merges_not_busy_bounced_even_if_queue_is_fu
// Clear all forwarded merges
_topLinks[1]->getRepliesOnce();
// Send down another merge with non-empty chain. It should _not_ be busy bounced
- // as it has already been accepted into another node's merge window.
+ // (if limits disabled) as it has already been accepted into another node's merge window.
{
std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
auto cmd = std::make_shared<MergeBucketCommand>(
@@ -1288,9 +1292,22 @@ TEST_F(MergeThrottlerTest, forwarded_merges_not_busy_bounced_even_if_queue_is_fu
cmd->setChain(std::vector<uint16_t>({0})); // Forwarded from node 0
_topLinks[1]->sendDown(cmd);
}
+}
+
+TEST_F(MergeThrottlerTest, forwarded_merges_not_busy_bounced_even_if_queue_is_full_if_chained_limits_disabled) {
+ ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true));
+ size_t max_enqueued = _throttlers[1]->getMaxQueueSize();
waitUntilMergeQueueIs(*_throttlers[1], max_enqueued + 1, _messageWaitTime);
}
+TEST_F(MergeThrottlerTest, forwarded_merges_busy_bounced_if_queue_is_full_and_chained_limits_enforced) {
+ ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(false));
+
+ _topLinks[1]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime);
+ auto reply = _topLinks[1]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY);
+ EXPECT_EQ(ReturnCode::BUSY, static_cast<MergeBucketReply&>(*reply).getResult().getResult());
+}
+
TEST_F(MergeThrottlerTest, broken_cycle) {
std::vector<MergeBucketCommand::Node> nodes;
nodes.push_back(1);