diff options
5 files changed, 102 insertions, 207 deletions
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/content/StorageGroup.java b/config-model/src/main/java/com/yahoo/vespa/model/content/StorageGroup.java index e54a078cbe6..fea36eaeb15 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/content/StorageGroup.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/content/StorageGroup.java @@ -315,12 +315,16 @@ public class StorageGroup { private StorageNode buildSingleNode(DeployState deployState, ContentCluster parent) { int distributionKey = 0; - StorageNode sNode = new StorageNode(deployState.getProperties(), parent.getStorageCluster(), 1.0, distributionKey , false); - sNode.setHostResource(parent.hostSystem().getHost(Container.SINGLENODE_CONTAINER_SERVICESPEC)); - sNode.initService(deployLogger); - PersistenceEngine provider = parent.getPersistence().create(deployState, sNode, storageGroup, null); - new Distributor(deployState.getProperties(), parent.getDistributorNodes(), distributionKey, null, provider); - return sNode; + + StorageNode searchNode = new StorageNode(deployState.getProperties(), parent.getStorageCluster(), 1.0, distributionKey , false); + searchNode.setHostResource(parent.hostSystem().getHost(Container.SINGLENODE_CONTAINER_SERVICESPEC)); + PersistenceEngine provider = parent.getPersistence().create(deployState, searchNode, storageGroup, null); + searchNode.initService(deployLogger); + + Distributor distributor = new Distributor(deployState.getProperties(), parent.getDistributorNodes(), distributionKey, null, provider); + distributor.setHostResource(searchNode.getHostResource()); + distributor.initService(deployLogger); + return searchNode; } /** diff --git a/config-model/src/test/java/com/yahoo/vespa/model/admin/metricsproxy/MetricsProxyContainerTest.java b/config-model/src/test/java/com/yahoo/vespa/model/admin/metricsproxy/MetricsProxyContainerTest.java index 3f211a595b9..a84f73dfd5a 100644 --- a/config-model/src/test/java/com/yahoo/vespa/model/admin/metricsproxy/MetricsProxyContainerTest.java +++ b/config-model/src/test/java/com/yahoo/vespa/model/admin/metricsproxy/MetricsProxyContainerTest.java @@ -37,7 +37,7 @@ public class MetricsProxyContainerTest { var tester = new VespaModelTester(); tester.addHosts(numberOfHosts); - VespaModel model = tester.createModel(servicesWithManyNodes(), true); + VespaModel model = tester.createModel(hostedServicesWithManyNodes(), true); assertThat(model.getRoot().hostSystem().getHosts().size(), is(numberOfHosts)); for (var host : model.hostSystem().getHosts()) { @@ -56,7 +56,7 @@ public class MetricsProxyContainerTest { var tester = new VespaModelTester(); tester.addHosts(numberOfHosts); - VespaModel model = tester.createModel(servicesWithManyNodes(), true); + VespaModel model = tester.createModel(hostedServicesWithManyNodes(), true); assertThat(model.getRoot().hostSystem().getHosts().size(), is(numberOfHosts)); for (var host : model.hostSystem().getHosts()) { @@ -71,7 +71,7 @@ public class MetricsProxyContainerTest { @Test public void http_server_is_running_on_expected_port() { - VespaModel model = getModel(servicesWithContent(), self_hosted); + VespaModel model = getModel(hostedServicesWithContent(), self_hosted); MetricsProxyContainer container = (MetricsProxyContainer)model.id2producer().get(CONTAINER_CONFIG_ID); assertEquals(19092, container.getSearchPort()); assertEquals(19092, container.getHealthPort()); @@ -82,7 +82,7 @@ public class MetricsProxyContainerTest { @Test public void metrics_rpc_server_is_running_on_expected_port() { - VespaModel model = getModel(servicesWithContent(), self_hosted); + VespaModel model = getModel(hostedServicesWithContent(), self_hosted); MetricsProxyContainer container = (MetricsProxyContainer)model.id2producer().get(CONTAINER_CONFIG_ID); int offset = 3; @@ -96,7 +96,7 @@ public class MetricsProxyContainerTest { @Test public void admin_rpc_server_is_running() { - VespaModel model = getModel(servicesWithContent(), self_hosted); + VespaModel model = getModel(hostedServicesWithContent(), self_hosted); MetricsProxyContainer container = (MetricsProxyContainer)model.id2producer().get(CONTAINER_CONFIG_ID); int offset = 2; @@ -107,7 +107,7 @@ public class MetricsProxyContainerTest { @Test public void preload_is_empty() { - VespaModel model = getModel(servicesWithContent(), self_hosted); + VespaModel model = getModel(hostedServicesWithContent(), self_hosted); MetricsProxyContainer container = (MetricsProxyContainer)model.id2producer().get(CONTAINER_CONFIG_ID); assertEquals("", container.getPreLoad()); @@ -115,7 +115,7 @@ public class MetricsProxyContainerTest { @Test public void hosted_application_propagates_node_dimensions() { - String services = servicesWithContent(); + String services = hostedServicesWithContent(); VespaModel hostedModel = getModel(services, hosted); assertEquals(4, hostedModel.getHosts().size()); String configId = containerConfigId(hostedModel, hosted); @@ -127,7 +127,7 @@ public class MetricsProxyContainerTest { @Test public void metrics_v2_handler_is_set_up_with_node_info_config() { - String services = servicesWithContent(); + String services = hostedServicesWithContent(); VespaModel hostedModel = getModel(services, hosted); var container = (MetricsProxyContainer)hostedModel.id2producer().get(containerConfigId(hostedModel, hosted)); @@ -143,8 +143,8 @@ public class MetricsProxyContainerTest { @Test public void vespa_services_config_has_all_services() { - VespaServicesConfig vespaServicesConfig = getVespaServicesConfig(servicesWithContent()); - assertEquals(7, vespaServicesConfig.service().size()); + VespaServicesConfig vespaServicesConfig = getVespaServicesConfig(hostedServicesWithContent()); + assertEquals(8, vespaServicesConfig.service().size()); for (var service : vespaServicesConfig.service()) { if (service.configId().equals("admin/cluster-controllers/0")) { @@ -158,7 +158,7 @@ public class MetricsProxyContainerTest { @Test public void vespa_services_config_has_service_dimensions() { - VespaServicesConfig vespaServicesConfig = getVespaServicesConfig(servicesWithContent()); + VespaServicesConfig vespaServicesConfig = getVespaServicesConfig(hostedServicesWithContent()); for (var service : vespaServicesConfig.service()) { if (service.configId().equals("admin/cluster-controllers/0")) { assertEquals(1, service.dimension().size()); @@ -169,7 +169,7 @@ public class MetricsProxyContainerTest { } - private static String servicesWithManyNodes() { + private static String hostedServicesWithManyNodes() { return String.join("\n", "<services>", " <container version='1.0' id='foo'>", @@ -182,12 +182,9 @@ public class MetricsProxyContainerTest { "</services>"); } - private static String servicesWithContent() { + private static String hostedServicesWithContent() { return String.join("\n", "<services>", - " <admin version='2.0'>", - " <adminserver hostalias='node1'/>", - " </admin>", " <content version='1.0' id='my-content'>", " <documents />", " <nodes count='1' />", diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index fdf13a4cf14..4d6d461ca29 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -180,12 +180,12 @@ MergeThrottlerTest::SetUp() void MergeThrottlerTest::TearDown() { - for (std::size_t i = 0; i < _topLinks.size(); ++i) { - if (_topLinks[i]->getState() == StorageLink::OPENED) { - _topLinks[i]->close(); - _topLinks[i]->flush(); + for (auto& link : _topLinks) { + if (link->getState() == StorageLink::OPENED) { + link->close(); + link->flush(); } - _topLinks[i] = std::shared_ptr<DummyStorageLink>(); + link.reset(); } _topLinks.clear(); _bottomLinks.clear(); @@ -203,18 +203,18 @@ checkChain(const StorageMessage::SP& msg, const MergeBucketCommand& cmd = dynamic_cast<const MergeBucketCommand&>(*msg); - if (cmd.getChain().size() != static_cast<std::size_t>(std::distance(first, end))) { + if (cmd.getChain().size() != static_cast<size_t>(std::distance(first, end))) { return false; } return std::equal(cmd.getChain().begin(), cmd.getChain().end(), first); } -void waitUntilMergeQueueIs(MergeThrottler& throttler, std::size_t sz, int timeout) +void waitUntilMergeQueueIs(MergeThrottler& throttler, size_t sz, int timeout) { const auto start = std::chrono::steady_clock::now(); while (true) { - std::size_t count; + size_t count; { std::lock_guard lock(throttler.getStateLock()); count = throttler.getMergeQueue().size(); @@ -405,10 +405,7 @@ TEST_F(MergeThrottlerTest, chain) { TEST_F(MergeThrottlerTest, with_source_only_node) { BucketId bid(14, 0x1337); - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(2); - nodes.push_back(MergeBucketCommand::Node(1, true)); + std::vector<MergeBucketCommand::Node> nodes({{0}, {2}, {1, true}}); auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, UINT_MAX, 123); cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 0)); @@ -452,10 +449,7 @@ TEST_F(MergeThrottlerTest, with_source_only_node) { TEST_F(MergeThrottlerTest, legacy_42_distributor_behavior) { BucketId bid(32, 0xfeef00); - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(1); - nodes.push_back(2); + std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234); // Send to node 1, which is not the lowest index @@ -490,10 +484,7 @@ TEST_F(MergeThrottlerTest, legacy_42_distributor_behavior) { TEST_F(MergeThrottlerTest, legacy_42_distributor_behavior_does_not_take_ownership) { BucketId bid(32, 0xfeef00); - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(1); - nodes.push_back(2); + std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234); // Send to node 1, which is not the lowest index @@ -539,13 +530,8 @@ TEST_F(MergeThrottlerTest, legacy_42_distributor_behavior_does_not_take_ownershi TEST_F(MergeThrottlerTest, end_of_chain_execution_does_not_take_ownership) { BucketId bid(32, 0xfeef00); - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(2); - nodes.push_back(1); - nodes.push_back(0); - std::vector<uint16_t> chain; - chain.push_back(0); - chain.push_back(1); + std::vector<MergeBucketCommand::Node> nodes({{2}, {1}, {0}}); + std::vector<uint16_t> chain({0, 1}); auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234, 1, chain); // Send to last node, which is not the lowest index @@ -591,10 +577,7 @@ TEST_F(MergeThrottlerTest, end_of_chain_execution_does_not_take_ownership) { TEST_F(MergeThrottlerTest, resend_handling) { BucketId bid(32, 0xbadbed); - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(1); - nodes.push_back(2); + std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234); cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1)); @@ -641,13 +624,10 @@ TEST_F(MergeThrottlerTest, resend_handling) { TEST_F(MergeThrottlerTest, priority_queuing) { // Fill up all active merges - std::size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(1); - nodes.push_back(2); + size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); ASSERT_GE(maxPending, 4u); - for (std::size_t i = 0; i < maxPending; ++i) { + for (size_t i = 0; i < maxPending; ++i) { auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234); cmd->setPriority(100); @@ -662,8 +642,7 @@ TEST_F(MergeThrottlerTest, priority_queuing) { int priorities[4] = { 200, 150, 120, 240 }; int sortedPris[4] = { 120, 150, 200, 240 }; for (int i = 0; i < 4; ++i) { - std::shared_ptr<MergeBucketCommand> cmd( - new MergeBucketCommand(makeDocumentBucket(BucketId(32, i)), nodes, 1234)); + auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(BucketId(32, i)), nodes, 1234); cmd->setPriority(priorities[i]); _topLinks[0]->sendDown(cmd); } @@ -671,7 +650,7 @@ TEST_F(MergeThrottlerTest, priority_queuing) { waitUntilMergeQueueIs(*_throttlers[0], 4, _messageWaitTime); // Remove all but 4 forwarded merges - for (std::size_t i = 0; i < maxPending - 4; ++i) { + for (size_t i = 0; i < maxPending - 4; ++i) { _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET); } ASSERT_EQ(0, _topLinks[0]->getNumCommands()); @@ -702,11 +681,8 @@ TEST_F(MergeThrottlerTest, command_in_queue_duplicate_of_known_merge) { // Fill up all active merges and 1 queued one size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); ASSERT_LT(maxPending, 100); - for (std::size_t i = 0; i < maxPending + 1; ++i) { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(2 + i); - nodes.push_back(5 + i); + for (size_t i = 0; i < maxPending + 1; ++i) { + std::vector<MergeBucketCommand::Node> nodes({{0}, {uint16_t(2 + i)}, {uint16_t(5 + i)}}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234); cmd->setPriority(100 - i); @@ -719,19 +695,14 @@ TEST_F(MergeThrottlerTest, command_in_queue_duplicate_of_known_merge) { // Add a merge for the same bucket twice to the queue { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(12); - nodes.push_back(123); + std::vector<MergeBucketCommand::Node> nodes({{0}, {12}, {123}}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xf000feee)), nodes, 1234); _topLinks[0]->sendDown(cmd); } { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(124); // Different node set doesn't matter - nodes.push_back(14); + // Different node set doesn't matter + std::vector<MergeBucketCommand::Node> nodes({{0}, {124}, {14}}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xf000feee)), nodes, 1234); _topLinks[0]->sendDown(cmd); @@ -783,10 +754,7 @@ TEST_F(MergeThrottlerTest, command_in_queue_duplicate_of_known_merge) { // This is not a scenario that should ever actually happen, but for // the sake of robustness, include it anyway. TEST_F(MergeThrottlerTest, invalid_receiver_node) { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(1); - nodes.push_back(5); - nodes.push_back(9); + std::vector<MergeBucketCommand::Node> nodes({{1}, {5}, {9}}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xf00baaaa)), nodes, 1234); @@ -808,11 +776,8 @@ TEST_F(MergeThrottlerTest, forward_queued_merge) { // Fill up all active merges and then 3 queued ones size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); ASSERT_LT(maxPending, 100); - for (std::size_t i = 0; i < maxPending + 3; ++i) { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(2 + i); - nodes.push_back(5 + i); + for (size_t i = 0; i < maxPending + 3; ++i) { + std::vector<MergeBucketCommand::Node> nodes({{0}, {uint16_t(2 + i)}, {uint16_t(5 + i)}}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234); cmd->setPriority(100 - i); @@ -872,11 +837,8 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) { // Fill up all active merges and then 3 queued ones size_t maxPending = throttler.getThrottlePolicy().getMaxPendingCount(); ASSERT_LT(maxPending, 100); - for (std::size_t i = 0; i < maxPending + 3; ++i) { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(1); - nodes.push_back(5 + i); - nodes.push_back(7 + i); + for (size_t i = 0; i < maxPending + 3; ++i) { + std::vector<MergeBucketCommand::Node> nodes({{1}, {uint16_t(5 + i)}, {uint16_t(7 + i)}}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234, 1); cmd->setPriority(250 - i + 5); @@ -890,11 +852,8 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) { // Sneak in a higher priority message that is bound to be executed // on the given node { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(1); - nodes.push_back(0); - std::vector<uint16_t> chain; - chain.push_back(0); + std::vector<MergeBucketCommand::Node> nodes({{1}, {0}}); + std::vector<uint16_t> chain({0}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0x1337)), nodes, 1234, 1, chain); cmd->setPriority(0); @@ -946,13 +905,10 @@ TEST_F(MergeThrottlerTest, execute_queued_merge) { TEST_F(MergeThrottlerTest, flush) { // Fill up all active merges and then 3 queued ones - std::size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); + size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); ASSERT_LT(maxPending, 100); - for (std::size_t i = 0; i < maxPending + 3; ++i) { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(1); - nodes.push_back(2); + for (size_t i = 0; i < maxPending + 3; ++i) { + std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234, 1); _topLinks[0]->sendDown(cmd); @@ -988,14 +944,8 @@ TEST_F(MergeThrottlerTest, flush) { // properly, it will attempt to forward this node again with a bogus // index. This should be implicitly handled by checking for a full node TEST_F(MergeThrottlerTest, unseen_merge_with_node_in_chain) { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(5); - nodes.push_back(9); - std::vector<uint16_t> chain; - chain.push_back(0); - chain.push_back(5); - chain.push_back(9); + std::vector<MergeBucketCommand::Node> nodes({{0}, {5}, {9}}); + std::vector<uint16_t> chain({0, 5, 9}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xdeadbeef)), nodes, 1234, 1, chain); @@ -1037,11 +987,8 @@ TEST_F(MergeThrottlerTest, merge_with_newer_cluster_state_flushes_outdated_queue size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); ASSERT_LT(maxPending, 100); std::vector<api::StorageMessage::Id> ids; - for (std::size_t i = 0; i < maxPending + 3; ++i) { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(1); - nodes.push_back(2); + for (size_t i = 0; i < maxPending + 3; ++i) { + std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234, 1); ids.push_back(cmd->getMsgId()); @@ -1054,10 +1001,7 @@ TEST_F(MergeThrottlerTest, merge_with_newer_cluster_state_flushes_outdated_queue // Send down merge with newer system state { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(1); - nodes.push_back(2); + std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0x12345678)), nodes, 1234, 2); ids.push_back(cmd->getMsgId()); @@ -1085,11 +1029,8 @@ TEST_F(MergeThrottlerTest, updated_cluster_state_flushes_outdated_queued) { size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); ASSERT_LT(maxPending, 100); std::vector<api::StorageMessage::Id> ids; - for (std::size_t i = 0; i < maxPending + 3; ++i) { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(1); - nodes.push_back(2); + for (size_t i = 0; i < maxPending + 3; ++i) { + std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234, 2); ids.push_back(cmd->getMsgId()); @@ -1126,11 +1067,8 @@ TEST_F(MergeThrottlerTest, legacy_42_merges_do_not_trigger_flush) { // Fill up all active merges and then 1 queued one size_t maxPending = _throttlers[0]->getThrottlePolicy().getMaxPendingCount(); ASSERT_LT(maxPending, 100); - for (std::size_t i = 0; i < maxPending + 1; ++i) { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(1); - nodes.push_back(2); + for (size_t i = 0; i < maxPending + 1; ++i) { + std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xf00baa00 + i)), nodes, 1234, 1); _topLinks[0]->sendDown(cmd); @@ -1150,10 +1088,7 @@ TEST_F(MergeThrottlerTest, legacy_42_merges_do_not_trigger_flush) { // Send down a merge with a cluster state version of 0, which should // be ignored and queued as usual { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(1); - nodes.push_back(2); + std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xbaaadbed)), nodes, 1234, 0); _topLinks[0]->sendDown(cmd); @@ -1175,10 +1110,7 @@ TEST_F(MergeThrottlerTest, outdated_cluster_state_merges_are_rejected_on_arrival // Send down a merge with a cluster state version of 9, which should // be rejected { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(1); - nodes.push_back(2); + std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xfeef00)), nodes, 1234, 9); _topLinks[0]->sendDown(cmd); @@ -1200,12 +1132,8 @@ TEST_F(MergeThrottlerTest, outdated_cluster_state_merges_are_rejected_on_arrival TEST_F(MergeThrottlerTest, unknown_merge_with_self_in_chain) { BucketId bid(32, 0xbadbed); - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(1); - nodes.push_back(2); - std::vector<uint16_t> chain; - chain.push_back(0); + std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); + std::vector<uint16_t> chain({0}); auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234, 1, chain); cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1)); @@ -1223,11 +1151,8 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_dist size_t maxQueue = _throttlers[0]->getMaxQueueSize(); ASSERT_EQ(20, maxQueue); ASSERT_LT(maxPending, 100); - for (std::size_t i = 0; i < maxPending + maxQueue; ++i) { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(1); - nodes.push_back(2); + for (size_t i = 0; i < maxPending + maxQueue; ++i) { + std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); // No chain set, i.e. merge command is freshly squeezed from a distributor. auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xf00000 + i)), nodes, 1234, 1); @@ -1243,10 +1168,7 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_dist _topLinks[0]->getRepliesOnce(); // Send down another merge which should be immediately busy-returned { - std::vector<MergeBucketCommand::Node> nodes; - nodes.push_back(0); - nodes.push_back(1); - nodes.push_back(2); + std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(BucketId(32, 0xf000baaa)), nodes, 1234, 1); _topLinks[0]->sendDown(cmd); @@ -1271,7 +1193,7 @@ MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_lim _throttlers[1]->set_disable_queue_limits_for_chained_merges(disable_queue_limits); size_t max_pending = _throttlers[1]->getThrottlePolicy().getMaxPendingCount(); size_t max_enqueued = _throttlers[1]->getMaxQueueSize(); - for (std::size_t i = 0; i < max_pending + max_enqueued; ++i) { + for (size_t i = 0; i < max_pending + max_enqueued; ++i) { std::vector<MergeBucketCommand::Node> nodes({{1}, {2}, {3}}); // No chain set, i.e. merge command is freshly squeezed from a distributor. auto cmd = std::make_shared<MergeBucketCommand>( diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp index 06d49b2155b..914b2c58219 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp +++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp @@ -171,21 +171,6 @@ MergeThrottler::MergeNodeSequence::chainContainsIndex(uint16_t idx) const return false; } -std::string -MergeThrottler::MergeNodeSequence::getSequenceString() const -{ - std::ostringstream oss; - oss << '['; - for (std::size_t i = 0; i < _cmd.getNodes().size(); ++i) { - if (i > 0) { - oss << ", "; - } - oss << _cmd.getNodes()[i].index; - } - oss << ']'; - return oss.str(); -} - MergeThrottler::MergeThrottler( const config::ConfigUri & configUri, StorageComponentRegister& compReg) @@ -194,12 +179,12 @@ MergeThrottler::MergeThrottler( _merges(), _queue(), _maxQueueSize(1024), - _throttlePolicy(new mbus::StaticThrottlePolicy()), + _throttlePolicy(std::make_unique<mbus::StaticThrottlePolicy>()), _queueSequence(0), _messageLock(), _stateLock(), _configFetcher(configUri.getContext()), - _metrics(new Metrics), + _metrics(std::make_unique<Metrics>()), _component(compReg, "mergethrottler"), _thread(), _rendezvous(RENDEZVOUS_NONE), @@ -301,29 +286,26 @@ MergeThrottler::onFlush(bool /*downwards*/) // Abort active merges, queued and up/down pending std::vector<api::StorageMessage::SP> flushable; - ActiveMergeMap::iterator mergeEnd = _merges.end(); - for (ActiveMergeMap::iterator i = _merges.begin(); i != mergeEnd; ++i) { + for (auto& merge : _merges) { // Only generate a reply if the throttler owns the command - if (i->second.getMergeCmd().get()) { - flushable.push_back(i->second.getMergeCmd()); + if (merge.second.getMergeCmd().get()) { + flushable.push_back(merge.second.getMergeCmd()); } else { LOG(debug, "Not generating flush-reply for %s since we don't " - "own the command", i->first.toString().c_str()); + "own the command", merge.first.toString().c_str()); } DummyMbusMessage<mbus::Reply> dummyReply; _throttlePolicy->processReply(dummyReply); } - MergePriorityQueue::iterator queueEnd = _queue.end(); - for (MergePriorityQueue::iterator i = _queue.begin(); i != queueEnd; ++i) { - flushable.push_back(i->_msg); + for (auto& entry : _queue) { + flushable.push_back(entry._msg); } - // Just pass-through everything in the up-queue, since the messages // are either replies or commands _we_ have sent and thus cannot // send a meaningful reply for - for (std::size_t i = 0; i < _messagesUp.size(); ++i) { - msgGuard.sendUp(_messagesUp[i]); + for (auto& msg : _messagesUp) { + msgGuard.sendUp(msg); } std::back_insert_iterator< @@ -331,28 +313,21 @@ MergeThrottler::onFlush(bool /*downwards*/) > inserter(flushable); std::copy(_messagesDown.begin(), _messagesDown.end(), inserter); - for (std::size_t i = 0; i < flushable.size(); ++i) { + for (auto& msg : flushable) { // Down-bound merge may be a reply, in which case we ignore it // since we can't actually do anything with it now - if (flushable[i]->getType() == api::MessageType::MERGEBUCKET) { - std::shared_ptr<api::MergeBucketReply> reply( - std::make_shared<api::MergeBucketReply>( - static_cast<const api::MergeBucketCommand&>( - *flushable[i]))); - reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, - "Storage node is shutting down")); - LOG(debug, "Aborted merge since we're flushing: %s", - flushable[i]->toString().c_str()); + if (msg->getType() == api::MessageType::MERGEBUCKET) { + auto reply = std::make_shared<api::MergeBucketReply>(static_cast<const api::MergeBucketCommand&>(*msg)); + reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Storage node is shutting down")); + LOG(debug, "Aborted merge since we're flushing: %s", msg->toString().c_str()); msgGuard.sendUp(reply); } else { - assert(flushable[i]->getType() == api::MessageType::MERGEBUCKET_REPLY); - LOG(debug, "Ignored merge reply since we're flushing: %s", - flushable[i]->toString().c_str()); + assert(msg->getType() == api::MessageType::MERGEBUCKET_REPLY); + LOG(debug, "Ignored merge reply since we're flushing: %s", msg->toString().c_str()); } } - LOG(debug, "Flushed %zu unfinished or pending merge operations", - flushable.size()); + LOG(debug, "Flushed %zu unfinished or pending merge operations", flushable.size()); _merges.clear(); _queue.clear(); @@ -400,8 +375,8 @@ MergeThrottler::getNextQueuedMerge() return api::StorageMessage::SP(); } - MergePriorityQueue::iterator iter = _queue.begin(); - MergePriorityQueue::value_type entry = *iter; + auto iter = _queue.begin(); + auto entry = *iter; entry._startTimer.stop(_metrics->averageQueueWaitingTime); _queue.erase(iter); return entry._msg; @@ -418,7 +393,7 @@ MergeThrottler::enqueueMerge( if (!validateNewMerge(mergeCmd, nodeSeq, msgGuard)) { return; } - _queue.insert(MergePriorityQueue::value_type(msg, _queueSequence++)); + _queue.emplace(msg, _queueSequence++); _metrics->queueSize.set(_queue.size()); } @@ -452,7 +427,7 @@ MergeThrottler::rejectMergeIfOutdated( { return false; } - std::ostringstream oss; + vespalib::asciistream oss; oss << "Rejected merge due to outdated cluster state; merge has " << "version " << cmd.getClusterStateVersion() << ", storage node has version " @@ -557,10 +532,10 @@ MergeThrottler::attemptProcessNextQueuedMerge( LOG(spam, "Processing queued merge %s", msg->toString().c_str()); processNewMergeCommand(msg, msgGuard); } else { - std::stringstream oss; - oss << "Queued merge " << *msg << " is out of date; it has already " - "been started by someone else since it was queued"; - LOG(debug, "%s", oss.str().c_str()); + vespalib::asciistream oss; + oss << "Queued merge " << msg->toString() << " is out of date; it has already " + "been started by someone else since it was queued"; + LOG(debug, "%s", oss.c_str()); sendReply(dynamic_cast<const api::MergeBucketCommand&>(*msg), api::ReturnCode(api::ReturnCode::BUSY, oss.str()), msgGuard, _metrics->chaining); @@ -570,8 +545,7 @@ MergeThrottler::attemptProcessNextQueuedMerge( if (_queue.empty()) { LOG(spam, "Queue empty - no merges to process"); } else { - LOG(spam, "Merges queued, but throttle policy disallows further " - "merges at this time"); + LOG(spam, "Merges queued, but throttle policy disallows further merges at this time"); } } return false; diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h index adca4ca6a00..6ab9dff6f71 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.h +++ b/storage/src/vespa/storage/storageserver/mergethrottler.h @@ -212,8 +212,7 @@ public: mbus::StaticThrottlePolicy& getThrottlePolicy() { return *_throttlePolicy; } void set_disable_queue_limits_for_chained_merges(bool disable_limits) noexcept; // For unit testing only - std::mutex & getMonitor() { return _messageLock; } - std::mutex & getStateLock() { return _stateLock; } + std::mutex& getStateLock() { return _stateLock; } Metrics& getMetrics() { return *_metrics; } std::size_t getMaxQueueSize() const { return _maxQueueSize; } @@ -263,7 +262,6 @@ private: * pairwise compares equally to the vector of sorted node indices */ bool isChainCompleted() const; - std::string getSequenceString() const; }; /** |