diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2023-11-02 15:33:22 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-11-02 15:33:22 +0100 |
commit | 756ac9de004347d2d5b8893b43c37eb6ac930560 (patch) | |
tree | f0ce3b5db120b40bfa264593627b010fe7a309cc | |
parent | 875422da13b5236fe368709d66f4696f06fca3f2 (diff) | |
parent | f2983f6f300630f6aec7538d75eed6356fe12da6 (diff) |
Merge pull request #29194 from vespa-engine/vekterli/merge-memory-usage-soft-limiting
Add configurable soft limiting of memory used by merge operations on a content node
20 files changed, 553 insertions, 115 deletions
diff --git a/searchcore/src/apps/proton/proton.cpp b/searchcore/src/apps/proton/proton.cpp index 129091606b3..4c20c40b406 100644 --- a/searchcore/src/apps/proton/proton.cpp +++ b/searchcore/src/apps/proton/proton.cpp @@ -109,7 +109,8 @@ class ProtonServiceLayerProcess : public storage::ServiceLayerProcess { public: ProtonServiceLayerProcess(const config::ConfigUri & configUri, proton::Proton & proton, FNET_Transport& transport, - const vespalib::string& file_distributor_connection_spec); + const vespalib::string& file_distributor_connection_spec, + const vespalib::HwInfo& hw_info); ~ProtonServiceLayerProcess() override { shutdown(); } void shutdown() override; @@ -130,8 +131,9 @@ public: ProtonServiceLayerProcess::ProtonServiceLayerProcess(const config::ConfigUri & configUri, proton::Proton & proton, FNET_Transport& transport, - const vespalib::string& file_distributor_connection_spec) - : ServiceLayerProcess(configUri), + const vespalib::string& file_distributor_connection_spec, + const vespalib::HwInfo& hw_info) + : ServiceLayerProcess(configUri, hw_info), _proton(proton), _transport(transport), _file_distributor_connection_spec(file_distributor_connection_spec), @@ -259,18 +261,18 @@ App::startAndRun(FNET_Transport & transport, int argc, char **argv) { proton.init(configSnapshot); } vespalib::string file_distributor_connection_spec = configSnapshot->getFiledistributorrpcConfig().connectionspec; - configSnapshot.reset(); std::unique_ptr<ProtonServiceLayerProcess> spiProton; if ( ! params.serviceidentity.empty()) { spiProton = std::make_unique<ProtonServiceLayerProcess>(identityUri.createWithNewId(params.serviceidentity), proton, transport, - file_distributor_connection_spec); + file_distributor_connection_spec, configSnapshot->getHwInfo()); spiProton->setupConfig(subscribeTimeout); spiProton->createNode(); EV_STARTED("servicelayer"); } else { proton.getMetricManager().init(identityUri); } + configSnapshot.reset(); EV_STARTED("proton"); while (!(SIG::INT.check() || SIG::TERM.check() || (spiProton && spiProton->getNode().attemptedStopped()))) { std::this_thread::sleep_for(1000ms); diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp index 9a0a2968c69..808747034ac 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp @@ -243,7 +243,7 @@ public: MyServiceLayerProcess::MyServiceLayerProcess(const config::ConfigUri& configUri, PersistenceProvider& provider, std::unique_ptr<storage::IStorageChainBuilder> chain_builder) - : ServiceLayerProcess(configUri), + : ServiceLayerProcess(configUri, vespalib::HwInfo()), _provider(provider) { if (chain_builder) { diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp index 0773958e535..6ed05e14519 100644 --- a/storage/src/tests/distributor/mergeoperationtest.cpp +++ b/storage/src/tests/distributor/mergeoperationtest.cpp @@ -45,6 +45,7 @@ struct MergeOperationTest : Test, DistributorStripeTestUtil { void assert_simple_merge_bucket_command(); void assert_simple_delete_bucket_command(); MergeBucketMetricSet& get_merge_metrics(); + [[nodiscard]] uint32_t merge_footprint(const std::string& db_state); }; std::shared_ptr<MergeOperation> @@ -86,7 +87,7 @@ MergeOperationTest::assert_simple_merge_bucket_command() { ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, " "cluster state version: 0, nodes: [0, 2, 1 (source only)], chain: [], " - "reasons to start: ) => 0", + "estimated memory footprint: 2 bytes, reasons to start: ) => 0", _sender.getLastCommand(true)); } @@ -295,7 +296,7 @@ TEST_F(MergeOperationTest, do_not_remove_copies_with_pending_messages) { std::string merge("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, " "cluster state version: 0, nodes: [0, 2, 1 (source only)], chain: [], " - "reasons to start: ) => 0"); + "estimated memory footprint: 2 bytes, reasons to start: ) => 0"); ASSERT_EQ(merge, _sender.getLastCommand(true)); @@ -356,8 +357,8 @@ TEST_F(MergeOperationTest, allow_deleting_active_source_only_replica) { std::string merge( "MergeBucketCommand(BucketId(0x4000000000000001), to time " - "10000000, cluster state version: 0, nodes: [0, 2, 1 " - "(source only)], chain: [], reasons to start: ) => 0"); + "10000000, cluster state version: 0, nodes: [0, 2, 1 (source only)], chain: [], " + "estimated memory footprint: 2 bytes, reasons to start: ) => 0"); ASSERT_EQ(merge, _sender.getLastCommand(true)); sendReply(op); @@ -580,14 +581,14 @@ TEST_F(MergeOperationTest, unordered_merges_only_sent_iff_config_enabled_and_all setup_simple_merge_op({1, 2, 3}); // Note: these will be re-ordered in ideal state order internally ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, " "cluster state version: 0, nodes: [2, 1, 3], chain: [], " - "reasons to start: ) => 1", + "estimated memory footprint: 2 bytes, reasons to start: ) => 1", _sender.getLastCommand(true)); // All involved nodes support unordered merging; merges should be unordered (sent to ideal node 2) setup_simple_merge_op({1, 2}); ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000001, " "cluster state version: 0, nodes: [2, 1], chain: [] (unordered forwarding), " - "reasons to start: ) => 2", + "estimated memory footprint: 2 bytes, reasons to start: ) => 2", _sender.getLastCommand(true)); _sender.clear(); @@ -600,7 +601,7 @@ TEST_F(MergeOperationTest, unordered_merges_only_sent_iff_config_enabled_and_all setup_simple_merge_op({2, 1}); ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000002, " "cluster state version: 0, nodes: [2, 1], chain: [], " - "reasons to start: ) => 1", + "estimated memory footprint: 2 bytes, reasons to start: ) => 1", _sender.getLastCommand(true)); } @@ -644,4 +645,60 @@ TEST_F(MergeOperationTest, no_delete_bucket_ops_sent_if_node_subset_cancelled) { EXPECT_FALSE(op->ok()); } +uint32_t MergeOperationTest::merge_footprint(const std::string& db_state) { + getClock().setAbsoluteTimeInSeconds(10); + addNodesToBucketDB(document::BucketId(16, 1), db_state); + enable_cluster_state("distributor:1 storage:3"); + MergeOperation op(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2))); + op.setIdealStateManager(&getIdealStateManager()); + + _sender.clear(); + op.start(_sender); + assert(!_sender.commands().empty()); + auto cmd_as_merge = std::dynamic_pointer_cast<api::MergeBucketCommand>(_sender.commands()[0]); + assert(cmd_as_merge); + return cmd_as_merge->estimated_memory_footprint(); +} + +TEST_F(MergeOperationTest, memory_footprint_is_computed_from_replica_state) { + // Reminder of syntax: "index=checksum/doc count/doc size" + // {0,2} in sync, {1} out of sync; footprint is sum across "sync-ness groups" + EXPECT_EQ(merge_footprint("0=10/100/3000,1=20/200/7000,2=10/100/3000"), 10'000); + EXPECT_EQ(merge_footprint("0=10/100/7000,1=20/200/3000,2=10/100/7000"), 10'000); + // All replicas mutually out of sync + EXPECT_EQ(merge_footprint("0=10/100/3000,1=20/200/7000,2=30/100/5000"), 15'000); + // One replica empty + EXPECT_EQ(merge_footprint("0=20/200/4000,1=20/200/4000,2=1/0/0"), 4'000); +} + +TEST_F(MergeOperationTest, memory_footprint_is_bounded_by_max_expected_merge_chunk_size) { + auto cfg = make_config(); + cfg->setSplitSize(20'000); // proxy for max merge chunk size + configure_stripe(cfg); + + EXPECT_EQ(merge_footprint("0=10/100/5000,1=20/200/5000,2=30/100/9999"), 19'999); + EXPECT_EQ(merge_footprint("0=10/100/5000,1=20/200/5000,2=30/100/10000"), 20'000); + EXPECT_EQ(merge_footprint("0=10/100/5000,1=20/200/5000,2=30/100/10001"), 20'000); + EXPECT_EQ(merge_footprint("0=10/100/6000,1=20/200/7000,2=30/100/20000"), 20'000); +} + +TEST_F(MergeOperationTest, memory_footprint_with_single_doc_replica_can_be_greater_than_max_expected_bucket_size) { + auto cfg = make_config(); + cfg->setSplitSize(20'000); + configure_stripe(cfg); + + EXPECT_EQ(merge_footprint("0=10/100/5000,1=20/200/5000,2=30/1/50000"), 50'000); + EXPECT_EQ(merge_footprint("0=10/100/5000,1=20/1/60000,2=30/1/50000"), 60'000); +} + +TEST_F(MergeOperationTest, memory_footprint_estimation_saturates_instead_of_overflowing_u32_limits) { + auto cfg = make_config(); + cfg->setSplitSize(1'234'567); + configure_stripe(cfg); + // Here we massively _undercount_ our estimate, but this is a wildly unrealistic replica state + // just for testing correctness of arithmetic ops. + // UINT32_MAX/3 * 3 + 1 will overflow to 0 if unchecked. Must be saturated instead. + EXPECT_EQ(merge_footprint("0=10/10/1431655765,1=20/10/1431655765,2=30/10/1431655766"), 1'234'567); +} + } // storage::distributor diff --git a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp index addc80e4150..698d8dee573 100644 --- a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp +++ b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp @@ -79,7 +79,7 @@ struct StorageProtocolTest : TestWithParam<vespalib::Version> { _protocol(_docMan.getTypeRepoSP()) { } - ~StorageProtocolTest(); + ~StorageProtocolTest() override; void set_dummy_bucket_info_reply_fields(BucketInfoReply& reply) { reply.setBucketInfo(_dummy_bucket_info); @@ -456,18 +456,12 @@ TEST_P(StorageProtocolTest, delete_bucket) { TEST_P(StorageProtocolTest, merge_bucket) { using Node = api::MergeBucketCommand::Node; - std::vector<Node> nodes; - nodes.push_back(Node(4, false)); - nodes.push_back(Node(13, true)); - nodes.push_back(Node(26, true)); - - std::vector<uint16_t> chain; - // Not a valid chain wrt. the nodes, but just want to have unique values - chain.push_back(7); - chain.push_back(14); + std::vector<Node> nodes = {{4, false}, {13, true}, {26, true}}; + std::vector<uint16_t> chain = {7, 14}; // Not a valid chain wrt. the nodes, but just want to have unique values auto cmd = std::make_shared<MergeBucketCommand>(_bucket, nodes, Timestamp(1234), 567, chain); cmd->set_use_unordered_forwarding(true); + cmd->set_estimated_memory_footprint(123'456'789); auto cmd2 = copyCommand(cmd); EXPECT_EQ(_bucket, cmd2->getBucket()); EXPECT_EQ(nodes, cmd2->getNodes()); @@ -475,6 +469,7 @@ TEST_P(StorageProtocolTest, merge_bucket) { EXPECT_EQ(uint32_t(567), cmd2->getClusterStateVersion()); EXPECT_EQ(chain, cmd2->getChain()); EXPECT_EQ(cmd2->use_unordered_forwarding(), cmd->use_unordered_forwarding()); + EXPECT_EQ(cmd2->estimated_memory_footprint(), 123'456'789); auto reply = std::make_shared<MergeBucketReply>(*cmd); auto reply2 = copyReply(reply); @@ -485,6 +480,17 @@ TEST_P(StorageProtocolTest, merge_bucket) { EXPECT_EQ(chain, reply2->getChain()); } +TEST_P(StorageProtocolTest, merge_bucket_estimated_memory_footprint_is_zero_by_default) { + using Node = api::MergeBucketCommand::Node; + std::vector<Node> nodes = {{4, false}, {13, true}, {26, true}}; + std::vector<uint16_t> chain = {7, 14}; + + auto cmd = std::make_shared<MergeBucketCommand>(_bucket, nodes, Timestamp(1234), 567, chain); + cmd->set_use_unordered_forwarding(true); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ(cmd2->estimated_memory_footprint(), 0); +} + TEST_P(StorageProtocolTest, split_bucket) { auto cmd = std::make_shared<SplitBucketCommand>(_bucket); EXPECT_EQ(0u, cmd->getMinSplitBits()); diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index 7a7f2551c2d..6f80ffe0727 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -1,17 +1,18 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <tests/common/testhelper.h> #include <tests/common/dummystoragelink.h> +#include <tests/common/testhelper.h> #include <tests/common/teststorageapp.h> #include <vespa/config/helper/configgetter.hpp> #include <vespa/document/test/make_document_bucket.h> #include <vespa/messagebus/dynamicthrottlepolicy.h> -#include <vespa/storage/storageserver/mergethrottler.h> #include <vespa/storage/persistence/messages.h> +#include <vespa/storage/storageserver/mergethrottler.h> #include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/state.h> #include <vespa/vdslib/state/clusterstate.h> #include <vespa/vespalib/gtest/gtest.h> #include <vespa/vespalib/util/exceptions.h> +#include <vespa/vespalib/util/size_literals.h> #include <unordered_set> #include <memory> #include <iterator> @@ -30,22 +31,33 @@ namespace storage { namespace { using StorServerConfig = vespa::config::content::core::StorServerConfig; +using StorServerConfigBuilder = vespa::config::content::core::StorServerConfigBuilder; vespalib::string _storage("storage"); +std::unique_ptr<StorServerConfig> default_server_config() { + vdstestlib::DirConfig dir_config(getStandardConfig(true)); + auto cfg_uri = ::config::ConfigUri(dir_config.getConfigId()); + return config_from<StorServerConfig>(cfg_uri); +} + struct MergeBuilder { - document::BucketId _bucket; - api::Timestamp _maxTimestamp; - std::vector<uint16_t> _nodes; - std::vector<uint16_t> _chain; + document::BucketId _bucket; + api::Timestamp _maxTimestamp; + std::vector<uint16_t> _nodes; + std::vector<uint16_t> _chain; std::unordered_set<uint16_t> _source_only; - uint64_t _clusterStateVersion; + uint64_t _clusterStateVersion; + uint32_t _memory_usage; + bool _unordered; - MergeBuilder(const document::BucketId& bucket) + explicit MergeBuilder(const document::BucketId& bucket) : _bucket(bucket), _maxTimestamp(1234), _chain(), - _clusterStateVersion(1) + _clusterStateVersion(1), + _memory_usage(0), + _unordered(false) { nodes(0, 1, 2); } @@ -100,6 +112,14 @@ struct MergeBuilder { _source_only.insert(node); return *this; } + MergeBuilder& memory_usage(uint32_t usage_bytes) { + _memory_usage = usage_bytes; + return *this; + } + MergeBuilder& unordered(bool is_unordered) { + _unordered = is_unordered; + return *this; + } api::MergeBucketCommand::SP create() const { std::vector<api::MergeBucketCommand::Node> n; @@ -112,6 +132,8 @@ struct MergeBuilder { makeDocumentBucket(_bucket), n, _maxTimestamp, _clusterStateVersion, _chain); cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, _nodes[0])); + cmd->set_estimated_memory_footprint(_memory_usage); + cmd->set_use_unordered_forwarding(_unordered); return cmd; } }; @@ -137,20 +159,26 @@ struct MergeThrottlerTest : Test { std::vector<DummyStorageLink*> _bottomLinks; MergeThrottlerTest(); - ~MergeThrottlerTest(); + ~MergeThrottlerTest() override; void SetUp() override; void TearDown() override; + MergeThrottler& throttler(size_t idx) noexcept { + assert(idx < _throttlers.size()); + return *_throttlers[idx]; + } + api::MergeBucketCommand::SP sendMerge(const MergeBuilder&); - void sendAndExpectReply( + void send_and_expect_reply( const std::shared_ptr<api::StorageMessage>& msg, const api::MessageType& expectedReplyType, api::ReturnCode::Result expectedResultCode); + std::shared_ptr<api::StorageMessage> send_and_expect_forwarding(const std::shared_ptr<api::StorageMessage>& msg); + void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count); - void fill_up_throttler_active_window_and_queue(uint16_t node_idx); void receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd = false); std::shared_ptr<api::MergeBucketCommand> peek_throttler_queue_top(size_t throttler_idx) { @@ -170,20 +198,17 @@ MergeThrottlerTest::~MergeThrottlerTest() = default; void MergeThrottlerTest::SetUp() { - vdstestlib::DirConfig dir_config(getStandardConfig(true)); - auto cfg_uri = ::config::ConfigUri(dir_config.getConfigId()); - auto config = config_from<StorServerConfig>(cfg_uri); - + auto config = default_server_config(); for (int i = 0; i < _storageNodeCount; ++i) { auto server = std::make_unique<TestServiceLayerApp>(NodeIndex(i)); server->setClusterState(lib::ClusterState("distributor:100 storage:100 version:1")); std::unique_ptr<DummyStorageLink> top; top = std::make_unique<DummyStorageLink>(); - MergeThrottler* throttler = new MergeThrottler(*config, server->getComponentRegister()); + auto* throttler = new MergeThrottler(*config, server->getComponentRegister(), vespalib::HwInfo()); // MergeThrottler will be sandwiched in between two dummy links top->push_back(std::unique_ptr<StorageLink>(throttler)); - DummyStorageLink* bottom = new DummyStorageLink; + auto* bottom = new DummyStorageLink; throttler->push_back(std::unique_ptr<StorageLink>(bottom)); _servers.push_back(std::shared_ptr<TestServiceLayerApp>(server.release())); @@ -291,6 +316,7 @@ TEST_F(MergeThrottlerTest, chain) { cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 0)); const uint16_t distributorIndex = 123; cmd->setSourceIndex(distributorIndex); // Dummy distributor index that must be forwarded + cmd->set_estimated_memory_footprint(456'789); StorageMessage::SP fwd = cmd; StorageMessage::SP fwdToExec; @@ -322,10 +348,12 @@ TEST_F(MergeThrottlerTest, chain) { } EXPECT_TRUE(checkChain(fwd, chain.begin(), chain.end())); } - // Ensure priority, cluster state version and timeout is correctly forwarded + // Ensure operation properties are forwarded as expected EXPECT_EQ(7, static_cast<int>(fwd->getPriority())); - EXPECT_EQ(123, dynamic_cast<const MergeBucketCommand&>(*fwd).getClusterStateVersion()); - EXPECT_EQ(54321ms, dynamic_cast<const StorageCommand&>(*fwd).getTimeout()); + auto& as_merge = dynamic_cast<const MergeBucketCommand&>(*fwd); + EXPECT_EQ(as_merge.getClusterStateVersion(), 123); + EXPECT_EQ(as_merge.getTimeout(), 54321ms); + EXPECT_EQ(as_merge.estimated_memory_footprint(), 456'789); } _topLinks[lastNodeIdx]->sendDown(fwd); @@ -1209,7 +1237,7 @@ void MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd) { // Note: uses node with index 1 to not be the first node in chain - _throttlers[1]->set_disable_queue_limits_for_chained_merges(disable_queue_limits); + _throttlers[1]->set_disable_queue_limits_for_chained_merges_locking(disable_queue_limits); size_t max_pending = throttler_max_merges_pending(1); size_t max_enqueued = _throttlers[1]->getMaxQueueSize(); for (size_t i = 0; i < max_pending + max_enqueued; ++i) { @@ -1350,7 +1378,7 @@ TEST_F(MergeThrottlerTest, broken_cycle) { } void -MergeThrottlerTest::sendAndExpectReply( +MergeThrottlerTest::send_and_expect_reply( const std::shared_ptr<api::StorageMessage>& msg, const api::MessageType& expectedReplyType, api::ReturnCode::Result expectedResultCode) @@ -1362,13 +1390,22 @@ MergeThrottlerTest::sendAndExpectReply( ASSERT_EQ(expectedResultCode, storageReply.getResult().getResult()); } +std::shared_ptr<api::StorageMessage> +MergeThrottlerTest::send_and_expect_forwarding(const std::shared_ptr<api::StorageMessage>& msg) +{ + _topLinks[0]->sendDown(msg); + _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); + return _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET); +} + TEST_F(MergeThrottlerTest, get_bucket_diff_command_not_in_active_set_is_rejected) { document::BucketId bucket(16, 1234); std::vector<api::GetBucketDiffCommand::Node> nodes; auto getDiffCmd = std::make_shared<api::GetBucketDiffCommand>( makeDocumentBucket(bucket), nodes, api::Timestamp(1234)); - ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(getDiffCmd, + ASSERT_NO_FATAL_FAILURE(send_and_expect_reply( + getDiffCmd, api::MessageType::GETBUCKETDIFF_REPLY, api::ReturnCode::ABORTED)); ASSERT_EQ(0, _bottomLinks[0]->getNumCommands()); @@ -1379,7 +1416,8 @@ TEST_F(MergeThrottlerTest, apply_bucket_diff_command_not_in_active_set_is_reject std::vector<api::GetBucketDiffCommand::Node> nodes; auto applyDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(makeDocumentBucket(bucket), nodes); - ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(applyDiffCmd, + ASSERT_NO_FATAL_FAILURE(send_and_expect_reply( + applyDiffCmd, api::MessageType::APPLYBUCKETDIFF_REPLY, api::ReturnCode::ABORTED)); ASSERT_EQ(0, _bottomLinks[0]->getNumCommands()); @@ -1411,7 +1449,8 @@ TEST_F(MergeThrottlerTest, new_cluster_state_aborts_all_outdated_active_merges) auto getDiffCmd = std::make_shared<api::GetBucketDiffCommand>( makeDocumentBucket(bucket), std::vector<api::GetBucketDiffCommand::Node>(), api::Timestamp(123)); - ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(getDiffCmd, + ASSERT_NO_FATAL_FAILURE(send_and_expect_reply( + getDiffCmd, api::MessageType::GETBUCKETDIFF_REPLY, api::ReturnCode::ABORTED)); } @@ -1428,7 +1467,8 @@ TEST_F(MergeThrottlerTest, backpressure_busy_bounces_merges_for_configured_durat EXPECT_EQ(0, _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue()); EXPECT_EQ(uint64_t(0), _throttlers[0]->getMetrics().local.failures.busy.getValue()); - ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(MergeBuilder(bucket).create(), + ASSERT_NO_FATAL_FAILURE(send_and_expect_reply( + MergeBuilder(bucket).create(), api::MessageType::MERGEBUCKET_REPLY, api::ReturnCode::BUSY)); @@ -1480,6 +1520,159 @@ TEST_F(MergeThrottlerTest, backpressure_evicts_all_queued_merges) { EXPECT_EQ(ReturnCode::BUSY, dynamic_cast<const MergeBucketReply&>(*reply).getResult().getResult()); } +TEST_F(MergeThrottlerTest, exceeding_memory_soft_limit_rejects_merges_even_with_available_active_window_slots) { + ASSERT_GT(throttler_max_merges_pending(0), 1); // Sanity check for the test itself + + throttler(0).set_max_merge_memory_usage_bytes_locking(10_Mi); + + ASSERT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 0); + + std::shared_ptr<api::StorageMessage> fwd_cmd; + ASSERT_NO_FATAL_FAILURE(fwd_cmd = send_and_expect_forwarding( + MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).memory_usage(5_Mi).create())); + + EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); + + // Accepting this merge would exceed memory limits. It is sent as part of a forwarded unordered + // merge and can therefore NOT be enqueued; it must be bounced immediately. + ASSERT_NO_FATAL_FAILURE(send_and_expect_reply( + MergeBuilder(document::BucketId(16, 1)) + .nodes(2, 1, 0).chain(2, 1).unordered(true) + .memory_usage(8_Mi).create(), + MessageType::MERGEBUCKET_REPLY, ReturnCode::BUSY)); + + EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); // Unchanged + + // Fail the forwarded merge. This shall immediately free up the memory usage, allowing a new merge in. + auto fwd_reply = dynamic_cast<api::MergeBucketCommand&>(*fwd_cmd).makeReply(); + fwd_reply->setResult(ReturnCode(ReturnCode::ABORTED, "node stumbled into a ravine")); + + ASSERT_NO_FATAL_FAILURE(send_and_expect_reply( + std::shared_ptr<api::StorageReply>(std::move(fwd_reply)), + MessageType::MERGEBUCKET_REPLY, ReturnCode::ABORTED)); // Unwind reply for failed merge + + ASSERT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 0); + + // New merge is accepted and forwarded + ASSERT_NO_FATAL_FAILURE(send_and_expect_forwarding( + MergeBuilder(document::BucketId(16, 2)).nodes(0, 1, 2).unordered(true).memory_usage(9_Mi).create())); + + EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 9_Mi); +} + +TEST_F(MergeThrottlerTest, exceeding_memory_soft_limit_can_enqueue_unordered_merge_sent_directly_from_distributor) { + throttler(0).set_max_merge_memory_usage_bytes_locking(10_Mi); + + ASSERT_NO_FATAL_FAILURE(send_and_expect_forwarding( + MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).memory_usage(5_Mi).create())); + + EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); + + // Accepting this merge would exceed memory limits. It is sent directly from a distributor and + // can therefore be enqueued. + _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 1)).nodes(0, 1, 2).unordered(true).memory_usage(8_Mi).create()); + waitUntilMergeQueueIs(throttler(0), 1, _messageWaitTime); // Should end up in queue + + EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); // Unchanged +} + +TEST_F(MergeThrottlerTest, at_least_one_merge_is_accepted_even_if_exceeding_memory_soft_limit) { + throttler(0).set_max_merge_memory_usage_bytes_locking(5_Mi); + + _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).unordered(true).memory_usage(100_Mi).create()); + _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); // Forwarded, _not_ bounced + + EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 100_Mi); +} + +TEST_F(MergeThrottlerTest, queued_merges_are_not_counted_towards_memory_usage) { + // Our utility function for filling queues uses bucket IDs {16, x} where x is increasing + // from 0 to the max pending. Ensure we don't accidentally overlap with the bucket ID we + // send for below in the test code. + ASSERT_LT(throttler_max_merges_pending(0), 1000); + + throttler(0).set_max_merge_memory_usage_bytes_locking(50_Mi); + // Fill up active window on node 0. Note: these merges do not have any associated memory cost. + fill_throttler_queue_with_n_commands(0, 0); + + EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi); + + _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 1000)).nodes(0, 1, 2).unordered(true).memory_usage(10_Mi).create()); + waitUntilMergeQueueIs(throttler(0), 1, _messageWaitTime); // Should end up in queue + + EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi); +} + +namespace { + +vespalib::HwInfo make_mem_info(uint64_t mem_size) { + return {{0, false, false}, {mem_size}, {1}}; +} + +} + +TEST_F(MergeThrottlerTest, memory_limit_can_be_auto_deduced_from_hw_info) { + StorServerConfigBuilder cfg(*default_server_config()); + auto& cfg_limit = cfg.mergeThrottlingMemoryLimit; + auto& mt = throttler(0); + + // Enable auto-deduction of limits + cfg_limit.maxUsageBytes = 0; + + cfg_limit.autoLowerBoundBytes = 100'000; + cfg_limit.autoUpperBoundBytes = 750'000; + cfg_limit.autoPhysMemScaleFactor = 0.5; + + mt.set_hw_info_locking(make_mem_info(1'000'000)); + mt.on_configure(cfg); + EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 500'000); + EXPECT_EQ(throttler(0).getMetrics().merge_memory_limit.getLast(), 500'000); + + cfg_limit.autoPhysMemScaleFactor = 0.75; + mt.on_configure(cfg); + EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 750'000); + EXPECT_EQ(throttler(0).getMetrics().merge_memory_limit.getLast(), 750'000); + + cfg_limit.autoPhysMemScaleFactor = 0.25; + mt.on_configure(cfg); + EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 250'000); + + // Min-capped + cfg_limit.autoPhysMemScaleFactor = 0.05; + mt.on_configure(cfg); + EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 100'000); + + // Max-capped + cfg_limit.autoPhysMemScaleFactor = 0.90; + mt.on_configure(cfg); + EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 750'000); +} + +TEST_F(MergeThrottlerTest, memory_limit_can_be_set_explicitly) { + StorServerConfigBuilder cfg(*default_server_config()); + auto& cfg_limit = cfg.mergeThrottlingMemoryLimit; + auto& mt = throttler(0); + + cfg_limit.maxUsageBytes = 1'234'567; + mt.set_hw_info_locking(make_mem_info(1'000'000)); + mt.on_configure(cfg); + EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 1'234'567); + EXPECT_EQ(throttler(0).getMetrics().merge_memory_limit.getLast(), 1'234'567); +} + +TEST_F(MergeThrottlerTest, memory_limit_can_be_set_to_unlimited) { + StorServerConfigBuilder cfg(*default_server_config()); + auto& cfg_limit = cfg.mergeThrottlingMemoryLimit; + auto& mt = throttler(0); + + cfg_limit.maxUsageBytes = -1; + mt.set_hw_info_locking(make_mem_info(1'000'000)); + mt.on_configure(cfg); + // Zero implies infinity + EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 0); + EXPECT_EQ(throttler(0).getMetrics().merge_memory_limit.getLast(), 0); +} + // TODO test message queue aborting (use rendezvous functionality--make guard) } // namespace storage diff --git a/storage/src/tests/storageserver/service_layer_error_listener_test.cpp b/storage/src/tests/storageserver/service_layer_error_listener_test.cpp index edb13eea5af..63d8eec6dc3 100644 --- a/storage/src/tests/storageserver/service_layer_error_listener_test.cpp +++ b/storage/src/tests/storageserver/service_layer_error_listener_test.cpp @@ -40,7 +40,8 @@ struct Fixture { vdstestlib::DirConfig config{getStandardConfig(true)}; TestServiceLayerApp app; ServiceLayerComponent component{app.getComponentRegister(), "dummy"}; - MergeThrottler merge_throttler{*config_from<StorServerConfig>(config::ConfigUri(config.getConfigId())), app.getComponentRegister()}; + MergeThrottler merge_throttler{*config_from<StorServerConfig>(config::ConfigUri(config.getConfigId())), + app.getComponentRegister(), vespalib::HwInfo()}; TestShutdownListener shutdown_listener; ServiceLayerErrorListener error_listener{component, merge_throttler}; diff --git a/storage/src/vespa/storage/config/stor-server.def b/storage/src/vespa/storage/config/stor-server.def index dcce3079c68..0d877d33277 100644 --- a/storage/src/vespa/storage/config/stor-server.def +++ b/storage/src/vespa/storage/config/stor-server.def @@ -46,6 +46,38 @@ merge_throttling_policy.min_window_size int default=16 merge_throttling_policy.max_window_size int default=128 merge_throttling_policy.window_size_increment double default=2.0 +## If positive, nodes enforce a soft limit on the estimated amount of memory that +## can be used by merges touching a particular content node. If a merge arrives +## to the node that would violate the soft limit, it will be bounced with BUSY. +## Note that this also counts merges where the node is part of the source-only set, +## since these use memory when/if data is read from the local node. +## +## Semantics: +## > 0 explicit limit in bytes +## == 0 limit automatically deduced by content node +## < 0 unlimited (legacy behavior) +merge_throttling_memory_limit.max_usage_bytes long default=-1 + +## If merge_throttling_memory_limit.max_usage_bytes == 0, this factor is used +## as a multiplier to automatically deduce a memory limit for merges on the +## content node. Note that the result of this multiplication is capped at both +## ends by the auto_(lower|upper)_bound_bytes config values. +## +## Default: 3% of physical memory +merge_throttling_memory_limit.auto_phys_mem_scale_factor double default=0.03 + +## The absolute minimum memory limit that can be set when automatically +## deducing the limit from physical memory on the node. +## +## Default: 128MiB +merge_throttling_memory_limit.auto_lower_bound_bytes long default=134217728 + +## The absolute maximum memory limit that can be set when automatically +## deducing the limit from physical memory on the node. +## +## Default: 2GiB +merge_throttling_memory_limit.auto_upper_bound_bytes long default=2147483648 + ## If the persistence provider indicates that it has exhausted one or more ## of its internal resources during a mutating operation, new merges will ## be bounced for this duration. Not allowing further merges helps take diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp index 7ce034abfee..68f305fe94e 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp @@ -9,6 +9,7 @@ #include <vespa/storageframework/generic/clock/clock.h> #include <vespa/vdslib/distribution/distribution.h> #include <vespa/vdslib/state/clusterstate.h> +#include <vespa/vespalib/stllike/hash_set.h> #include <array> #include <vespa/log/bufferedlogger.h> @@ -121,7 +122,7 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender) } const lib::ClusterState& clusterState(_bucketSpace->getClusterState()); - std::vector<std::unique_ptr<BucketCopy> > newCopies; + std::vector<std::unique_ptr<BucketCopy>> newCopies; std::vector<MergeMetaData> nodes; for (uint16_t node : getNodes()) { @@ -139,6 +140,8 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender) _mnodes.emplace_back(node._nodeIndex, node._sourceOnly); } + const auto estimated_memory_footprint = estimate_merge_memory_footprint_upper_bound(nodes); + if (_mnodes.size() > 1) { auto msg = std::make_shared<api::MergeBucketCommand>(getBucket(), _mnodes, _manager->operation_context().generate_unique_timestamp(), @@ -153,6 +156,7 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender) } else { msg->set_use_unordered_forwarding(true); } + msg->set_estimated_memory_footprint(estimated_memory_footprint); LOG(debug, "Sending %s to storage node %u", msg->toString().c_str(), _mnodes[0].index); @@ -367,6 +371,40 @@ bool MergeOperation::all_involved_nodes_support_unordered_merge_chaining() const return true; } +uint32_t MergeOperation::estimate_merge_memory_footprint_upper_bound(const std::vector<MergeMetaData>& nodes) const noexcept { + vespalib::hash_set<uint32_t> seen_checksums; + uint32_t worst_case_footprint_across_nodes = 0; + uint32_t largest_single_doc_contribution = 0; + for (const auto& node : nodes) { + if (!seen_checksums.contains(node.checksum())) { + seen_checksums.insert(node.checksum()); + const uint32_t replica_size = node._copy->getUsedFileSize(); + // We don't know the overlap of document sets across replicas, so we have to assume the + // worst and treat the replicas as entirely disjoint. In this case, the _sum_ of all disjoint + // replica group footprints gives us the upper bound. + // Note: saturate-on-overflow check requires all types to be _unsigned_ to work. + if (worst_case_footprint_across_nodes + replica_size >= worst_case_footprint_across_nodes) { + worst_case_footprint_across_nodes += replica_size; + } else { + worst_case_footprint_across_nodes = UINT32_MAX; + } + // Special case for not bounding single massive doc replica to that of the max + // configured bucket size. + if (node._copy->getDocumentCount() == 1) { + largest_single_doc_contribution = std::max(replica_size, largest_single_doc_contribution); + } + } + } + // We know that simply adding up replica sizes is likely to massively over-count in the common + // case (due to the intersection set between replicas rarely being empty), so we cap it by the + // max expected merge chunk size (which is expected to be configured equal to the split limit). + // _Except_ if we have single-doc replicas, as these are known not to overlap, and we know that + // the worst case must be the max of the chunk size and the biggest single doc size. + const uint32_t expected_max_merge_chunk_size = _manager->operation_context().distributor_config().getSplitSize(); + return std::max(std::min(worst_case_footprint_across_nodes, expected_max_merge_chunk_size), + largest_single_doc_contribution); +} + MergeBucketMetricSet* MergeOperation::get_merge_metrics() { diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h index ff21e3d1594..8f54aea33be 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h @@ -63,8 +63,9 @@ private: void deleteSourceOnlyNodes(const BucketDatabase::Entry& currentState, DistributorStripeMessageSender& sender); - bool is_global_bucket_merge() const noexcept; - bool all_involved_nodes_support_unordered_merge_chaining() const noexcept; + [[nodiscard]] bool is_global_bucket_merge() const noexcept; + [[nodiscard]] bool all_involved_nodes_support_unordered_merge_chaining() const noexcept; + [[nodiscard]] uint32_t estimate_merge_memory_footprint_upper_bound(const std::vector<MergeMetaData>& nodes) const noexcept; MergeBucketMetricSet* get_merge_metrics(); }; diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp index 4cc2a7a89ab..82bd5ff7ace 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp +++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp @@ -29,7 +29,7 @@ namespace { struct NodeComparator { bool operator()(const api::MergeBucketCommand::Node& a, - const api::MergeBucketCommand::Node& b) const + const api::MergeBucketCommand::Node& b) const noexcept { return a.index < b.index; } @@ -41,6 +41,7 @@ MergeThrottler::ChainedMergeState::ChainedMergeState() : _cmd(), _cmdString(), _clusterStateVersion(0), + _estimated_memory_usage(0), _inCycle(false), _executingLocally(false), _unwinding(false), @@ -52,6 +53,7 @@ MergeThrottler::ChainedMergeState::ChainedMergeState(const api::StorageMessage:: : _cmd(cmd), _cmdString(cmd->toString()), _clusterStateVersion(static_cast<const api::MergeBucketCommand&>(*cmd).getClusterStateVersion()), + _estimated_memory_usage(static_cast<const api::MergeBucketCommand&>(*cmd).estimated_memory_footprint()), _inCycle(false), _executingLocally(executing), _unwinding(false), @@ -65,6 +67,9 @@ MergeThrottler::Metrics::Metrics(metrics::MetricSet* owner) averageQueueWaitingTime("averagequeuewaitingtime", {}, "Average time a merge spends in the throttler queue", this), queueSize("queuesize", {}, "Length of merge queue", this), active_window_size("active_window_size", {}, "Number of merges active within the pending window size", this), + estimated_merge_memory_usage("estimated_merge_memory_usage", {}, "An estimated upper bound of the " + "memory usage of the merges currently in the active window", this), + merge_memory_limit("merge_memory_limit", {}, "The active soft limit for memory used by merge operations on this node", this), bounced_due_to_back_pressure("bounced_due_to_back_pressure", {}, "Number of merges bounced due to resource exhaustion back-pressure", this), chaining("mergechains", this), local("locallyexecutedmerges", this) @@ -180,9 +185,11 @@ MergeThrottler::MergeNodeSequence::chain_contains_this_node() const noexcept MergeThrottler::MergeThrottler( const StorServerConfig& bootstrap_config, - StorageComponentRegister& compReg) + StorageComponentRegister& comp_reg, + const vespalib::HwInfo& hw_info) : StorageLink("Merge Throttler"), framework::HtmlStatusReporter("merges", "Merge Throttler"), + _hw_info(hw_info), _merges(), _queue(), _maxQueueSize(1024), @@ -191,11 +198,13 @@ MergeThrottler::MergeThrottler( _messageLock(), _stateLock(), _metrics(std::make_unique<Metrics>()), - _component(compReg, "mergethrottler"), + _component(comp_reg, "mergethrottler"), _thread(), _rendezvous(RendezvousState::NONE), _throttle_until_time(), _backpressure_duration(std::chrono::seconds(30)), + _active_merge_memory_used_bytes(0), + _max_merge_memory_usage_bytes(0), // 0 ==> unlimited _use_dynamic_throttling(false), _disable_queue_limits_for_chained_merges(false), _closing(false) @@ -244,6 +253,14 @@ MergeThrottler::on_configure(const StorServerConfig& new_config) _backpressure_duration = std::chrono::duration_cast<std::chrono::steady_clock::duration>( std::chrono::duration<double>(new_config.resourceExhaustionMergeBackPressureDurationSecs)); _disable_queue_limits_for_chained_merges = new_config.disableQueueLimitsForChainedMerges; + if (new_config.mergeThrottlingMemoryLimit.maxUsageBytes > 0) { + _max_merge_memory_usage_bytes = static_cast<size_t>(new_config.mergeThrottlingMemoryLimit.maxUsageBytes); + } else if ((new_config.mergeThrottlingMemoryLimit.maxUsageBytes == 0) && (_hw_info.memory().sizeBytes() > 0)) { + _max_merge_memory_usage_bytes = deduced_memory_limit(new_config); + } else { + _max_merge_memory_usage_bytes = 0; // Implies unlimited + } + _metrics->merge_memory_limit.set(static_cast<int64_t>(_max_merge_memory_usage_bytes)); } MergeThrottler::~MergeThrottler() @@ -373,16 +390,19 @@ MergeThrottler::forwardCommandToNode( fwdMerge->setPriority(mergeCmd.getPriority()); fwdMerge->setTimeout(mergeCmd.getTimeout()); fwdMerge->set_use_unordered_forwarding(mergeCmd.use_unordered_forwarding()); + fwdMerge->set_estimated_memory_footprint(mergeCmd.estimated_memory_footprint()); msgGuard.sendUp(fwdMerge); } void MergeThrottler::removeActiveMerge(ActiveMergeMap::iterator mergeIter) { - LOG(debug, "Removed merge for %s from internal state", - mergeIter->first.toString().c_str()); + LOG(debug, "Removed merge for %s from internal state", mergeIter->first.toString().c_str()); + assert(_active_merge_memory_used_bytes >= mergeIter->second._estimated_memory_usage); + _active_merge_memory_used_bytes -= mergeIter->second._estimated_memory_usage; _merges.erase(mergeIter); update_active_merge_window_size_metric(); + update_active_merge_memory_usage_metric(); } api::StorageMessage::SP @@ -714,6 +734,21 @@ bool MergeThrottler::allow_merge_despite_full_window(const api::MergeBucketComma return !_use_dynamic_throttling; } +bool MergeThrottler::accepting_merge_is_within_memory_limits(const api::MergeBucketCommand& cmd) const noexcept { + // Soft-limit on expected memory usage, but always let at least one merge into the active window. + if ((_max_merge_memory_usage_bytes > 0) && !_merges.empty()) { + size_t future_usage = _active_merge_memory_used_bytes + cmd.estimated_memory_footprint(); + if (future_usage > _max_merge_memory_usage_bytes) { + LOG(spam, "Adding merge with memory footprint %u would exceed node soft limit of %zu. " + "Current memory usage is %zu, future usage would have been %zu", + cmd.estimated_memory_footprint(), _max_merge_memory_usage_bytes, + _active_merge_memory_used_bytes, future_usage); + return false; + } + } + return true; +} + bool MergeThrottler::may_allow_into_queue(const api::MergeBucketCommand& cmd) const noexcept { // We cannot let forwarded unordered merges fall into the queue, as that might lead to a deadlock. // Consider the following scenario, with two nodes C0 and C1, each with a low window size of 1 (low @@ -761,7 +796,10 @@ MergeThrottler::handleMessageDown( if (isMergeAlreadyKnown(msg)) { processCycledMergeCommand(msg, msgGuard); - } else if (canProcessNewMerge() || allow_merge_despite_full_window(mergeCmd)) { + } else if (accepting_merge_is_within_memory_limits(mergeCmd) + && (canProcessNewMerge() + || allow_merge_despite_full_window(mergeCmd))) + { processNewMergeCommand(msg, msgGuard); } else if (may_allow_into_queue(mergeCmd)) { enqueue_merge_for_later_processing(msg, msgGuard); @@ -864,9 +902,10 @@ MergeThrottler::processNewMergeCommand( assert(_merges.find(mergeCmd.getBucket()) == _merges.end()); auto state = _merges.emplace(mergeCmd.getBucket(), ChainedMergeState(msg)).first; update_active_merge_window_size_metric(); + _active_merge_memory_used_bytes += mergeCmd.estimated_memory_footprint(); + update_active_merge_memory_usage_metric(); - LOG(debug, "Added merge %s to internal state", - mergeCmd.toString().c_str()); + LOG(debug, "Added merge %s to internal state", mergeCmd.toString().c_str()); DummyMbusRequest dummyMsg; _throttlePolicy->processMessage(dummyMsg); @@ -889,7 +928,7 @@ MergeThrottler::processNewMergeCommand( } else { if (!nodeSeq.isLastNode()) { // When we're not the last node and haven't seen the merge before, - // we cannot possible execute the merge yet. Forward to next. + // we cannot possibly execute the merge yet. Forward to next. uint16_t nextNodeInChain = nodeSeq.getNextNodeInChain(); LOG(debug, "Forwarding merge %s to storage node %u", mergeCmd.toString().c_str(), nextNodeInChain); @@ -1291,17 +1330,52 @@ MergeThrottler::markActiveMergesAsAborted(uint32_t minimumStateVersion) } void -MergeThrottler::set_disable_queue_limits_for_chained_merges(bool disable_limits) noexcept { +MergeThrottler::set_disable_queue_limits_for_chained_merges_locking(bool disable_limits) noexcept { std::lock_guard lock(_stateLock); _disable_queue_limits_for_chained_merges = disable_limits; } void +MergeThrottler::set_max_merge_memory_usage_bytes_locking(uint32_t max_memory_bytes) noexcept { + std::lock_guard lock(_stateLock); + _max_merge_memory_usage_bytes = max_memory_bytes; +} + +uint32_t +MergeThrottler::max_merge_memory_usage_bytes_locking() const noexcept { + std::lock_guard lock(_stateLock); + return _max_merge_memory_usage_bytes; +} + +void +MergeThrottler::set_hw_info_locking(const vespalib::HwInfo& hw_info) { + std::lock_guard lock(_stateLock); + _hw_info = hw_info; +} + +size_t +MergeThrottler::deduced_memory_limit(const StorServerConfig& cfg) const noexcept { + const auto min_limit = static_cast<size_t>(std::max(cfg.mergeThrottlingMemoryLimit.autoLowerBoundBytes, 1L)); + const auto max_limit = std::max(static_cast<size_t>(std::max(cfg.mergeThrottlingMemoryLimit.autoUpperBoundBytes, 1L)), min_limit); + const auto mem_scale_factor = std::max(cfg.mergeThrottlingMemoryLimit.autoPhysMemScaleFactor, 0.0); + + const auto node_mem = static_cast<double>(_hw_info.memory().sizeBytes()); + const auto scaled_mem = static_cast<size_t>(node_mem * mem_scale_factor); + + return std::min(std::max(scaled_mem, min_limit), max_limit); +} + +void MergeThrottler::update_active_merge_window_size_metric() noexcept { _metrics->active_window_size.set(static_cast<int64_t>(_merges.size())); } void +MergeThrottler::update_active_merge_memory_usage_metric() noexcept { + _metrics->estimated_merge_memory_usage.set(static_cast<int64_t>(_active_merge_memory_used_bytes)); +} + +void MergeThrottler::print(std::ostream& out, bool /*verbose*/, const std::string& /*indent*/) const { diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h index 5362c2f6df8..a5559c159bf 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.h +++ b/storage/src/vespa/storage/storageserver/mergethrottler.h @@ -1,26 +1,24 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /** - * @class storage::MergeThrottler - * @ingroup storageserver - * - * @brief Throttler and forwarder of merge commands + * Throttler and forwarder of merge commands */ #pragma once -#include <vespa/storage/config/config-stor-server.h> -#include <vespa/storage/common/message_guard.h> -#include <vespa/storage/common/storagelink.h> -#include <vespa/storage/common/storagecomponent.h> -#include <vespa/storageframework/generic/status/htmlstatusreporter.h> -#include <vespa/storageframework/generic/thread/runnable.h> -#include <vespa/storageapi/message/bucket.h> +#include <vespa/config/helper/ifetchercallback.h> #include <vespa/document/bucket/bucket.h> +#include <vespa/metrics/countmetric.h> #include <vespa/metrics/metricset.h> +#include <vespa/metrics/metrictimer.h> #include <vespa/metrics/summetric.h> -#include <vespa/metrics/countmetric.h> #include <vespa/metrics/valuemetric.h> -#include <vespa/metrics/metrictimer.h> -#include <vespa/config/helper/ifetchercallback.h> +#include <vespa/storage/common/message_guard.h> +#include <vespa/storage/common/storagecomponent.h> +#include <vespa/storage/common/storagelink.h> +#include <vespa/storage/config/config-stor-server.h> +#include <vespa/storageapi/message/bucket.h> +#include <vespa/storageframework/generic/status/htmlstatusreporter.h> +#include <vespa/storageframework/generic/thread/runnable.h> +#include <vespa/vespalib/util/hw_info.h> #include <chrono> @@ -71,6 +69,8 @@ public: metrics::DoubleAverageMetric averageQueueWaitingTime; metrics::LongValueMetric queueSize; metrics::LongValueMetric active_window_size; + metrics::LongValueMetric estimated_merge_memory_usage; + metrics::LongValueMetric merge_memory_limit; metrics::LongCountMetric bounced_due_to_back_pressure; MergeOperationMetrics chaining; MergeOperationMetrics local; @@ -113,6 +113,7 @@ private: api::StorageMessage::SP _cmd; std::string _cmdString; // For being able to print message even when we don't own it uint64_t _clusterStateVersion; + uint32_t _estimated_memory_usage; bool _inCycle; bool _executingLocally; bool _unwinding; @@ -154,9 +155,7 @@ private: // Use a set rather than a priority_queue, since we want to be // able to iterate over the collection during status rendering - using MergePriorityQueue = std::set< - StablePriorityOrderingWrapper<api::StorageMessage::SP> - >; + using MergePriorityQueue = std::set<StablePriorityOrderingWrapper<api::StorageMessage::SP>>; enum class RendezvousState { NONE, @@ -165,32 +164,37 @@ private: RELEASED }; - ActiveMergeMap _merges; - MergePriorityQueue _queue; - size_t _maxQueueSize; - std::unique_ptr<mbus::DynamicThrottlePolicy> _throttlePolicy; - uint64_t _queueSequence; // TODO: move into a stable priority queue class - mutable std::mutex _messageLock; - std::condition_variable _messageCond; - mutable std::mutex _stateLock; + vespalib::HwInfo _hw_info; + ActiveMergeMap _merges; + MergePriorityQueue _queue; + size_t _maxQueueSize; + std::unique_ptr<mbus::DynamicThrottlePolicy> _throttlePolicy; + uint64_t _queueSequence; // TODO: move into a stable priority queue class + mutable std::mutex _messageLock; + std::condition_variable _messageCond; + mutable std::mutex _stateLock; // Messages pending to be processed by the worker thread - std::vector<api::StorageMessage::SP> _messagesDown; - std::vector<api::StorageMessage::SP> _messagesUp; - std::unique_ptr<Metrics> _metrics; - StorageComponent _component; - std::unique_ptr<framework::Thread> _thread; - RendezvousState _rendezvous; + std::vector<api::StorageMessage::SP> _messagesDown; + std::vector<api::StorageMessage::SP> _messagesUp; + std::unique_ptr<Metrics> _metrics; + StorageComponent _component; + std::unique_ptr<framework::Thread> _thread; + RendezvousState _rendezvous; mutable std::chrono::steady_clock::time_point _throttle_until_time; - std::chrono::steady_clock::duration _backpressure_duration; - bool _use_dynamic_throttling; - bool _disable_queue_limits_for_chained_merges; - bool _closing; + std::chrono::steady_clock::duration _backpressure_duration; + size_t _active_merge_memory_used_bytes; + size_t _max_merge_memory_usage_bytes; + bool _use_dynamic_throttling; + bool _disable_queue_limits_for_chained_merges; + bool _closing; public: /** * windowSizeIncrement used for allowing unit tests to start out with more * than 1 as their window size. */ - MergeThrottler(const StorServerConfig& bootstrap_config, StorageComponentRegister&); + MergeThrottler(const StorServerConfig& bootstrap_config, + StorageComponentRegister& comp_reg, + const vespalib::HwInfo& hw_info); ~MergeThrottler() override; /** Implements document::Runnable::run */ @@ -223,7 +227,10 @@ public: // For unit testing only const mbus::DynamicThrottlePolicy& getThrottlePolicy() const { return *_throttlePolicy; } mbus::DynamicThrottlePolicy& getThrottlePolicy() { return *_throttlePolicy; } - void set_disable_queue_limits_for_chained_merges(bool disable_limits) noexcept; + void set_disable_queue_limits_for_chained_merges_locking(bool disable_limits) noexcept; + void set_max_merge_memory_usage_bytes_locking(uint32_t max_memory_bytes) noexcept; + [[nodiscard]] uint32_t max_merge_memory_usage_bytes_locking() const noexcept; + void set_hw_info_locking(const vespalib::HwInfo& hw_info); // For unit testing only std::mutex& getStateLock() { return _stateLock; } @@ -363,6 +370,7 @@ private: [[nodiscard]] bool backpressure_mode_active_no_lock() const; void backpressure_bounce_all_queued_merges(MessageGuard& guard); [[nodiscard]] bool allow_merge_despite_full_window(const api::MergeBucketCommand& cmd) const noexcept; + [[nodiscard]] bool accepting_merge_is_within_memory_limits(const api::MergeBucketCommand& cmd) const noexcept; [[nodiscard]] bool may_allow_into_queue(const api::MergeBucketCommand& cmd) const noexcept; void sendReply(const api::MergeBucketCommand& cmd, @@ -404,7 +412,10 @@ private: void rejectOperationsInThreadQueue(MessageGuard&, uint32_t minimumStateVersion); void markActiveMergesAsAborted(uint32_t minimumStateVersion); + [[nodiscard]] size_t deduced_memory_limit(const StorServerConfig& cfg) const noexcept; + void update_active_merge_window_size_metric() noexcept; + void update_active_merge_memory_usage_metric() noexcept; // const function, but metrics are mutable void updateOperationMetrics( diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp index 0cce2c27e95..7da75225b6c 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp +++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp @@ -30,8 +30,9 @@ ServiceLayerNode::ServiceLayerBootstrapConfigs::ServiceLayerBootstrapConfigs(Ser ServiceLayerNode::ServiceLayerBootstrapConfigs& ServiceLayerNode::ServiceLayerBootstrapConfigs::operator=(ServiceLayerBootstrapConfigs&&) noexcept = default; -ServiceLayerNode::ServiceLayerNode(const config::ConfigUri & configUri, +ServiceLayerNode::ServiceLayerNode(const config::ConfigUri& configUri, ServiceLayerNodeContext& context, + const vespalib::HwInfo& hw_info, ServiceLayerBootstrapConfigs bootstrap_configs, ApplicationGenerationFetcher& generationFetcher, spi::PersistenceProvider& persistenceProvider, @@ -41,6 +42,7 @@ ServiceLayerNode::ServiceLayerNode(const config::ConfigUri & configUri, _context(context), _persistenceProvider(persistenceProvider), _externalVisitors(externalVisitors), + _hw_info(hw_info), _persistence_bootstrap_config(std::move(bootstrap_configs.persistence_cfg)), _visitor_bootstrap_config(std::move(bootstrap_configs.visitor_cfg)), _filestor_bootstrap_config(std::move(bootstrap_configs.filestor_cfg)), @@ -172,7 +174,7 @@ ServiceLayerNode::createChain(IStorageChainBuilder &builder) auto bouncer = std::make_unique<Bouncer>(compReg, bouncer_config()); _bouncer = bouncer.get(); builder.add(std::move(bouncer)); - auto merge_throttler_up = std::make_unique<MergeThrottler>(server_config(), compReg); + auto merge_throttler_up = std::make_unique<MergeThrottler>(server_config(), compReg, _hw_info); _merge_throttler = merge_throttler_up.get(); builder.add(std::move(merge_throttler_up)); auto bucket_ownership_handler = std::make_unique<ChangedBucketOwnershipHandler>(*_persistence_bootstrap_config, compReg); diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.h b/storage/src/vespa/storage/storageserver/servicelayernode.h index ae39bb0805e..bea09a1c9ce 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernode.h +++ b/storage/src/vespa/storage/storageserver/servicelayernode.h @@ -12,6 +12,7 @@ #include <vespa/storage/common/visitorfactory.h> #include <vespa/storage/visiting/config-stor-visitor.h> #include <vespa/storage/visiting/visitormessagesessionfactory.h> +#include <vespa/vespalib/util/hw_info.h> namespace storage { @@ -39,6 +40,7 @@ private: ServiceLayerNodeContext& _context; spi::PersistenceProvider& _persistenceProvider; VisitorFactory::Map _externalVisitors; + vespalib::HwInfo _hw_info; std::unique_ptr<PersistenceConfig> _persistence_bootstrap_config; std::unique_ptr<StorVisitorConfig> _visitor_bootstrap_config; std::unique_ptr<StorFilestorConfig> _filestor_bootstrap_config; @@ -66,8 +68,9 @@ public: ServiceLayerBootstrapConfigs& operator=(ServiceLayerBootstrapConfigs&&) noexcept; }; - ServiceLayerNode(const config::ConfigUri & configUri, + ServiceLayerNode(const config::ConfigUri& configUri, ServiceLayerNodeContext& context, + const vespalib::HwInfo& hw_info, ServiceLayerBootstrapConfigs bootstrap_configs, ApplicationGenerationFetcher& generationFetcher, spi::PersistenceProvider& persistenceProvider, diff --git a/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto b/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto index 850b5db5c98..a32fbc3e4de 100644 --- a/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto +++ b/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto @@ -33,12 +33,13 @@ message MergeNode { } message MergeBucketRequest { - Bucket bucket = 1; - uint32 cluster_state_version = 2; - uint64 max_timestamp = 3; - repeated MergeNode nodes = 4; - repeated uint32 node_chain = 5; - bool unordered_forwarding = 6; + Bucket bucket = 1; + uint32 cluster_state_version = 2; + uint64 max_timestamp = 3; + repeated MergeNode nodes = 4; + repeated uint32 node_chain = 5; + bool unordered_forwarding = 6; + uint32 estimated_memory_footprint = 7; } message MergeBucketResponse { diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp index af62ec2b418..efbe8c9b42d 100644 --- a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp +++ b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp @@ -801,6 +801,7 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::MergeBucketCommand& req.set_max_timestamp(msg.getMaxTimestamp()); req.set_cluster_state_version(msg.getClusterStateVersion()); req.set_unordered_forwarding(msg.use_unordered_forwarding()); + req.set_estimated_memory_footprint(msg.estimated_memory_footprint()); for (uint16_t chain_node : msg.getChain()) { req.add_node_chain(chain_node); } @@ -823,6 +824,7 @@ api::StorageCommand::UP ProtocolSerialization7::onDecodeMergeBucketCommand(BBuf& } cmd->setChain(std::move(chain)); cmd->set_use_unordered_forwarding(req.unordered_forwarding()); + cmd->set_estimated_memory_footprint(req.estimated_memory_footprint()); return cmd; }); } diff --git a/storage/src/vespa/storageapi/message/bucket.cpp b/storage/src/vespa/storageapi/message/bucket.cpp index 49295f54891..499d2f4abe2 100644 --- a/storage/src/vespa/storageapi/message/bucket.cpp +++ b/storage/src/vespa/storageapi/message/bucket.cpp @@ -107,6 +107,7 @@ MergeBucketCommand::MergeBucketCommand( _nodes(nodes), _maxTimestamp(maxTimestamp), _clusterStateVersion(clusterStateVersion), + _estimated_memory_footprint(0), _chain(chain), _use_unordered_forwarding(false) {} @@ -132,6 +133,9 @@ MergeBucketCommand::print(std::ostream& out, bool verbose, const std::string& in if (_use_unordered_forwarding) { out << " (unordered forwarding)"; } + if (_estimated_memory_footprint > 0) { + out << ", estimated memory footprint: " << _estimated_memory_footprint << " bytes"; + } out << ", reasons to start: " << _reason; out << ")"; if (verbose) { diff --git a/storage/src/vespa/storageapi/message/bucket.h b/storage/src/vespa/storageapi/message/bucket.h index d1fa00619ae..4aa2ff8b0c1 100644 --- a/storage/src/vespa/storageapi/message/bucket.h +++ b/storage/src/vespa/storageapi/message/bucket.h @@ -118,6 +118,7 @@ private: std::vector<Node> _nodes; Timestamp _maxTimestamp; uint32_t _clusterStateVersion; + uint32_t _estimated_memory_footprint; std::vector<uint16_t> _chain; bool _use_unordered_forwarding; @@ -140,6 +141,12 @@ public: } [[nodiscard]] bool use_unordered_forwarding() const noexcept { return _use_unordered_forwarding; } [[nodiscard]] bool from_distributor() const noexcept { return _chain.empty(); } + void set_estimated_memory_footprint(uint32_t footprint_bytes) noexcept { + _estimated_memory_footprint = footprint_bytes; + } + [[nodiscard]] uint32_t estimated_memory_footprint() const noexcept { + return _estimated_memory_footprint; + } void print(std::ostream& out, bool verbose, const std::string& indent) const override; DECLARE_STORAGECOMMAND(MergeBucketCommand, onMergeBucket) }; diff --git a/storageserver/src/vespa/storageserver/app/dummyservicelayerprocess.cpp b/storageserver/src/vespa/storageserver/app/dummyservicelayerprocess.cpp index 8940c2a320e..245afb1c774 100644 --- a/storageserver/src/vespa/storageserver/app/dummyservicelayerprocess.cpp +++ b/storageserver/src/vespa/storageserver/app/dummyservicelayerprocess.cpp @@ -7,7 +7,7 @@ namespace storage { // DummyServiceLayerProcess implementation DummyServiceLayerProcess::DummyServiceLayerProcess(const config::ConfigUri & configUri) - : ServiceLayerProcess(configUri) + : ServiceLayerProcess(configUri, vespalib::HwInfo()) { } diff --git a/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp b/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp index bb284bfc108..ebf320352eb 100644 --- a/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp +++ b/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp @@ -31,7 +31,7 @@ bucket_db_options_from_config(const config::ConfigUri& config_uri) { } -ServiceLayerProcess::ServiceLayerProcess(const config::ConfigUri& configUri) +ServiceLayerProcess::ServiceLayerProcess(const config::ConfigUri& configUri, const vespalib::HwInfo& hw_info) : Process(configUri), _externalVisitors(), _persistence_cfg_handle(), @@ -39,6 +39,7 @@ ServiceLayerProcess::ServiceLayerProcess(const config::ConfigUri& configUri) _filestor_cfg_handle(), _node(), _storage_chain_builder(), + _hw_info(hw_info), _context(std::make_unique<framework::defaultimplementation::RealClock>(), bucket_db_options_from_config(configUri)) { @@ -106,7 +107,8 @@ ServiceLayerProcess::createNode() sbc.visitor_cfg = _visitor_cfg_handle->getConfig(); sbc.filestor_cfg = _filestor_cfg_handle->getConfig(); - _node = std::make_unique<ServiceLayerNode>(_configUri, _context, std::move(sbc), *this, getProvider(), _externalVisitors); + _node = std::make_unique<ServiceLayerNode>(_configUri, _context, _hw_info, std::move(sbc), + *this, getProvider(), _externalVisitors); if (_storage_chain_builder) { _node->set_storage_chain_builder(std::move(_storage_chain_builder)); } diff --git a/storageserver/src/vespa/storageserver/app/servicelayerprocess.h b/storageserver/src/vespa/storageserver/app/servicelayerprocess.h index dcc56f373c4..add5a38ca9d 100644 --- a/storageserver/src/vespa/storageserver/app/servicelayerprocess.h +++ b/storageserver/src/vespa/storageserver/app/servicelayerprocess.h @@ -7,6 +7,7 @@ #include <vespa/storage/common/visitorfactory.h> #include <vespa/storage/storageserver/servicelayernodecontext.h> #include <vespa/storage/visiting/config-stor-visitor.h> +#include <vespa/vespalib/util/hw_info.h> namespace config { class ConfigUri; } @@ -29,14 +30,15 @@ private: std::unique_ptr<config::ConfigHandle<StorVisitorConfig>> _visitor_cfg_handle; std::unique_ptr<config::ConfigHandle<StorFilestorConfig>> _filestor_cfg_handle; - std::unique_ptr<ServiceLayerNode> _node; + std::unique_ptr<ServiceLayerNode> _node; std::unique_ptr<IStorageChainBuilder> _storage_chain_builder; protected: + vespalib::HwInfo _hw_info; ServiceLayerNodeContext _context; public: - explicit ServiceLayerProcess(const config::ConfigUri & configUri); + ServiceLayerProcess(const config::ConfigUri & configUri, const vespalib::HwInfo& hw_info); ~ServiceLayerProcess() override; void shutdown() override; |