summaryrefslogtreecommitdiffstats
path: root/storage/src/tests/storageserver/mergethrottlertest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/tests/storageserver/mergethrottlertest.cpp')
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp58
1 files changed, 44 insertions, 14 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index e8f8e425af4..0f844ab6b4f 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -52,15 +52,18 @@ struct MergeBuilder {
~MergeBuilder();
MergeBuilder& nodes(uint16_t n0) {
+ _nodes.clear();
_nodes.push_back(n0);
return *this;
}
MergeBuilder& nodes(uint16_t n0, uint16_t n1) {
+ _nodes.clear();
_nodes.push_back(n0);
_nodes.push_back(n1);
return *this;
}
MergeBuilder& nodes(uint16_t n0, uint16_t n1, uint16_t n2) {
+ _nodes.clear();
_nodes.push_back(n0);
_nodes.push_back(n1);
_nodes.push_back(n2);
@@ -146,7 +149,8 @@ struct MergeThrottlerTest : Test {
api::ReturnCode::Result expectedResultCode);
void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count);
- void receive_chained_merge_with_full_queue(bool disable_queue_limits);
+ void fill_up_throttler_active_window_and_queue(uint16_t node_idx);
+ void receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd = false);
std::shared_ptr<api::MergeBucketCommand> peek_throttler_queue_top(size_t throttler_idx) {
auto& queue = _throttlers[throttler_idx]->getMergeQueue();
@@ -1197,7 +1201,7 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_dist
}
void
-MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_limits)
+MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd)
{
// Note: uses node with index 1 to not be the first node in chain
_throttlers[1]->set_disable_queue_limits_for_chained_merges(disable_queue_limits);
@@ -1218,10 +1222,15 @@ MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_lim
// Send down another merge with non-empty chain. It should _not_ be busy bounced
// (if limits disabled) as it has already been accepted into another node's merge window.
{
- std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
+ std::vector<MergeBucketCommand::Node> nodes({{2}, {1}, {0}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xf000baaa)), nodes, 1234, 1);
- cmd->setChain(std::vector<uint16_t>({0})); // Forwarded from node 0
+ if (!unordered_fwd) {
+ cmd->setChain(std::vector<uint16_t>({0})); // Forwarded from node 0
+ } else {
+ cmd->setChain(std::vector<uint16_t>({2})); // Forwarded from node 2, i.e. _not_ the lowest index
+ }
+ cmd->set_use_unordered_forwarding(unordered_fwd);
_topLinks[1]->sendDown(cmd);
}
}
@@ -1249,11 +1258,34 @@ TEST_F(MergeThrottlerTest, forwarded_merge_has_higher_pri_when_chain_limits_disa
EXPECT_FALSE(highest_pri_merge->getChain().empty()); // Should be the forwarded merge
}
+TEST_F(MergeThrottlerTest, forwarded_unordered_merge_is_directly_accepted_into_active_window) {
+ // Unordered forwarding is orthogonal to disabled chain limits config, so we implicitly test that too.
+ ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true, true));
+
+ // Unordered merge is immediately forwarded to the next node
+ _topLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+ auto fwd = std::dynamic_pointer_cast<api::MergeBucketCommand>(
+ _topLinks[1]->getAndRemoveMessage(MessageType::MERGEBUCKET));
+ ASSERT_TRUE(fwd);
+ EXPECT_TRUE(fwd->use_unordered_forwarding());
+ EXPECT_EQ(fwd->getChain(), std::vector<uint16_t>({2, 1}));
+}
+
+TEST_F(MergeThrottlerTest, non_forwarded_unordered_merge_is_enqueued_if_active_window_full)
+{
+ fill_throttler_queue_with_n_commands(1, 0); // Fill active window entirely
+ {
+ std::vector<MergeBucketCommand::Node> nodes({{1}, {2}, {0}});
+ auto cmd = std::make_shared<MergeBucketCommand>(
+ makeDocumentBucket(BucketId(32, 0xf000baaa)), nodes, 1234, 1);
+ cmd->set_use_unordered_forwarding(true);
+ _topLinks[1]->sendDown(cmd);
+ }
+ waitUntilMergeQueueIs(*_throttlers[1], 1, _messageWaitTime); // Should be in queue, not active window
+}
+
TEST_F(MergeThrottlerTest, broken_cycle) {
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(1);
- nodes.push_back(0);
- nodes.push_back(2);
+ std::vector<MergeBucketCommand::Node> nodes({1, 0, 2});
{
std::vector<uint16_t> chain;
chain.push_back(0);
@@ -1268,10 +1300,7 @@ TEST_F(MergeThrottlerTest, broken_cycle) {
// Send cycled merge which will be executed
{
- std::vector<uint16_t> chain;
- chain.push_back(0);
- chain.push_back(1);
- chain.push_back(2);
+ std::vector<uint16_t> chain({0, 1, 2});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xfeef00)), nodes, 1234, 1, chain);
_topLinks[1]->sendDown(cmd);
@@ -1425,9 +1454,10 @@ TEST_F(MergeThrottlerTest, source_only_merges_are_not_affected_by_backpressure)
void MergeThrottlerTest::fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count) {
size_t max_pending = _throttlers[throttler_index]->getThrottlePolicy().getMaxPendingCount();
for (size_t i = 0; i < max_pending + queued_count; ++i) {
- _topLinks[throttler_index]->sendDown(MergeBuilder(document::BucketId(16, i)).create());
+ _topLinks[throttler_index]->sendDown(MergeBuilder(document::BucketId(16, i))
+ .nodes(throttler_index, throttler_index + 1)
+ .create());
}
-
// Wait till we have max_pending merge forwards and queued_count enqueued.
_topLinks[throttler_index]->waitForMessages(max_pending, _messageWaitTime);
waitUntilMergeQueueIs(*_throttlers[throttler_index], queued_count, _messageWaitTime);