author     Tor Brede Vekterli <vekterli@yahooinc.com>  2021-10-06 12:53:40 +0000
committer  Tor Brede Vekterli <vekterli@yahooinc.com>  2021-10-06 12:53:40 +0000
commit     012fb653f0537829fbf91e83977041a227a43dd2 (patch)
tree       86ad3439baab4df309671174c008c9824625c832 /storage/src/tests/storageserver/mergethrottlertest.cpp
parent     3576f3575781abbcc601ac8f5f2bcea2571bfa84 (diff)
Minor MergeThrottler code cleanups. No functional changes.
Diffstat (limited to 'storage/src/tests/storageserver/mergethrottlertest.cpp')
-rw-r--r--  storage/src/tests/storageserver/mergethrottlertest.cpp | 180
1 file changed, 51 insertions(+), 129 deletions(-)
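The cleanups in this diff follow two recurring patterns: index-based loops over shared_ptr containers become range-for loops that release ownership via reset(), and chains of push_back calls building node vectors become single brace-initialized expressions. The sketch below reproduces both transformations in isolation; the Node struct is a hypothetical stand-in for MergeBucketCommand::Node (a node index plus a source-only flag), not the real storageapi type.

#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical stand-in for MergeBucketCommand::Node: a node index plus a
// source-only flag, implicitly constructible from a bare index.
struct Node {
    uint16_t index;
    bool     sourceOnly;
    Node(uint16_t idx, bool src = false) : index(idx), sourceOnly(src) {}
};

int main() {
    // Old style: build the vector one push_back at a time.
    std::vector<Node> before;
    before.push_back(0);
    before.push_back(2);
    before.push_back(Node(1, true));

    // New style: the whole vector in one brace-initialized expression,
    // mirroring the node lists used throughout the updated test.
    std::vector<Node> after({{0}, {2}, {1, true}});

    // Old style: indexed loop assigning an empty shared_ptr to drop ownership.
    std::vector<std::shared_ptr<Node>> links;
    links.push_back(std::make_shared<Node>(0));
    links.push_back(std::make_shared<Node>(1));
    for (std::size_t i = 0; i < links.size(); ++i) {
        links[i] = std::shared_ptr<Node>();
    }

    // New style: range-for with reset(), which releases ownership the same way.
    for (auto& link : links) {
        link.reset();
    }
    links.clear();
    return 0;
}

Calling reset() on a shared_ptr is equivalent to assigning it a default-constructed one, so the new form drops ownership identically, consistent with the commit message's "No functional changes".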
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index fdf13a4cf14..4d6d461ca29 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -180,12 +180,12 @@ MergeThrottlerTest::SetUp()
void
MergeThrottlerTest::TearDown()
{
- for (std::size_t i = 0; i < _topLinks.size(); ++i) {
- if (_topLinks[i]->getState() == StorageLink::OPENED) {
- _topLinks[i]->close();
- _topLinks[i]->flush();
+ for (auto& link : _topLinks) {
+ if (link->getState() == StorageLink::OPENED) {
+ link->close();
+ link->flush();
}
- _topLinks[i] = std::shared_ptr<DummyStorageLink>();
+ link.reset();
}
_topLinks.clear();
_bottomLinks.clear();
@@ -203,18 +203,18 @@ checkChain(const StorageMessage::SP& msg,
const MergeBucketCommand& cmd =
dynamic_cast<const MergeBucketCommand&>(*msg);
- if (cmd.getChain().size() != static_cast<std::size_t>(std::distance(first, end))) {
+ if (cmd.getChain().size() != static_cast<size_t>(std::distance(first, end))) {
return false;
}
return std::equal(cmd.getChain().begin(), cmd.getChain().end(), first);
}
-void waitUntilMergeQueueIs(MergeThrottler& throttler, std::size_t sz, int timeout)
+void waitUntilMergeQueueIs(MergeThrottler& throttler, size_t sz, int timeout)
{
const auto start = std::chrono::steady_clock::now();
while (true) {
- std::size_t count;
+ size_t count;
{
std::lock_guard lock(throttler.getStateLock());
count = throttler.getMergeQueue().size();
@@ -405,10 +405,7 @@ TEST_F(MergeThrottlerTest, chain) {
TEST_F(MergeThrottlerTest, with_source_only_node) {
BucketId bid(14, 0x1337);
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(2);
- nodes.push_back(MergeBucketCommand::Node(1, true));
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {2}, {1, true}});
auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, UINT_MAX, 123);
cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 0));
@@ -452,10 +449,7 @@ TEST_F(MergeThrottlerTest, with_source_only_node) {
TEST_F(MergeThrottlerTest, legacy_42_distributor_behavior) {
BucketId bid(32, 0xfeef00);
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(1);
- nodes.push_back(2);
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234);
// Send to node 1, which is not the lowest index
@@ -490,10 +484,7 @@ TEST_F(MergeThrottlerTest, legacy_42_distributor_behavior) {
TEST_F(MergeThrottlerTest, legacy_42_distributor_behavior_does_not_take_ownership) {
BucketId bid(32, 0xfeef00);
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(1);
- nodes.push_back(2);
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234);
// Send to node 1, which is not the lowest index
@@ -539,13 +530,8 @@ TEST_F(MergeThrottlerTest, legacy_42_distributor_behavior_does_not_take_ownershi
TEST_F(MergeThrottlerTest, end_of_chain_execution_does_not_take_ownership) {
BucketId bid(32, 0xfeef00);
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(2);
- nodes.push_back(1);
- nodes.push_back(0);
- std::vector<uint16_t> chain;
- chain.push_back(0);
- chain.push_back(1);
+ std::vector<MergeBucketCommand::Node> nodes({{2}, {1}, {0}});
+ std::vector<uint16_t> chain({0, 1});
auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234, 1, chain);
// Send to last node, which is not the lowest index
@@ -591,10 +577,7 @@ TEST_F(MergeThrottlerTest, end_of_chain_execution_does_not_take_ownership) {
TEST_F(MergeThrottlerTest, resend_handling) {
BucketId bid(32, 0xbadbed);
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(1);
- nodes.push_back(2);
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234);
cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 1));
@@ -641,13 +624,10 @@ TEST_F(MergeThrottlerTest, resend_handling) {
TEST_F(MergeThrottlerTest, priority_queuing) {
// Fill up all active merges
- std::size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(1);
- nodes.push_back(2);
+ size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
ASSERT_GE(maxPending, 4u);
- for (std::size_t i = 0; i < maxPending; ++i) {
+ for (size_t i = 0; i < maxPending; ++i) {
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234);
cmd->setPriority(100);
@@ -662,8 +642,7 @@ TEST_F(MergeThrottlerTest, priority_queuing) {
int priorities[4] = { 200, 150, 120, 240 };
int sortedPris[4] = { 120, 150, 200, 240 };
for (int i = 0; i < 4; ++i) {
- std::shared_ptr<MergeBucketCommand> cmd(
- new MergeBucketCommand(makeDocumentBucket(BucketId(32, i)), nodes, 1234));
+ auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(BucketId(32, i)), nodes, 1234);
cmd->setPriority(priorities[i]);
_topLinks[0]->sendDown(cmd);
}
@@ -671,7 +650,7 @@ TEST_F(MergeThrottlerTest, priority_queuing) {
waitUntilMergeQueueIs(*_throttlers[0], 4, _messageWaitTime);
// Remove all but 4 forwarded merges
- for (std::size_t i = 0; i < maxPending - 4; ++i) {
+ for (size_t i = 0; i < maxPending - 4; ++i) {
_topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET);
}
ASSERT_EQ(0, _topLinks[0]->getNumCommands());
@@ -702,11 +681,8 @@ TEST_F(MergeThrottlerTest, command_in_queue_duplicate_of_known_merge) {
// Fill up all active merges and 1 queued one
size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
ASSERT_LT(maxPending, 100);
- for (std::size_t i = 0; i < maxPending + 1; ++i) {
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(2 + i);
- nodes.push_back(5 + i);
+ for (size_t i = 0; i < maxPending + 1; ++i) {
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {uint16_t(2 + i)}, {uint16_t(5 + i)}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234);
cmd->setPriority(100 - i);
@@ -719,19 +695,14 @@ TEST_F(MergeThrottlerTest, command_in_queue_duplicate_of_known_merge) {
// Add a merge for the same bucket twice to the queue
{
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(12);
- nodes.push_back(123);
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {12}, {123}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xf000feee)), nodes, 1234);
_topLinks[0]->sendDown(cmd);
}
{
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(124); // Different node set doesn't matter
- nodes.push_back(14);
+ // Different node set doesn't matter
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {124}, {14}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xf000feee)), nodes, 1234);
_topLinks[0]->sendDown(cmd);
@@ -783,10 +754,7 @@ TEST_F(MergeThrottlerTest, command_in_queue_duplicate_of_known_merge) {
// This is not a scenario that should ever actually happen, but for
// the sake of robustness, include it anyway.
TEST_F(MergeThrottlerTest, invalid_receiver_node) {
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(1);
- nodes.push_back(5);
- nodes.push_back(9);
+ std::vector<MergeBucketCommand::Node> nodes({{1}, {5}, {9}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xf00baaaa)), nodes, 1234);
@@ -808,11 +776,8 @@ TEST_F(MergeThrottlerTest, forward_queued_merge) {
// Fill up all active merges and then 3 queued ones
size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
ASSERT_LT(maxPending, 100);
- for (std::size_t i = 0; i < maxPending + 3; ++i) {
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(2 + i);
- nodes.push_back(5 + i);
+ for (size_t i = 0; i < maxPending + 3; ++i) {
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {uint16_t(2 + i)}, {uint16_t(5 + i)}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234);
cmd->setPriority(100 - i);
@@ -872,11 +837,8 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) {
// Fill up all active merges and then 3 queued ones
size_t maxPending = throttler.getThrottlePolicy().getMaxPendingCount();
ASSERT_LT(maxPending, 100);
- for (std::size_t i = 0; i < maxPending + 3; ++i) {
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(1);
- nodes.push_back(5 + i);
- nodes.push_back(7 + i);
+ for (size_t i = 0; i < maxPending + 3; ++i) {
+ std::vector<MergeBucketCommand::Node> nodes({{1}, {uint16_t(5 + i)}, {uint16_t(7 + i)}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234, 1);
cmd->setPriority(250 - i + 5);
@@ -890,11 +852,8 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) {
// Sneak in a higher priority message that is bound to be executed
// on the given node
{
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(1);
- nodes.push_back(0);
- std::vector<uint16_t> chain;
- chain.push_back(0);
+ std::vector<MergeBucketCommand::Node> nodes({{1}, {0}});
+ std::vector<uint16_t> chain({0});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0x1337)), nodes, 1234, 1, chain);
cmd->setPriority(0);
@@ -946,13 +905,10 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) {
TEST_F(MergeThrottlerTest, flush) {
// Fill up all active merges and then 3 queued ones
- std::size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
ASSERT_LT(maxPending, 100);
- for (std::size_t i = 0; i < maxPending + 3; ++i) {
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(1);
- nodes.push_back(2);
+ for (size_t i = 0; i < maxPending + 3; ++i) {
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234, 1);
_topLinks[0]->sendDown(cmd);
@@ -988,14 +944,8 @@ TEST_F(MergeThrottlerTest, flush) {
// properly, it will attempt to forward this node again with a bogus
// index. This should be implicitly handled by checking for a full node
TEST_F(MergeThrottlerTest, unseen_merge_with_node_in_chain) {
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(5);
- nodes.push_back(9);
- std::vector<uint16_t> chain;
- chain.push_back(0);
- chain.push_back(5);
- chain.push_back(9);
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {5}, {9}});
+ std::vector<uint16_t> chain({0, 5, 9});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xdeadbeef)), nodes, 1234, 1, chain);
@@ -1037,11 +987,8 @@ TEST_F(MergeThrottlerTest, merge_with_newer_cluster_state_flushes_outdated_queue
size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
ASSERT_LT(maxPending, 100);
std::vector<api::StorageMessage::Id> ids;
- for (std::size_t i = 0; i < maxPending + 3; ++i) {
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(1);
- nodes.push_back(2);
+ for (size_t i = 0; i < maxPending + 3; ++i) {
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234, 1);
ids.push_back(cmd->getMsgId());
@@ -1054,10 +1001,7 @@ TEST_F(MergeThrottlerTest, merge_with_newer_cluster_state_flushes_outdated_queue
// Send down merge with newer system state
{
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(1);
- nodes.push_back(2);
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0x12345678)), nodes, 1234, 2);
ids.push_back(cmd->getMsgId());
@@ -1085,11 +1029,8 @@ TEST_F(MergeThrottlerTest, updated_cluster_state_flushes_outdated_queued) {
size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
ASSERT_LT(maxPending, 100);
std::vector<api::StorageMessage::Id> ids;
- for (std::size_t i = 0; i < maxPending + 3; ++i) {
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(1);
- nodes.push_back(2);
+ for (size_t i = 0; i < maxPending + 3; ++i) {
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234, 2);
ids.push_back(cmd->getMsgId());
@@ -1126,11 +1067,8 @@ TEST_F(MergeThrottlerTest, legacy_42_merges_do_not_trigger_flush) {
// Fill up all active merges and then 1 queued one
size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
ASSERT_LT(maxPending, 100);
- for (std::size_t i = 0; i < maxPending + 1; ++i) {
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(1);
- nodes.push_back(2);
+ for (size_t i = 0; i < maxPending + 1; ++i) {
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234, 1);
_topLinks[0]->sendDown(cmd);
@@ -1150,10 +1088,7 @@ TEST_F(MergeThrottlerTest, legacy_42_merges_do_not_trigger_flush) {
// Send down a merge with a cluster state version of 0, which should
// be ignored and queued as usual
{
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(1);
- nodes.push_back(2);
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xbaaadbed)), nodes, 1234, 0);
_topLinks[0]->sendDown(cmd);
@@ -1175,10 +1110,7 @@ TEST_F(MergeThrottlerTest, outdated_cluster_state_merges_are_rejected_on_arrival
// Send down a merge with a cluster state version of 9, which should
// be rejected
{
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(1);
- nodes.push_back(2);
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xfeef00)), nodes, 1234, 9);
_topLinks[0]->sendDown(cmd);
@@ -1200,12 +1132,8 @@ TEST_F(MergeThrottlerTest, outdated_cluster_state_merges_are_rejected_on_arrival
TEST_F(MergeThrottlerTest, unknown_merge_with_self_in_chain) {
BucketId bid(32, 0xbadbed);
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(1);
- nodes.push_back(2);
- std::vector<uint16_t> chain;
- chain.push_back(0);
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
+ std::vector<uint16_t> chain({0});
auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234, 1, chain);
cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 1));
@@ -1223,11 +1151,8 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_dist
size_t maxQueue = _throttlers[0]->getMaxQueueSize();
ASSERT_EQ(20, maxQueue);
ASSERT_LT(maxPending, 100);
- for (std::size_t i = 0; i < maxPending + maxQueue; ++i) {
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(1);
- nodes.push_back(2);
+ for (size_t i = 0; i < maxPending + maxQueue; ++i) {
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
// No chain set, i.e. merge command is freshly squeezed from a distributor.
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xf00000 + i)), nodes, 1234, 1);
@@ -1243,10 +1168,7 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_dist
_topLinks[0]->getRepliesOnce();
// Send down another merge which should be immediately busy-returned
{
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(1);
- nodes.push_back(2);
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xf000baaa)), nodes, 1234, 1);
_topLinks[0]->sendDown(cmd);
@@ -1271,7 +1193,7 @@ MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_lim
_throttlers[1]->set_disable_queue_limits_for_chained_merges(disable_queue_limits);
size_t max_pending = _throttlers[1]->getThrottlePolicy().getMaxPendingCount();
size_t max_enqueued = _throttlers[1]->getMaxQueueSize();
- for (std::size_t i = 0; i < max_pending + max_enqueued; ++i) {
+ for (size_t i = 0; i < max_pending + max_enqueued; ++i) {
std::vector<MergeBucketCommand::Node> nodes({{1}, {2}, {3}});
// No chain set, i.e. merge command is freshly squeezed from a distributor.
auto cmd = std::make_shared<MergeBucketCommand>(