diff options
Diffstat (limited to 'storage/src/tests/storageserver/mergethrottlertest.cpp')
-rw-r--r-- | storage/src/tests/storageserver/mergethrottlertest.cpp | 180 |
1 files changed, 51 insertions, 129 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index fdf13a4cf14..4d6d461ca29 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -180,12 +180,12 @@ MergeThrottlerTest::SetUp() void MergeThrottlerTest::TearDown() { - for (std::size_t i = 0; i < _topLinks.size(); ++i) { - if (_topLinks[i]->getState() == StorageLink::OPENED) { - _topLinks[i]->close(); - _topLinks[i]->flush(); + for (auto& link : _topLinks) { + if (link->getState() == StorageLink::OPENED) { + link->close(); + link->flush(); } - _topLinks[i] = std::shared_ptr<DummyStorageLink>(); + link.reset(); } _topLinks.clear(); _bottomLinks.clear(); @@ -203,18 +203,18 @@ checkChain(const StorageMessage::SP& msg, const MergeBucketCommand& cmd = dynamic_cast<const MergeBucketCommand&>(*msg); - if (cmd.getChain().size() != static_cast<std::size_t>(std::distance(first, end))) { + if (cmd.getChain().size() != static_cast<size_t>(std::distance(first, end))) { return false; } return std::equal(cmd.getChain().begin(), cmd.getChain().end(), first); } -void waitUntilMergeQueueIs(MergeThrottler& throttler, std::size_t sz, int timeout) +void waitUntilMergeQueueIs(MergeThrottler& throttler, size_t sz, int timeout) { const auto start = std::chrono::steady_clock::now(); while (true) { - std::size_t count; + size_t count; { std::lock_guard lock(throttler.getStateLock()); count = throttler.getMergeQueue().size(); @@ -405,10 +405,7 @@ TEST_F(MergeThrottlerTest, chain) { TEST_F(MergeThrottlerTest, with_source_only_node) { BucketId bid(14, 0x1337); - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(2); - nodes.push_back(MergeBucketCommand::Node(1, true)); + std::vector<MergeBucketCommand::Node> nodes({{0}, {2}, {1, true}}); auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, UINT_MAX, 123); cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 0)); @@ -452,10 +449,7 @@ TEST_F(MergeThrottlerTest, with_source_only_node) { TEST_F(MergeThrottlerTest, legacy_42_distributor_behavior) { BucketId bid(32, 0xfeef00); - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(1); - nodes.push_back(2); + std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234); // Send to node 1, which is not the lowest index @@ -490,10 +484,7 @@ TEST_F(MergeThrottlerTest, legacy_42_distributor_behavior) { TEST_F(MergeThrottlerTest, legacy_42_distributor_behavior_does_not_take_ownership) { BucketId bid(32, 0xfeef00); - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(1); - nodes.push_back(2); + std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234); // Send to node 1, which is not the lowest index @@ -539,13 +530,8 @@ TEST_F(MergeThrottlerTest, legacy_42_distributor_behavior_does_not_take_ownershi TEST_F(MergeThrottlerTest, end_of_chain_execution_does_not_take_ownership) { BucketId bid(32, 0xfeef00); - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(2); - nodes.push_back(1); - nodes.push_back(0); - std::vector<uint16_t> chain; - chain.push_back(0); - chain.push_back(1); + std::vector<MergeBucketCommand::Node> nodes({{2}, {1}, {0}}); + std::vector<uint16_t> chain({0, 1}); auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234, 1, chain); // Send to last node, which is not the lowest index @@ -591,10 +577,7 @@ TEST_F(MergeThrottlerTest, end_of_chain_execution_does_not_take_ownership) { TEST_F(MergeThrottlerTest, resend_handling) { BucketId bid(32, 0xbadbed); - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(1); - nodes.push_back(2); + std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234); cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1)); @@ -641,13 +624,10 @@ TEST_F(MergeThrottlerTest, resend_handling) { TEST_F(MergeThrottlerTest, priority_queuing) { // Fill up all active merges - std::size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(1); - nodes.push_back(2); + size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); ASSERT_GE(maxPending, 4u); - for (std::size_t i = 0; i < maxPending; ++i) { + for (size_t i = 0; i < maxPending; ++i) { auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234); cmd->setPriority(100); @@ -662,8 +642,7 @@ TEST_F(MergeThrottlerTest, priority_queuing) { int priorities[4] = { 200, 150, 120, 240 }; int sortedPris[4] = { 120, 150, 200, 240 }; for (int i = 0; i < 4; ++i) { - std::shared_ptr<MergeBucketCommand> cmd( - new MergeBucketCommand(makeDocumentBucket(BucketId(32, i)), nodes, 1234)); + auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(BucketId(32, i)), nodes, 1234); cmd->setPriority(priorities[i]); _topLinks[0]->sendDown(cmd); } @@ -671,7 +650,7 @@ TEST_F(MergeThrottlerTest, priority_queuing) { waitUntilMergeQueueIs(*_throttlers[0], 4, _messageWaitTime); // Remove all but 4 forwarded merges - for (std::size_t i = 0; i < maxPending - 4; ++i) { + for (size_t i = 0; i < maxPending - 4; ++i) { _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET); } ASSERT_EQ(0, _topLinks[0]->getNumCommands()); @@ -702,11 +681,8 @@ TEST_F(MergeThrottlerTest, command_in_queue_duplicate_of_known_merge) { // Fill up all active merges and 1 queued one size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); ASSERT_LT(maxPending, 100); - for (std::size_t i = 0; i < maxPending + 1; ++i) { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(2 + i); - nodes.push_back(5 + i); + for (size_t i = 0; i < maxPending + 1; ++i) { + std::vector<MergeBucketCommand::Node> nodes({{0}, {uint16_t(2 + i)}, {uint16_t(5 + i)}}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234); cmd->setPriority(100 - i); @@ -719,19 +695,14 @@ TEST_F(MergeThrottlerTest, command_in_queue_duplicate_of_known_merge) { // Add a merge for the same bucket twice to the queue { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(12); - nodes.push_back(123); + std::vector<MergeBucketCommand::Node> nodes({{0}, {12}, {123}}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xf000feee)), nodes, 1234); _topLinks[0]->sendDown(cmd); } { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(124); // Different node set doesn't matter - nodes.push_back(14); + // Different node set doesn't matter + std::vector<MergeBucketCommand::Node> nodes({{0}, {124}, {14}}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xf000feee)), nodes, 1234); _topLinks[0]->sendDown(cmd); @@ -783,10 +754,7 @@ TEST_F(MergeThrottlerTest, command_in_queue_duplicate_of_known_merge) { // This is not a scenario that should ever actually happen, but for // the sake of robustness, include it anyway. TEST_F(MergeThrottlerTest, invalid_receiver_node) { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(1); - nodes.push_back(5); - nodes.push_back(9); + std::vector<MergeBucketCommand::Node> nodes({{1}, {5}, {9}}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xf00baaaa)), nodes, 1234); @@ -808,11 +776,8 @@ TEST_F(MergeThrottlerTest, forward_queued_merge) { // Fill up all active merges and then 3 queued ones size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); ASSERT_LT(maxPending, 100); - for (std::size_t i = 0; i < maxPending + 3; ++i) { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(2 + i); - nodes.push_back(5 + i); + for (size_t i = 0; i < maxPending + 3; ++i) { + std::vector<MergeBucketCommand::Node> nodes({{0}, {uint16_t(2 + i)}, {uint16_t(5 + i)}}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234); cmd->setPriority(100 - i); @@ -872,11 +837,8 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) { // Fill up all active merges and then 3 queued ones size_t maxPending = throttler.getThrottlePolicy().getMaxPendingCount(); ASSERT_LT(maxPending, 100); - for (std::size_t i = 0; i < maxPending + 3; ++i) { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(1); - nodes.push_back(5 + i); - nodes.push_back(7 + i); + for (size_t i = 0; i < maxPending + 3; ++i) { + std::vector<MergeBucketCommand::Node> nodes({{1}, {uint16_t(5 + i)}, {uint16_t(7 + i)}}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234, 1); cmd->setPriority(250 - i + 5); @@ -890,11 +852,8 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) { // Sneak in a higher priority message that is bound to be executed // on the given node { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(1); - nodes.push_back(0); - std::vector<uint16_t> chain; - chain.push_back(0); + std::vector<MergeBucketCommand::Node> nodes({{1}, {0}}); + std::vector<uint16_t> chain({0}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0x1337)), nodes, 1234, 1, chain); cmd->setPriority(0); @@ -946,13 +905,10 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) { TEST_F(MergeThrottlerTest, flush) { // Fill up all active merges and then 3 queued ones - std::size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); ASSERT_LT(maxPending, 100); - for (std::size_t i = 0; i < maxPending + 3; ++i) { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(1); - nodes.push_back(2); + for (size_t i = 0; i < maxPending + 3; ++i) { + std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234, 1); _topLinks[0]->sendDown(cmd); @@ -988,14 +944,8 @@ TEST_F(MergeThrottlerTest, flush) { // properly, it will attempt to forward this node again with a bogus // index. This should be implicitly handled by checking for a full node TEST_F(MergeThrottlerTest, unseen_merge_with_node_in_chain) { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(5); - nodes.push_back(9); - std::vector<uint16_t> chain; - chain.push_back(0); - chain.push_back(5); - chain.push_back(9); + std::vector<MergeBucketCommand::Node> nodes({{0}, {5}, {9}}); + std::vector<uint16_t> chain({0, 5, 9}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xdeadbeef)), nodes, 1234, 1, chain); @@ -1037,11 +987,8 @@ TEST_F(MergeThrottlerTest, merge_with_newer_cluster_state_flushes_outdated_queue size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); ASSERT_LT(maxPending, 100); std::vector<api::StorageMessage::Id> ids; - for (std::size_t i = 0; i < maxPending + 3; ++i) { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(1); - nodes.push_back(2); + for (size_t i = 0; i < maxPending + 3; ++i) { + std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234, 1); ids.push_back(cmd->getMsgId()); @@ -1054,10 +1001,7 @@ TEST_F(MergeThrottlerTest, merge_with_newer_cluster_state_flushes_outdated_queue // Send down merge with newer system state { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(1); - nodes.push_back(2); + std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0x12345678)), nodes, 1234, 2); ids.push_back(cmd->getMsgId()); @@ -1085,11 +1029,8 @@ TEST_F(MergeThrottlerTest, updated_cluster_state_flushes_outdated_queued) { size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); ASSERT_LT(maxPending, 100); std::vector<api::StorageMessage::Id> ids; - for (std::size_t i = 0; i < maxPending + 3; ++i) { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(1); - nodes.push_back(2); + for (size_t i = 0; i < maxPending + 3; ++i) { + std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234, 2); ids.push_back(cmd->getMsgId()); @@ -1126,11 +1067,8 @@ TEST_F(MergeThrottlerTest, legacy_42_merges_do_not_trigger_flush) { // Fill up all active merges and then 1 queued one size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); ASSERT_LT(maxPending, 100); - for (std::size_t i = 0; i < maxPending + 1; ++i) { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(1); - nodes.push_back(2); + for (size_t i = 0; i < maxPending + 1; ++i) { + std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234, 1); _topLinks[0]->sendDown(cmd); @@ -1150,10 +1088,7 @@ TEST_F(MergeThrottlerTest, legacy_42_merges_do_not_trigger_flush) { // Send down a merge with a cluster state version of 0, which should // be ignored and queued as usual { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(1); - nodes.push_back(2); + std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xbaaadbed)), nodes, 1234, 0); _topLinks[0]->sendDown(cmd); @@ -1175,10 +1110,7 @@ TEST_F(MergeThrottlerTest, outdated_cluster_state_merges_are_rejected_on_arrival // Send down a merge with a cluster state version of 9, which should // be rejected { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(1); - nodes.push_back(2); + std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xfeef00)), nodes, 1234, 9); _topLinks[0]->sendDown(cmd); @@ -1200,12 +1132,8 @@ TEST_F(MergeThrottlerTest, outdated_cluster_state_merges_are_rejected_on_arrival TEST_F(MergeThrottlerTest, unknown_merge_with_self_in_chain) { BucketId bid(32, 0xbadbed); - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(1); - nodes.push_back(2); - std::vector<uint16_t> chain; - chain.push_back(0); + std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); + std::vector<uint16_t> chain({0}); auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234, 1, chain); cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1)); @@ -1223,11 +1151,8 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_dist size_t maxQueue = _throttlers[0]->getMaxQueueSize(); ASSERT_EQ(20, maxQueue); ASSERT_LT(maxPending, 100); - for (std::size_t i = 0; i < maxPending + maxQueue; ++i) { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(1); - nodes.push_back(2); + for (size_t i = 0; i < maxPending + maxQueue; ++i) { + std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); // No chain set, i.e. merge command is freshly squeezed from a distributor. auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xf00000 + i)), nodes, 1234, 1); @@ -1243,10 +1168,7 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_dist _topLinks[0]->getRepliesOnce(); // Send down another merge which should be immediately busy-returned { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(1); - nodes.push_back(2); + std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xf000baaa)), nodes, 1234, 1); _topLinks[0]->sendDown(cmd); @@ -1271,7 +1193,7 @@ MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_lim _throttlers[1]->set_disable_queue_limits_for_chained_merges(disable_queue_limits); size_t max_pending = _throttlers[1]->getThrottlePolicy().getMaxPendingCount(); size_t max_enqueued = _throttlers[1]->getMaxQueueSize(); - for (std::size_t i = 0; i < max_pending + max_enqueued; ++i) { + for (size_t i = 0; i < max_pending + max_enqueued; ++i) { std::vector<MergeBucketCommand::Node> nodes({{1}, {2}, {3}}); // No chain set, i.e. merge command is freshly squeezed from a distributor. auto cmd = std::make_shared<MergeBucketCommand>( |