author    Tor Brede Vekterli <vekterli@yahooinc.com>    2021-10-06 12:53:40 +0000
committer Tor Brede Vekterli <vekterli@yahooinc.com>    2021-10-06 12:53:40 +0000
commit    012fb653f0537829fbf91e83977041a227a43dd2 (patch)
tree      86ad3439baab4df309671174c008c9824625c832 /storage
parent    3576f3575781abbcc601ac8f5f2bcea2571bfa84 (diff)
Minor MergeThrottler code cleanups. No functional changes.
Diffstat (limited to 'storage')
-rw-r--r--  storage/src/tests/storageserver/mergethrottlertest.cpp    180
-rw-r--r--  storage/src/vespa/storage/storageserver/mergethrottler.cpp  80
-rw-r--r--  storage/src/vespa/storage/storageserver/mergethrottler.h     4
3 files changed, 79 insertions, 185 deletions
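
The cleanups below follow a handful of recurring C++ modernization patterns: index-based loops become range-based for loops, chains of push_back calls become brace-initialized vectors, bare new expressions become std::make_shared/std::make_unique, and insert(value_type(...)) becomes emplace. The following self-contained sketch condenses these patterns; the types and names in it are hypothetical illustrations, not taken from the Vespa codebase.

// Illustrative sketch of the cleanup patterns in this commit. All names here
// (Node, pending, source_only, ...) are hypothetical, not Vespa code.
#include <cstdint>
#include <map>
#include <memory>
#include <vector>

struct Node {
    uint16_t index;
    bool     source_only = false;
};

int main() {
    // Before: nodes.push_back(0); nodes.push_back(2); nodes.push_back(...);
    // After:  brace-initialize the whole vector in one expression.
    std::vector<Node> nodes({{0}, {2}, {1, true}});

    // Before: std::shared_ptr<Node> n(new Node{7});
    // After:  make_shared constructs object and control block in one allocation.
    auto n = std::make_shared<Node>(Node{7});

    // Before: pending.insert(std::map<uint16_t, bool>::value_type(n->index, false));
    // After:  emplace constructs the key/value pair in place.
    std::map<uint16_t, bool> pending;
    pending.emplace(n->index, false);

    // Before: for (std::size_t i = 0; i < nodes.size(); ++i) { ... nodes[i] ... }
    // After:  range-based for loop.
    std::size_t source_only_count = 0;
    for (const auto& node : nodes) {
        if (node.source_only) {
            ++source_only_count;
        }
    }
    return static_cast<int>(source_only_count);
}
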
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index fdf13a4cf14..4d6d461ca29 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -180,12 +180,12 @@ MergeThrottlerTest::SetUp()
void
MergeThrottlerTest::TearDown()
{
- for (std::size_t i = 0; i < _topLinks.size(); ++i) {
- if (_topLinks[i]->getState() == StorageLink::OPENED) {
- _topLinks[i]->close();
- _topLinks[i]->flush();
+ for (auto& link : _topLinks) {
+ if (link->getState() == StorageLink::OPENED) {
+ link->close();
+ link->flush();
}
- _topLinks[i] = std::shared_ptr<DummyStorageLink>();
+ link.reset();
}
_topLinks.clear();
_bottomLinks.clear();
@@ -203,18 +203,18 @@ checkChain(const StorageMessage::SP& msg,
const MergeBucketCommand& cmd =
dynamic_cast<const MergeBucketCommand&>(*msg);
- if (cmd.getChain().size() != static_cast<std::size_t>(std::distance(first, end))) {
+ if (cmd.getChain().size() != static_cast<size_t>(std::distance(first, end))) {
return false;
}
return std::equal(cmd.getChain().begin(), cmd.getChain().end(), first);
}
-void waitUntilMergeQueueIs(MergeThrottler& throttler, std::size_t sz, int timeout)
+void waitUntilMergeQueueIs(MergeThrottler& throttler, size_t sz, int timeout)
{
const auto start = std::chrono::steady_clock::now();
while (true) {
- std::size_t count;
+ size_t count;
{
std::lock_guard lock(throttler.getStateLock());
count = throttler.getMergeQueue().size();
@@ -405,10 +405,7 @@ TEST_F(MergeThrottlerTest, chain) {
TEST_F(MergeThrottlerTest, with_source_only_node) {
BucketId bid(14, 0x1337);
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(2);
- nodes.push_back(MergeBucketCommand::Node(1, true));
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {2}, {1, true}});
auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, UINT_MAX, 123);
cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 0));
@@ -452,10 +449,7 @@ TEST_F(MergeThrottlerTest, with_source_only_node) {
TEST_F(MergeThrottlerTest, legacy_42_distributor_behavior) {
BucketId bid(32, 0xfeef00);
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(1);
- nodes.push_back(2);
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234);
// Send to node 1, which is not the lowest index
@@ -490,10 +484,7 @@ TEST_F(MergeThrottlerTest, legacy_42_distributor_behavior) {
TEST_F(MergeThrottlerTest, legacy_42_distributor_behavior_does_not_take_ownership) {
BucketId bid(32, 0xfeef00);
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(1);
- nodes.push_back(2);
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234);
// Send to node 1, which is not the lowest index
@@ -539,13 +530,8 @@ TEST_F(MergeThrottlerTest, legacy_42_distributor_behavior_does_not_take_ownershi
TEST_F(MergeThrottlerTest, end_of_chain_execution_does_not_take_ownership) {
BucketId bid(32, 0xfeef00);
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(2);
- nodes.push_back(1);
- nodes.push_back(0);
- std::vector<uint16_t> chain;
- chain.push_back(0);
- chain.push_back(1);
+ std::vector<MergeBucketCommand::Node> nodes({{2}, {1}, {0}});
+ std::vector<uint16_t> chain({0, 1});
auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234, 1, chain);
// Send to last node, which is not the lowest index
@@ -591,10 +577,7 @@ TEST_F(MergeThrottlerTest, end_of_chain_execution_does_not_take_ownership) {
TEST_F(MergeThrottlerTest, resend_handling) {
BucketId bid(32, 0xbadbed);
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(1);
- nodes.push_back(2);
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234);
cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 1));
@@ -641,13 +624,10 @@ TEST_F(MergeThrottlerTest, resend_handling) {
TEST_F(MergeThrottlerTest, priority_queuing) {
// Fill up all active merges
- std::size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(1);
- nodes.push_back(2);
+ size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
ASSERT_GE(maxPending, 4u);
- for (std::size_t i = 0; i < maxPending; ++i) {
+ for (size_t i = 0; i < maxPending; ++i) {
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234);
cmd->setPriority(100);
@@ -662,8 +642,7 @@ TEST_F(MergeThrottlerTest, priority_queuing) {
int priorities[4] = { 200, 150, 120, 240 };
int sortedPris[4] = { 120, 150, 200, 240 };
for (int i = 0; i < 4; ++i) {
- std::shared_ptr<MergeBucketCommand> cmd(
- new MergeBucketCommand(makeDocumentBucket(BucketId(32, i)), nodes, 1234));
+ auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(BucketId(32, i)), nodes, 1234);
cmd->setPriority(priorities[i]);
_topLinks[0]->sendDown(cmd);
}
@@ -671,7 +650,7 @@ TEST_F(MergeThrottlerTest, priority_queuing) {
waitUntilMergeQueueIs(*_throttlers[0], 4, _messageWaitTime);
// Remove all but 4 forwarded merges
- for (std::size_t i = 0; i < maxPending - 4; ++i) {
+ for (size_t i = 0; i < maxPending - 4; ++i) {
_topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET);
}
ASSERT_EQ(0, _topLinks[0]->getNumCommands());
@@ -702,11 +681,8 @@ TEST_F(MergeThrottlerTest, command_in_queue_duplicate_of_known_merge) {
// Fill up all active merges and 1 queued one
size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
ASSERT_LT(maxPending, 100);
- for (std::size_t i = 0; i < maxPending + 1; ++i) {
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(2 + i);
- nodes.push_back(5 + i);
+ for (size_t i = 0; i < maxPending + 1; ++i) {
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {uint16_t(2 + i)}, {uint16_t(5 + i)}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234);
cmd->setPriority(100 - i);
@@ -719,19 +695,14 @@ TEST_F(MergeThrottlerTest, command_in_queue_duplicate_of_known_merge) {
// Add a merge for the same bucket twice to the queue
{
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(12);
- nodes.push_back(123);
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {12}, {123}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xf000feee)), nodes, 1234);
_topLinks[0]->sendDown(cmd);
}
{
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(124); // Different node set doesn't matter
- nodes.push_back(14);
+ // Different node set doesn't matter
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {124}, {14}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xf000feee)), nodes, 1234);
_topLinks[0]->sendDown(cmd);
@@ -783,10 +754,7 @@ TEST_F(MergeThrottlerTest, command_in_queue_duplicate_of_known_merge) {
// This is not a scenario that should ever actually happen, but for
// the sake of robustness, include it anyway.
TEST_F(MergeThrottlerTest, invalid_receiver_node) {
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(1);
- nodes.push_back(5);
- nodes.push_back(9);
+ std::vector<MergeBucketCommand::Node> nodes({{1}, {5}, {9}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xf00baaaa)), nodes, 1234);
@@ -808,11 +776,8 @@ TEST_F(MergeThrottlerTest, forward_queued_merge) {
// Fill up all active merges and then 3 queued ones
size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
ASSERT_LT(maxPending, 100);
- for (std::size_t i = 0; i < maxPending + 3; ++i) {
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(2 + i);
- nodes.push_back(5 + i);
+ for (size_t i = 0; i < maxPending + 3; ++i) {
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {uint16_t(2 + i)}, {uint16_t(5 + i)}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234);
cmd->setPriority(100 - i);
@@ -872,11 +837,8 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) {
// Fill up all active merges and then 3 queued ones
size_t maxPending = throttler.getThrottlePolicy().getMaxPendingCount();
ASSERT_LT(maxPending, 100);
- for (std::size_t i = 0; i < maxPending + 3; ++i) {
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(1);
- nodes.push_back(5 + i);
- nodes.push_back(7 + i);
+ for (size_t i = 0; i < maxPending + 3; ++i) {
+ std::vector<MergeBucketCommand::Node> nodes({{1}, {uint16_t(5 + i)}, {uint16_t(7 + i)}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234, 1);
cmd->setPriority(250 - i + 5);
@@ -890,11 +852,8 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) {
// Sneak in a higher priority message that is bound to be executed
// on the given node
{
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(1);
- nodes.push_back(0);
- std::vector<uint16_t> chain;
- chain.push_back(0);
+ std::vector<MergeBucketCommand::Node> nodes({{1}, {0}});
+ std::vector<uint16_t> chain({0});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0x1337)), nodes, 1234, 1, chain);
cmd->setPriority(0);
@@ -946,13 +905,10 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) {
TEST_F(MergeThrottlerTest, flush) {
// Fill up all active merges and then 3 queued ones
- std::size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
+ size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
ASSERT_LT(maxPending, 100);
- for (std::size_t i = 0; i < maxPending + 3; ++i) {
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(1);
- nodes.push_back(2);
+ for (size_t i = 0; i < maxPending + 3; ++i) {
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234, 1);
_topLinks[0]->sendDown(cmd);
@@ -988,14 +944,8 @@ TEST_F(MergeThrottlerTest, flush) {
// properly, it will attempt to forward this node again with a bogus
// index. This should be implicitly handled by checking for a full node
TEST_F(MergeThrottlerTest, unseen_merge_with_node_in_chain) {
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(5);
- nodes.push_back(9);
- std::vector<uint16_t> chain;
- chain.push_back(0);
- chain.push_back(5);
- chain.push_back(9);
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {5}, {9}});
+ std::vector<uint16_t> chain({0, 5, 9});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xdeadbeef)), nodes, 1234, 1, chain);
@@ -1037,11 +987,8 @@ TEST_F(MergeThrottlerTest, merge_with_newer_cluster_state_flushes_outdated_queue
size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
ASSERT_LT(maxPending, 100);
std::vector<api::StorageMessage::Id> ids;
- for (std::size_t i = 0; i < maxPending + 3; ++i) {
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(1);
- nodes.push_back(2);
+ for (size_t i = 0; i < maxPending + 3; ++i) {
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234, 1);
ids.push_back(cmd->getMsgId());
@@ -1054,10 +1001,7 @@ TEST_F(MergeThrottlerTest, merge_with_newer_cluster_state_flushes_outdated_queue
// Send down merge with newer system state
{
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(1);
- nodes.push_back(2);
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0x12345678)), nodes, 1234, 2);
ids.push_back(cmd->getMsgId());
@@ -1085,11 +1029,8 @@ TEST_F(MergeThrottlerTest, updated_cluster_state_flushes_outdated_queued) {
size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
ASSERT_LT(maxPending, 100);
std::vector<api::StorageMessage::Id> ids;
- for (std::size_t i = 0; i < maxPending + 3; ++i) {
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(1);
- nodes.push_back(2);
+ for (size_t i = 0; i < maxPending + 3; ++i) {
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234, 2);
ids.push_back(cmd->getMsgId());
@@ -1126,11 +1067,8 @@ TEST_F(MergeThrottlerTest, legacy_42_merges_do_not_trigger_flush) {
// Fill up all active merges and then 1 queued one
size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount();
ASSERT_LT(maxPending, 100);
- for (std::size_t i = 0; i < maxPending + 1; ++i) {
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(1);
- nodes.push_back(2);
+ for (size_t i = 0; i < maxPending + 1; ++i) {
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234, 1);
_topLinks[0]->sendDown(cmd);
@@ -1150,10 +1088,7 @@ TEST_F(MergeThrottlerTest, legacy_42_merges_do_not_trigger_flush) {
// Send down a merge with a cluster state version of 0, which should
// be ignored and queued as usual
{
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(1);
- nodes.push_back(2);
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xbaaadbed)), nodes, 1234, 0);
_topLinks[0]->sendDown(cmd);
@@ -1175,10 +1110,7 @@ TEST_F(MergeThrottlerTest, outdated_cluster_state_merges_are_rejected_on_arrival
// Send down a merge with a cluster state version of 9, which should
// be rejected
{
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(1);
- nodes.push_back(2);
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xfeef00)), nodes, 1234, 9);
_topLinks[0]->sendDown(cmd);
@@ -1200,12 +1132,8 @@ TEST_F(MergeThrottlerTest, outdated_cluster_state_merges_are_rejected_on_arrival
TEST_F(MergeThrottlerTest, unknown_merge_with_self_in_chain) {
BucketId bid(32, 0xbadbed);
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(1);
- nodes.push_back(2);
- std::vector<uint16_t> chain;
- chain.push_back(0);
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
+ std::vector<uint16_t> chain({0});
auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234, 1, chain);
cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 1));
@@ -1223,11 +1151,8 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_dist
size_t maxQueue = _throttlers[0]->getMaxQueueSize();
ASSERT_EQ(20, maxQueue);
ASSERT_LT(maxPending, 100);
- for (std::size_t i = 0; i < maxPending + maxQueue; ++i) {
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(1);
- nodes.push_back(2);
+ for (size_t i = 0; i < maxPending + maxQueue; ++i) {
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
// No chain set, i.e. merge command is freshly squeezed from a distributor.
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xf00000 + i)), nodes, 1234, 1);
@@ -1243,10 +1168,7 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_dist
_topLinks[0]->getRepliesOnce();
// Send down another merge which should be immediately busy-returned
{
- std::vector<MergeBucketCommand::Node> nodes;
- nodes.push_back(0);
- nodes.push_back(1);
- nodes.push_back(2);
+ std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(BucketId(32, 0xf000baaa)), nodes, 1234, 1);
_topLinks[0]->sendDown(cmd);
@@ -1271,7 +1193,7 @@ MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_lim
_throttlers[1]->set_disable_queue_limits_for_chained_merges(disable_queue_limits);
size_t max_pending = _throttlers[1]->getThrottlePolicy().getMaxPendingCount();
size_t max_enqueued = _throttlers[1]->getMaxQueueSize();
- for (std::size_t i = 0; i < max_pending + max_enqueued; ++i) {
+ for (size_t i = 0; i < max_pending + max_enqueued; ++i) {
std::vector<MergeBucketCommand::Node> nodes({{1}, {2}, {3}});
// No chain set, i.e. merge command is freshly squeezed from a distributor.
auto cmd = std::make_shared<MergeBucketCommand>(
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index 06d49b2155b..914b2c58219 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -171,21 +171,6 @@ MergeThrottler::MergeNodeSequence::chainContainsIndex(uint16_t idx) const
return false;
}
-std::string
-MergeThrottler::MergeNodeSequence::getSequenceString() const
-{
- std::ostringstream oss;
- oss << '[';
- for (std::size_t i = 0; i < _cmd.getNodes().size(); ++i) {
- if (i > 0) {
- oss << ", ";
- }
- oss << _cmd.getNodes()[i].index;
- }
- oss << ']';
- return oss.str();
-}
-
MergeThrottler::MergeThrottler(
const config::ConfigUri & configUri,
StorageComponentRegister& compReg)
@@ -194,12 +179,12 @@ MergeThrottler::MergeThrottler(
_merges(),
_queue(),
_maxQueueSize(1024),
- _throttlePolicy(new mbus::StaticThrottlePolicy()),
+ _throttlePolicy(std::make_unique<mbus::StaticThrottlePolicy>()),
_queueSequence(0),
_messageLock(),
_stateLock(),
_configFetcher(configUri.getContext()),
- _metrics(new Metrics),
+ _metrics(std::make_unique<Metrics>()),
_component(compReg, "mergethrottler"),
_thread(),
_rendezvous(RENDEZVOUS_NONE),
@@ -301,29 +286,26 @@ MergeThrottler::onFlush(bool /*downwards*/)
// Abort active merges, queued and up/down pending
std::vector<api::StorageMessage::SP> flushable;
- ActiveMergeMap::iterator mergeEnd = _merges.end();
- for (ActiveMergeMap::iterator i = _merges.begin(); i != mergeEnd; ++i) {
+ for (auto& merge : _merges) {
// Only generate a reply if the throttler owns the command
- if (i->second.getMergeCmd().get()) {
- flushable.push_back(i->second.getMergeCmd());
+ if (merge.second.getMergeCmd().get()) {
+ flushable.push_back(merge.second.getMergeCmd());
} else {
LOG(debug, "Not generating flush-reply for %s since we don't "
- "own the command", i->first.toString().c_str());
+ "own the command", merge.first.toString().c_str());
}
DummyMbusMessage<mbus::Reply> dummyReply;
_throttlePolicy->processReply(dummyReply);
}
- MergePriorityQueue::iterator queueEnd = _queue.end();
- for (MergePriorityQueue::iterator i = _queue.begin(); i != queueEnd; ++i) {
- flushable.push_back(i->_msg);
+ for (auto& entry : _queue) {
+ flushable.push_back(entry._msg);
}
-
// Just pass-through everything in the up-queue, since the messages
// are either replies or commands _we_ have sent and thus cannot
// send a meaningful reply for
- for (std::size_t i = 0; i < _messagesUp.size(); ++i) {
- msgGuard.sendUp(_messagesUp[i]);
+ for (auto& msg : _messagesUp) {
+ msgGuard.sendUp(msg);
}
std::back_insert_iterator<
@@ -331,28 +313,21 @@ MergeThrottler::onFlush(bool /*downwards*/)
> inserter(flushable);
std::copy(_messagesDown.begin(), _messagesDown.end(), inserter);
- for (std::size_t i = 0; i < flushable.size(); ++i) {
+ for (auto& msg : flushable) {
// Down-bound merge may be a reply, in which case we ignore it
// since we can't actually do anything with it now
- if (flushable[i]->getType() == api::MessageType::MERGEBUCKET) {
- std::shared_ptr<api::MergeBucketReply> reply(
- std::make_shared<api::MergeBucketReply>(
- static_cast<const api::MergeBucketCommand&>(
- *flushable[i])));
- reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED,
- "Storage node is shutting down"));
- LOG(debug, "Aborted merge since we're flushing: %s",
- flushable[i]->toString().c_str());
+ if (msg->getType() == api::MessageType::MERGEBUCKET) {
+ auto reply = std::make_shared<api::MergeBucketReply>(static_cast<const api::MergeBucketCommand&>(*msg));
+ reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Storage node is shutting down"));
+ LOG(debug, "Aborted merge since we're flushing: %s", msg->toString().c_str());
msgGuard.sendUp(reply);
} else {
- assert(flushable[i]->getType() == api::MessageType::MERGEBUCKET_REPLY);
- LOG(debug, "Ignored merge reply since we're flushing: %s",
- flushable[i]->toString().c_str());
+ assert(msg->getType() == api::MessageType::MERGEBUCKET_REPLY);
+ LOG(debug, "Ignored merge reply since we're flushing: %s", msg->toString().c_str());
}
}
- LOG(debug, "Flushed %zu unfinished or pending merge operations",
- flushable.size());
+ LOG(debug, "Flushed %zu unfinished or pending merge operations", flushable.size());
_merges.clear();
_queue.clear();
@@ -400,8 +375,8 @@ MergeThrottler::getNextQueuedMerge()
return api::StorageMessage::SP();
}
- MergePriorityQueue::iterator iter = _queue.begin();
- MergePriorityQueue::value_type entry = *iter;
+ auto iter = _queue.begin();
+ auto entry = *iter;
entry._startTimer.stop(_metrics->averageQueueWaitingTime);
_queue.erase(iter);
return entry._msg;
@@ -418,7 +393,7 @@ MergeThrottler::enqueueMerge(
if (!validateNewMerge(mergeCmd, nodeSeq, msgGuard)) {
return;
}
- _queue.insert(MergePriorityQueue::value_type(msg, _queueSequence++));
+ _queue.emplace(msg, _queueSequence++);
_metrics->queueSize.set(_queue.size());
}
@@ -452,7 +427,7 @@ MergeThrottler::rejectMergeIfOutdated(
{
return false;
}
- std::ostringstream oss;
+ vespalib::asciistream oss;
oss << "Rejected merge due to outdated cluster state; merge has "
<< "version " << cmd.getClusterStateVersion()
<< ", storage node has version "
@@ -557,10 +532,10 @@ MergeThrottler::attemptProcessNextQueuedMerge(
LOG(spam, "Processing queued merge %s", msg->toString().c_str());
processNewMergeCommand(msg, msgGuard);
} else {
- std::stringstream oss;
- oss << "Queued merge " << *msg << " is out of date; it has already "
- "been started by someone else since it was queued";
- LOG(debug, "%s", oss.str().c_str());
+ vespalib::asciistream oss;
+ oss << "Queued merge " << msg->toString() << " is out of date; it has already "
+ "been started by someone else since it was queued";
+ LOG(debug, "%s", oss.c_str());
sendReply(dynamic_cast<const api::MergeBucketCommand&>(*msg),
api::ReturnCode(api::ReturnCode::BUSY, oss.str()),
msgGuard, _metrics->chaining);
@@ -570,8 +545,7 @@ MergeThrottler::attemptProcessNextQueuedMerge(
if (_queue.empty()) {
LOG(spam, "Queue empty - no merges to process");
} else {
- LOG(spam, "Merges queued, but throttle policy disallows further "
- "merges at this time");
+ LOG(spam, "Merges queued, but throttle policy disallows further merges at this time");
}
}
return false;
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index adca4ca6a00..6ab9dff6f71 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -212,8 +212,7 @@ public:
mbus::StaticThrottlePolicy& getThrottlePolicy() { return *_throttlePolicy; }
void set_disable_queue_limits_for_chained_merges(bool disable_limits) noexcept;
// For unit testing only
- std::mutex & getMonitor() { return _messageLock; }
- std::mutex & getStateLock() { return _stateLock; }
+ std::mutex& getStateLock() { return _stateLock; }
Metrics& getMetrics() { return *_metrics; }
std::size_t getMaxQueueSize() const { return _maxQueueSize; }
@@ -263,7 +262,6 @@ private:
* pairwise compares equally to the vector of sorted node indices
*/
bool isChainCompleted() const;
- std::string getSequenceString() const;
};
/**