diff options
Diffstat (limited to 'storage/src/tests/storageserver/mergethrottlertest.cpp')
-rw-r--r-- | storage/src/tests/storageserver/mergethrottlertest.cpp | 58 |
1 file changed, 44 insertions, 14 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index e8f8e425af4..0f844ab6b4f 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -52,15 +52,18 @@ struct MergeBuilder { ~MergeBuilder(); MergeBuilder& nodes(uint16_t n0) { + _nodes.clear(); _nodes.push_back(n0); return *this; } MergeBuilder& nodes(uint16_t n0, uint16_t n1) { + _nodes.clear(); _nodes.push_back(n0); _nodes.push_back(n1); return *this; } MergeBuilder& nodes(uint16_t n0, uint16_t n1, uint16_t n2) { + _nodes.clear(); _nodes.push_back(n0); _nodes.push_back(n1); _nodes.push_back(n2); @@ -146,7 +149,8 @@ struct MergeThrottlerTest : Test { api::ReturnCode::Result expectedResultCode); void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count); - void receive_chained_merge_with_full_queue(bool disable_queue_limits); + void fill_up_throttler_active_window_and_queue(uint16_t node_idx); + void receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd = false); std::shared_ptr<api::MergeBucketCommand> peek_throttler_queue_top(size_t throttler_idx) { auto& queue = _throttlers[throttler_idx]->getMergeQueue(); @@ -1197,7 +1201,7 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_dist } void -MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_limits) +MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd) { // Note: uses node with index 1 to not be the first node in chain _throttlers[1]->set_disable_queue_limits_for_chained_merges(disable_queue_limits); @@ -1218,10 +1222,15 @@ MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_lim // Send down another merge with non-empty chain. 
It should _not_ be busy bounced // (if limits disabled) as it has already been accepted into another node's merge window. { - std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); + std::vector<MergeBucketCommand::Node> nodes({{2}, {1}, {0}}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xf000baaa)), nodes, 1234, 1); - cmd->setChain(std::vector<uint16_t>({0})); // Forwarded from node 0 + if (!unordered_fwd) { + cmd->setChain(std::vector<uint16_t>({0})); // Forwarded from node 0 + } else { + cmd->setChain(std::vector<uint16_t>({2})); // Forwarded from node 2, i.e. _not_ the lowest index + } + cmd->set_use_unordered_forwarding(unordered_fwd); _topLinks[1]->sendDown(cmd); } } @@ -1249,11 +1258,34 @@ TEST_F(MergeThrottlerTest, forwarded_merge_has_higher_pri_when_chain_limits_disa EXPECT_FALSE(highest_pri_merge->getChain().empty()); // Should be the forwarded merge } +TEST_F(MergeThrottlerTest, forwarded_unordered_merge_is_directly_accepted_into_active_window) { + // Unordered forwarding is orthogonal to disabled chain limits config, so we implicitly test that too. 
+ ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true, true)); + + // Unordered merge is immediately forwarded to the next node + _topLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); + auto fwd = std::dynamic_pointer_cast<api::MergeBucketCommand>( + _topLinks[1]->getAndRemoveMessage(MessageType::MERGEBUCKET)); + ASSERT_TRUE(fwd); + EXPECT_TRUE(fwd->use_unordered_forwarding()); + EXPECT_EQ(fwd->getChain(), std::vector<uint16_t>({2, 1})); +} + +TEST_F(MergeThrottlerTest, non_forwarded_unordered_merge_is_enqueued_if_active_window_full) +{ + fill_throttler_queue_with_n_commands(1, 0); // Fill active window entirely + { + std::vector<MergeBucketCommand::Node> nodes({{1}, {2}, {0}}); + auto cmd = std::make_shared<MergeBucketCommand>( + makeDocumentBucket(BucketId(32, 0xf000baaa)), nodes, 1234, 1); + cmd->set_use_unordered_forwarding(true); + _topLinks[1]->sendDown(cmd); + } + waitUntilMergeQueueIs(*_throttlers[1], 1, _messageWaitTime); // Should be in queue, not active window +} + TEST_F(MergeThrottlerTest, broken_cycle) { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(1); - nodes.push_back(0); - nodes.push_back(2); + std::vector<MergeBucketCommand::Node> nodes({1, 0, 2}); { std::vector<uint16_t> chain; chain.push_back(0); @@ -1268,10 +1300,7 @@ TEST_F(MergeThrottlerTest, broken_cycle) { // Send cycled merge which will be executed { - std::vector<uint16_t> chain; - chain.push_back(0); - chain.push_back(1); - chain.push_back(2); + std::vector<uint16_t> chain({0, 1, 2}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xfeef00)), nodes, 1234, 1, chain); _topLinks[1]->sendDown(cmd); @@ -1425,9 +1454,10 @@ TEST_F(MergeThrottlerTest, source_only_merges_are_not_affected_by_backpressure) void MergeThrottlerTest::fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count) { size_t max_pending = _throttlers[throttler_index]->getThrottlePolicy().getMaxPendingCount(); for 
(size_t i = 0; i < max_pending + queued_count; ++i) { - _topLinks[throttler_index]->sendDown(MergeBuilder(document::BucketId(16, i)).create()); + _topLinks[throttler_index]->sendDown(MergeBuilder(document::BucketId(16, i)) + .nodes(throttler_index, throttler_index + 1) + .create()); } - // Wait till we have max_pending merge forwards and queued_count enqueued. _topLinks[throttler_index]->waitForMessages(max_pending, _messageWaitTime); waitUntilMergeQueueIs(*_throttlers[throttler_index], queued_count, _messageWaitTime); |