diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2021-10-05 17:04:33 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-10-05 17:04:33 +0200 |
commit | 3576f3575781abbcc601ac8f5f2bcea2571bfa84 (patch) | |
tree | f769ae8ed18a27faf1c4cf2b556ae8bbbc4a0118 /storage/src/tests/storageserver/mergethrottlertest.cpp | |
parent | 5f5cb7aa87195c92b374dcbc742a5263559c8bb0 (diff) | |
parent | 7c41749950750d271dede3bfa4e1868cbd2fbc6b (diff) |
Merge pull request #19424 from vespa-engine/vekterli/do-not-busy-bounce-merges-forwarded-from-other-nodes
Do not busy-bounce merges forwarded from other nodes
Diffstat (limited to 'storage/src/tests/storageserver/mergethrottlertest.cpp')
-rw-r--r-- | storage/src/tests/storageserver/mergethrottlertest.cpp | 48 |
1 files changed, 47 insertions, 1 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index dfeaee031ba..fdf13a4cf14 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -146,6 +146,7 @@ struct MergeThrottlerTest : Test { api::ReturnCode::Result expectedResultCode); void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count); + void receive_chained_merge_with_full_queue(bool disable_queue_limits); }; MergeThrottlerTest::MergeThrottlerTest() = default; @@ -1217,7 +1218,7 @@ TEST_F(MergeThrottlerTest, unknown_merge_with_self_in_chain) { static_cast<MergeBucketReply&>(*reply).getResult().getResult()); } -TEST_F(MergeThrottlerTest, busy_returned_on_full_queue) { +TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_distributors) { size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); size_t maxQueue = _throttlers[0]->getMaxQueueSize(); ASSERT_EQ(20, maxQueue); @@ -1227,6 +1228,7 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue) { nodes.push_back(0); nodes.push_back(1); nodes.push_back(2); + // No chain set, i.e. merge command is freshly squeezed from a distributor. auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xf00000 + i)), nodes, 1234, 1); _topLinks[0]->sendDown(cmd); @@ -1262,6 +1264,50 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue) { EXPECT_EQ(1, _throttlers[0]->getMetrics().local.failures.busy.getValue()); } +void +MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_limits) +{ + // Note: uses node with index 1 to not be the first node in chain + _throttlers[1]->set_disable_queue_limits_for_chained_merges(disable_queue_limits); + size_t max_pending = _throttlers[1]->getThrottlePolicy().getMaxPendingCount(); + size_t max_enqueued = _throttlers[1]->getMaxQueueSize(); + for (std::size_t i = 0; i < max_pending + max_enqueued; ++i) { + std::vector<MergeBucketCommand::Node> nodes({{1}, {2}, {3}}); + // No chain set, i.e. merge command is freshly squeezed from a distributor. + auto cmd = std::make_shared<MergeBucketCommand>( + makeDocumentBucket(BucketId(32, 0xf00000 + i)), nodes, 1234, 1); + _topLinks[1]->sendDown(cmd); + } + _topLinks[1]->waitForMessages(max_pending, _messageWaitTime); + waitUntilMergeQueueIs(*_throttlers[1], max_enqueued, _messageWaitTime); + + // Clear all forwarded merges + _topLinks[1]->getRepliesOnce(); + // Send down another merge with non-empty chain. It should _not_ be busy bounced + // (if limits disabled) as it has already been accepted into another node's merge window. + { + std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); + auto cmd = std::make_shared<MergeBucketCommand>( + makeDocumentBucket(BucketId(32, 0xf000baaa)), nodes, 1234, 1); + cmd->setChain(std::vector<uint16_t>({0})); // Forwarded from node 0 + _topLinks[1]->sendDown(cmd); + } +} + +TEST_F(MergeThrottlerTest, forwarded_merges_not_busy_bounced_even_if_queue_is_full_if_chained_limits_disabled) { + ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true)); + size_t max_enqueued = _throttlers[1]->getMaxQueueSize(); + waitUntilMergeQueueIs(*_throttlers[1], max_enqueued + 1, _messageWaitTime); +} + +TEST_F(MergeThrottlerTest, forwarded_merges_busy_bounced_if_queue_is_full_and_chained_limits_enforced) { + ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(false)); + + _topLinks[1]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime); + auto reply = _topLinks[1]->getAndRemoveMessage(MessageType::MERGEBUCKET_REPLY); + EXPECT_EQ(ReturnCode::BUSY, static_cast<MergeBucketReply&>(*reply).getResult().getResult()); +} + TEST_F(MergeThrottlerTest, broken_cycle) { std::vector<MergeBucketCommand::Node> nodes; nodes.push_back(1); |