diff options
Diffstat (limited to 'storage')
9 files changed, 264 insertions, 66 deletions
diff --git a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp index addc80e4150..698d8dee573 100644 --- a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp +++ b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp @@ -79,7 +79,7 @@ struct StorageProtocolTest : TestWithParam<vespalib::Version> { _protocol(_docMan.getTypeRepoSP()) { } - ~StorageProtocolTest(); + ~StorageProtocolTest() override; void set_dummy_bucket_info_reply_fields(BucketInfoReply& reply) { reply.setBucketInfo(_dummy_bucket_info); @@ -456,18 +456,12 @@ TEST_P(StorageProtocolTest, delete_bucket) { TEST_P(StorageProtocolTest, merge_bucket) { using Node = api::MergeBucketCommand::Node; - std::vector<Node> nodes; - nodes.push_back(Node(4, false)); - nodes.push_back(Node(13, true)); - nodes.push_back(Node(26, true)); - - std::vector<uint16_t> chain; - // Not a valid chain wrt. the nodes, but just want to have unique values - chain.push_back(7); - chain.push_back(14); + std::vector<Node> nodes = {{4, false}, {13, true}, {26, true}}; + std::vector<uint16_t> chain = {7, 14}; // Not a valid chain wrt. the nodes, but just want to have unique values auto cmd = std::make_shared<MergeBucketCommand>(_bucket, nodes, Timestamp(1234), 567, chain); cmd->set_use_unordered_forwarding(true); + cmd->set_estimated_memory_footprint(123'456'789); auto cmd2 = copyCommand(cmd); EXPECT_EQ(_bucket, cmd2->getBucket()); EXPECT_EQ(nodes, cmd2->getNodes()); @@ -475,6 +469,7 @@ TEST_P(StorageProtocolTest, merge_bucket) { EXPECT_EQ(uint32_t(567), cmd2->getClusterStateVersion()); EXPECT_EQ(chain, cmd2->getChain()); EXPECT_EQ(cmd2->use_unordered_forwarding(), cmd->use_unordered_forwarding()); + EXPECT_EQ(cmd2->estimated_memory_footprint(), 123'456'789); auto reply = std::make_shared<MergeBucketReply>(*cmd); auto reply2 = copyReply(reply); @@ -485,6 +480,17 @@ TEST_P(StorageProtocolTest, merge_bucket) { EXPECT_EQ(chain, reply2->getChain()); } +TEST_P(StorageProtocolTest, merge_bucket_estimated_memory_footprint_is_zero_by_default) { + using Node = api::MergeBucketCommand::Node; + std::vector<Node> nodes = {{4, false}, {13, true}, {26, true}}; + std::vector<uint16_t> chain = {7, 14}; + + auto cmd = std::make_shared<MergeBucketCommand>(_bucket, nodes, Timestamp(1234), 567, chain); + cmd->set_use_unordered_forwarding(true); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ(cmd2->estimated_memory_footprint(), 0); +} + TEST_P(StorageProtocolTest, split_bucket) { auto cmd = std::make_shared<SplitBucketCommand>(_bucket); EXPECT_EQ(0u, cmd->getMinSplitBits()); diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index 7a7f2551c2d..48e0ab186f2 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -1,17 +1,18 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <tests/common/testhelper.h> #include <tests/common/dummystoragelink.h> +#include <tests/common/testhelper.h> #include <tests/common/teststorageapp.h> #include <vespa/config/helper/configgetter.hpp> #include <vespa/document/test/make_document_bucket.h> #include <vespa/messagebus/dynamicthrottlepolicy.h> -#include <vespa/storage/storageserver/mergethrottler.h> #include <vespa/storage/persistence/messages.h> +#include <vespa/storage/storageserver/mergethrottler.h> #include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/state.h> #include <vespa/vdslib/state/clusterstate.h> #include <vespa/vespalib/gtest/gtest.h> #include <vespa/vespalib/util/exceptions.h> +#include <vespa/vespalib/util/size_literals.h> #include <unordered_set> #include <memory> #include <iterator> @@ -34,18 +35,22 @@ using StorServerConfig = vespa::config::content::core::StorServerConfig; vespalib::string _storage("storage"); struct MergeBuilder { - document::BucketId _bucket; - api::Timestamp _maxTimestamp; - std::vector<uint16_t> _nodes; - std::vector<uint16_t> _chain; + document::BucketId _bucket; + api::Timestamp _maxTimestamp; + std::vector<uint16_t> _nodes; + std::vector<uint16_t> _chain; std::unordered_set<uint16_t> _source_only; - uint64_t _clusterStateVersion; + uint64_t _clusterStateVersion; + uint32_t _memory_usage; + bool _unordered; - MergeBuilder(const document::BucketId& bucket) + explicit MergeBuilder(const document::BucketId& bucket) : _bucket(bucket), _maxTimestamp(1234), _chain(), - _clusterStateVersion(1) + _clusterStateVersion(1), + _memory_usage(0), + _unordered(false) { nodes(0, 1, 2); } @@ -100,6 +105,14 @@ struct MergeBuilder { _source_only.insert(node); return *this; } + MergeBuilder& memory_usage(uint32_t usage_bytes) { + _memory_usage = usage_bytes; + return *this; + } + MergeBuilder& unordered(bool is_unordered) { + _unordered = is_unordered; + return *this; + } api::MergeBucketCommand::SP create() const { std::vector<api::MergeBucketCommand::Node> n; @@ -112,6 +125,8 @@ struct MergeBuilder { makeDocumentBucket(_bucket), n, _maxTimestamp, _clusterStateVersion, _chain); cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, _nodes[0])); + cmd->set_estimated_memory_footprint(_memory_usage); + cmd->set_use_unordered_forwarding(_unordered); return cmd; } }; @@ -137,20 +152,21 @@ struct MergeThrottlerTest : Test { std::vector<DummyStorageLink*> _bottomLinks; MergeThrottlerTest(); - ~MergeThrottlerTest(); + ~MergeThrottlerTest() override; void SetUp() override; void TearDown() override; api::MergeBucketCommand::SP sendMerge(const MergeBuilder&); - void sendAndExpectReply( + void send_and_expect_reply( const std::shared_ptr<api::StorageMessage>& msg, const api::MessageType& expectedReplyType, api::ReturnCode::Result expectedResultCode); + std::shared_ptr<api::StorageMessage> send_and_expect_forwarding(const std::shared_ptr<api::StorageMessage>& msg); + void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count); - void fill_up_throttler_active_window_and_queue(uint16_t node_idx); void receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd = false); std::shared_ptr<api::MergeBucketCommand> peek_throttler_queue_top(size_t throttler_idx) { @@ -180,10 +196,10 @@ MergeThrottlerTest::SetUp() std::unique_ptr<DummyStorageLink> top; top = std::make_unique<DummyStorageLink>(); - MergeThrottler* throttler = new MergeThrottler(*config, server->getComponentRegister()); + auto* throttler = new MergeThrottler(*config, server->getComponentRegister()); // MergeThrottler will be sandwiched in between two dummy links top->push_back(std::unique_ptr<StorageLink>(throttler)); - DummyStorageLink* bottom = new DummyStorageLink; + auto* bottom = new DummyStorageLink; throttler->push_back(std::unique_ptr<StorageLink>(bottom)); _servers.push_back(std::shared_ptr<TestServiceLayerApp>(server.release())); @@ -291,6 +307,7 @@ TEST_F(MergeThrottlerTest, chain) { cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 0)); const uint16_t distributorIndex = 123; cmd->setSourceIndex(distributorIndex); // Dummy distributor index that must be forwarded + cmd->set_estimated_memory_footprint(456'789); StorageMessage::SP fwd = cmd; StorageMessage::SP fwdToExec; @@ -322,10 +339,12 @@ TEST_F(MergeThrottlerTest, chain) { } EXPECT_TRUE(checkChain(fwd, chain.begin(), chain.end())); } - // Ensure priority, cluster state version and timeout is correctly forwarded + // Ensure operation properties are forwarded as expected EXPECT_EQ(7, static_cast<int>(fwd->getPriority())); - EXPECT_EQ(123, dynamic_cast<const MergeBucketCommand&>(*fwd).getClusterStateVersion()); - EXPECT_EQ(54321ms, dynamic_cast<const StorageCommand&>(*fwd).getTimeout()); + auto& as_merge = dynamic_cast<const MergeBucketCommand&>(*fwd); + EXPECT_EQ(as_merge.getClusterStateVersion(), 123); + EXPECT_EQ(as_merge.getTimeout(), 54321ms); + EXPECT_EQ(as_merge.estimated_memory_footprint(), 456'789); } _topLinks[lastNodeIdx]->sendDown(fwd); @@ -1350,7 +1369,7 @@ TEST_F(MergeThrottlerTest, broken_cycle) { } void -MergeThrottlerTest::sendAndExpectReply( +MergeThrottlerTest::send_and_expect_reply( const std::shared_ptr<api::StorageMessage>& msg, const api::MessageType& expectedReplyType, api::ReturnCode::Result expectedResultCode) @@ -1362,13 +1381,22 @@ MergeThrottlerTest::sendAndExpectReply( ASSERT_EQ(expectedResultCode, storageReply.getResult().getResult()); } +std::shared_ptr<api::StorageMessage> +MergeThrottlerTest::send_and_expect_forwarding(const std::shared_ptr<api::StorageMessage>& msg) +{ + _topLinks[0]->sendDown(msg); + _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); + return _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET); +} + TEST_F(MergeThrottlerTest, get_bucket_diff_command_not_in_active_set_is_rejected) { document::BucketId bucket(16, 1234); std::vector<api::GetBucketDiffCommand::Node> nodes; auto getDiffCmd = std::make_shared<api::GetBucketDiffCommand>( makeDocumentBucket(bucket), nodes, api::Timestamp(1234)); - ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(getDiffCmd, + ASSERT_NO_FATAL_FAILURE(send_and_expect_reply( + getDiffCmd, api::MessageType::GETBUCKETDIFF_REPLY, api::ReturnCode::ABORTED)); ASSERT_EQ(0, _bottomLinks[0]->getNumCommands()); @@ -1379,7 +1407,8 @@ TEST_F(MergeThrottlerTest, apply_bucket_diff_command_not_in_active_set_is_reject std::vector<api::GetBucketDiffCommand::Node> nodes; auto applyDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(makeDocumentBucket(bucket), nodes); - ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(applyDiffCmd, + ASSERT_NO_FATAL_FAILURE(send_and_expect_reply( + applyDiffCmd, api::MessageType::APPLYBUCKETDIFF_REPLY, api::ReturnCode::ABORTED)); ASSERT_EQ(0, _bottomLinks[0]->getNumCommands()); @@ -1411,7 +1440,8 @@ TEST_F(MergeThrottlerTest, new_cluster_state_aborts_all_outdated_active_merges) auto getDiffCmd = std::make_shared<api::GetBucketDiffCommand>( makeDocumentBucket(bucket), std::vector<api::GetBucketDiffCommand::Node>(), api::Timestamp(123)); - ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(getDiffCmd, + ASSERT_NO_FATAL_FAILURE(send_and_expect_reply( + getDiffCmd, api::MessageType::GETBUCKETDIFF_REPLY, api::ReturnCode::ABORTED)); } @@ -1428,7 +1458,8 @@ TEST_F(MergeThrottlerTest, backpressure_busy_bounces_merges_for_configured_durat EXPECT_EQ(0, _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue()); EXPECT_EQ(uint64_t(0), _throttlers[0]->getMetrics().local.failures.busy.getValue()); - ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(MergeBuilder(bucket).create(), + ASSERT_NO_FATAL_FAILURE(send_and_expect_reply( + MergeBuilder(bucket).create(), api::MessageType::MERGEBUCKET_REPLY, api::ReturnCode::BUSY)); @@ -1480,6 +1511,91 @@ TEST_F(MergeThrottlerTest, backpressure_evicts_all_queued_merges) { EXPECT_EQ(ReturnCode::BUSY, dynamic_cast<const MergeBucketReply&>(*reply).getResult().getResult()); } +TEST_F(MergeThrottlerTest, exceeding_memory_soft_limit_rejects_merges_even_with_available_active_window_slots) { + ASSERT_GT(throttler_max_merges_pending(0), 1); // Sanity check for the test itself + + _throttlers[0]->set_max_merge_memory_usage_bytes(10_Mi); + + ASSERT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0); + + std::shared_ptr<api::StorageMessage> fwd_cmd; + ASSERT_NO_FATAL_FAILURE(fwd_cmd = send_and_expect_forwarding( + MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).memory_usage(5_Mi).create())); + + EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); + + // Accepting this merge would exceed memory limits. It is sent as part of a forwarded unordered + // merge and can therefore NOT be enqueued; it must be bounced immediately. + ASSERT_NO_FATAL_FAILURE(send_and_expect_reply( + MergeBuilder(document::BucketId(16, 1)) + .nodes(2, 1, 0).chain(2, 1).unordered(true) + .memory_usage(8_Mi).create(), + MessageType::MERGEBUCKET_REPLY, ReturnCode::BUSY)); + + EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); // Unchanged + + // Fail the forwarded merge. This shall immediately free up the memory usage, allowing a new merge in. + auto fwd_reply = dynamic_cast<api::MergeBucketCommand&>(*fwd_cmd).makeReply(); + fwd_reply->setResult(ReturnCode(ReturnCode::ABORTED, "node stumbled into a ravine")); + + ASSERT_NO_FATAL_FAILURE(send_and_expect_reply( + std::shared_ptr<api::StorageReply>(std::move(fwd_reply)), + MessageType::MERGEBUCKET_REPLY, ReturnCode::ABORTED)); // Unwind reply for failed merge + + ASSERT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0); + + // New merge is accepted + _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 2)).nodes(0, 1, 2).unordered(true).memory_usage(9_Mi).create()); + _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); // Forwarded to node 1 + + EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 9_Mi); +} + +TEST_F(MergeThrottlerTest, exceeding_memory_soft_limit_can_enqueue_unordered_merge_sent_directly_from_distributor) { + _throttlers[0]->set_max_merge_memory_usage_bytes(10_Mi); + + ASSERT_NO_FATAL_FAILURE(send_and_expect_forwarding( + MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).memory_usage(5_Mi).create())); + + EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); + + // Accepting this merge would exceed memory limits. It is sent directly from a distributor and + // can therefore be enqueued. + _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 1)).nodes(0, 1, 2).unordered(true).memory_usage(8_Mi).create()); + waitUntilMergeQueueIs(*_throttlers[0], 1, _messageWaitTime); // Should end up in queue + + EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); // Unchanged +} + +TEST_F(MergeThrottlerTest, at_least_one_merge_is_accepted_even_if_exceeding_memory_soft_limit) { + _throttlers[0]->set_max_merge_memory_usage_bytes(5_Mi); + + _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).unordered(true).memory_usage(100_Mi).create()); + _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); // Forwarded, _not_ bounced + + EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 100_Mi); +} + +TEST_F(MergeThrottlerTest, queued_merges_are_not_counted_towards_memory_usage) { + // Our utility function for filling queues uses bucket IDs {16, x} where x is increasing + // from 0 to the max pending. Ensure we don't accidentally overlap with the bucket ID we + // send for below in the test code. + ASSERT_LT(throttler_max_merges_pending(0), 1000); + + _throttlers[0]->set_max_merge_memory_usage_bytes(50_Mi); + // Fill up active window on node 0. Note: these merges do not have any associated memory cost. + fill_throttler_queue_with_n_commands(0, 0); + + EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi); + + _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 1000)).nodes(0, 1, 2).unordered(true).memory_usage(10_Mi).create()); + waitUntilMergeQueueIs(*_throttlers[0], 1, _messageWaitTime); // Should end up in queue + + EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi); +} + +// TODO define and test auto-deduced max memory usage + // TODO test message queue aborting (use rendezvous functionality--make guard) } // namespace storage diff --git a/storage/src/vespa/storage/config/stor-server.def b/storage/src/vespa/storage/config/stor-server.def index dcce3079c68..2f34d5372ab 100644 --- a/storage/src/vespa/storage/config/stor-server.def +++ b/storage/src/vespa/storage/config/stor-server.def @@ -46,6 +46,18 @@ merge_throttling_policy.min_window_size int default=16 merge_throttling_policy.max_window_size int default=128 merge_throttling_policy.window_size_increment double default=2.0 +## If positive, nodes enforce a soft limit on the estimated amount of memory that +## can be used by merges touching a particular content node. If a merge arrives +## to the node that would violate the soft limit, it will be bounced with BUSY. +## Note that this also counts merges where the node is part of the source-only set, +## since these use memory when/if data is read from the local node. +## +## Semantics: +## > 0 explicit limit in bytes +## == 0 limit automatically derived by content node +## < 0 unlimited (legacy behavior) +max_merge_memory_usage_bytes long default=-1 + ## If the persistence provider indicates that it has exhausted one or more ## of its internal resources during a mutating operation, new merges will ## be bounced for this duration. Not allowing further merges helps take diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp index 4cc2a7a89ab..4646604a980 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp +++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp @@ -29,7 +29,7 @@ namespace { struct NodeComparator { bool operator()(const api::MergeBucketCommand::Node& a, - const api::MergeBucketCommand::Node& b) const + const api::MergeBucketCommand::Node& b) const noexcept { return a.index < b.index; } @@ -41,6 +41,7 @@ MergeThrottler::ChainedMergeState::ChainedMergeState() : _cmd(), _cmdString(), _clusterStateVersion(0), + _estimated_memory_usage(0), _inCycle(false), _executingLocally(false), _unwinding(false), @@ -52,6 +53,7 @@ MergeThrottler::ChainedMergeState::ChainedMergeState(const api::StorageMessage:: : _cmd(cmd), _cmdString(cmd->toString()), _clusterStateVersion(static_cast<const api::MergeBucketCommand&>(*cmd).getClusterStateVersion()), + _estimated_memory_usage(static_cast<const api::MergeBucketCommand&>(*cmd).estimated_memory_footprint()), _inCycle(false), _executingLocally(executing), _unwinding(false), @@ -65,6 +67,8 @@ MergeThrottler::Metrics::Metrics(metrics::MetricSet* owner) averageQueueWaitingTime("averagequeuewaitingtime", {}, "Average time a merge spends in the throttler queue", this), queueSize("queuesize", {}, "Length of merge queue", this), active_window_size("active_window_size", {}, "Number of merges active within the pending window size", this), + estimated_merge_memory_usage("estimated_merge_memory_usage", {}, "An estimated upper bound of the " + "memory usage of the merges currently in the active window", this), bounced_due_to_back_pressure("bounced_due_to_back_pressure", {}, "Number of merges bounced due to resource exhaustion back-pressure", this), chaining("mergechains", this), local("locallyexecutedmerges", this) @@ -196,6 +200,8 @@ MergeThrottler::MergeThrottler( _rendezvous(RendezvousState::NONE), _throttle_until_time(), _backpressure_duration(std::chrono::seconds(30)), + _active_merge_memory_used_bytes(0), + _max_merge_memory_usage_bytes(-1), // -1 ==> unlimited _use_dynamic_throttling(false), _disable_queue_limits_for_chained_merges(false), _closing(false) @@ -244,6 +250,12 @@ MergeThrottler::on_configure(const StorServerConfig& new_config) _backpressure_duration = std::chrono::duration_cast<std::chrono::steady_clock::duration>( std::chrono::duration<double>(new_config.resourceExhaustionMergeBackPressureDurationSecs)); _disable_queue_limits_for_chained_merges = new_config.disableQueueLimitsForChainedMerges; + if (new_config.maxMergeMemoryUsageBytes > 0) { + _max_merge_memory_usage_bytes = static_cast<size_t>(new_config.maxMergeMemoryUsageBytes); + } else { + _max_merge_memory_usage_bytes = 0; // TODO auto-deduce based on local limits + } + } MergeThrottler::~MergeThrottler() @@ -373,16 +385,19 @@ MergeThrottler::forwardCommandToNode( fwdMerge->setPriority(mergeCmd.getPriority()); fwdMerge->setTimeout(mergeCmd.getTimeout()); fwdMerge->set_use_unordered_forwarding(mergeCmd.use_unordered_forwarding()); + fwdMerge->set_estimated_memory_footprint(mergeCmd.estimated_memory_footprint()); msgGuard.sendUp(fwdMerge); } void MergeThrottler::removeActiveMerge(ActiveMergeMap::iterator mergeIter) { - LOG(debug, "Removed merge for %s from internal state", - mergeIter->first.toString().c_str()); + LOG(debug, "Removed merge for %s from internal state", mergeIter->first.toString().c_str()); + assert(_active_merge_memory_used_bytes >= mergeIter->second._estimated_memory_usage); + _active_merge_memory_used_bytes -= mergeIter->second._estimated_memory_usage; _merges.erase(mergeIter); update_active_merge_window_size_metric(); + update_active_merge_memory_usage_metric(); } api::StorageMessage::SP @@ -714,6 +729,21 @@ bool MergeThrottler::allow_merge_despite_full_window(const api::MergeBucketComma return !_use_dynamic_throttling; } +bool MergeThrottler::accepting_merge_is_within_memory_limits(const api::MergeBucketCommand& cmd) const noexcept { + // Soft-limit on expected memory usage, but always let at least one merge into the active window. + if ((_max_merge_memory_usage_bytes > 0) && !_merges.empty()) { + size_t future_usage = _active_merge_memory_used_bytes + cmd.estimated_memory_footprint(); + if (future_usage > _max_merge_memory_usage_bytes) { + LOG(spam, "Adding merge with memory footprint %u would exceed node soft limit of %zu. " + "Current memory usage is %zu, future usage would have been %zu", + cmd.estimated_memory_footprint(), _max_merge_memory_usage_bytes, + _active_merge_memory_used_bytes, future_usage); + return false; + } + } + return true; +} + bool MergeThrottler::may_allow_into_queue(const api::MergeBucketCommand& cmd) const noexcept { // We cannot let forwarded unordered merges fall into the queue, as that might lead to a deadlock. // Consider the following scenario, with two nodes C0 and C1, each with a low window size of 1 (low @@ -761,7 +791,10 @@ MergeThrottler::handleMessageDown( if (isMergeAlreadyKnown(msg)) { processCycledMergeCommand(msg, msgGuard); - } else if (canProcessNewMerge() || allow_merge_despite_full_window(mergeCmd)) { + } else if (accepting_merge_is_within_memory_limits(mergeCmd) + && (canProcessNewMerge() + || allow_merge_despite_full_window(mergeCmd))) + { processNewMergeCommand(msg, msgGuard); } else if (may_allow_into_queue(mergeCmd)) { enqueue_merge_for_later_processing(msg, msgGuard); @@ -864,9 +897,10 @@ MergeThrottler::processNewMergeCommand( assert(_merges.find(mergeCmd.getBucket()) == _merges.end()); auto state = _merges.emplace(mergeCmd.getBucket(), ChainedMergeState(msg)).first; update_active_merge_window_size_metric(); + _active_merge_memory_used_bytes += mergeCmd.estimated_memory_footprint(); + update_active_merge_memory_usage_metric(); - LOG(debug, "Added merge %s to internal state", - mergeCmd.toString().c_str()); + LOG(debug, "Added merge %s to internal state", mergeCmd.toString().c_str()); DummyMbusRequest dummyMsg; _throttlePolicy->processMessage(dummyMsg); @@ -889,7 +923,7 @@ MergeThrottler::processNewMergeCommand( } else { if (!nodeSeq.isLastNode()) { // When we're not the last node and haven't seen the merge before, - // we cannot possible execute the merge yet. Forward to next. + // we cannot possibly execute the merge yet. Forward to next. uint16_t nextNodeInChain = nodeSeq.getNextNodeInChain(); LOG(debug, "Forwarding merge %s to storage node %u", mergeCmd.toString().c_str(), nextNodeInChain); @@ -1297,11 +1331,22 @@ MergeThrottler::set_disable_queue_limits_for_chained_merges(bool disable_limits) } void +MergeThrottler::set_max_merge_memory_usage_bytes(uint32_t max_memory_bytes) noexcept { + std::lock_guard lock(_stateLock); + _max_merge_memory_usage_bytes = max_memory_bytes; +} + +void MergeThrottler::update_active_merge_window_size_metric() noexcept { _metrics->active_window_size.set(static_cast<int64_t>(_merges.size())); } void +MergeThrottler::update_active_merge_memory_usage_metric() noexcept { + _metrics->estimated_merge_memory_usage.set(static_cast<int64_t>(_active_merge_memory_used_bytes)); +} + +void MergeThrottler::print(std::ostream& out, bool /*verbose*/, const std::string& /*indent*/) const { diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h index 5362c2f6df8..8f6c4d62f68 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.h +++ b/storage/src/vespa/storage/storageserver/mergethrottler.h @@ -71,6 +71,7 @@ public: metrics::DoubleAverageMetric averageQueueWaitingTime; metrics::LongValueMetric queueSize; metrics::LongValueMetric active_window_size; + metrics::LongValueMetric estimated_merge_memory_usage; metrics::LongCountMetric bounced_due_to_back_pressure; MergeOperationMetrics chaining; MergeOperationMetrics local; @@ -113,6 +114,7 @@ private: api::StorageMessage::SP _cmd; std::string _cmdString; // For being able to print message even when we don't own it uint64_t _clusterStateVersion; + uint32_t _estimated_memory_usage; bool _inCycle; bool _executingLocally; bool _unwinding; @@ -154,9 +156,7 @@ private: // Use a set rather than a priority_queue, since we want to be // able to iterate over the collection during status rendering - using MergePriorityQueue = std::set< - StablePriorityOrderingWrapper<api::StorageMessage::SP> - >; + using MergePriorityQueue = std::set<StablePriorityOrderingWrapper<api::StorageMessage::SP>>; enum class RendezvousState { NONE, @@ -165,26 +165,28 @@ private: RELEASED }; - ActiveMergeMap _merges; - MergePriorityQueue _queue; - size_t _maxQueueSize; - std::unique_ptr<mbus::DynamicThrottlePolicy> _throttlePolicy; - uint64_t _queueSequence; // TODO: move into a stable priority queue class - mutable std::mutex _messageLock; - std::condition_variable _messageCond; - mutable std::mutex _stateLock; + ActiveMergeMap _merges; + MergePriorityQueue _queue; + size_t _maxQueueSize; + std::unique_ptr<mbus::DynamicThrottlePolicy> _throttlePolicy; + uint64_t _queueSequence; // TODO: move into a stable priority queue class + mutable std::mutex _messageLock; + std::condition_variable _messageCond; + mutable std::mutex _stateLock; // Messages pending to be processed by the worker thread - std::vector<api::StorageMessage::SP> _messagesDown; - std::vector<api::StorageMessage::SP> _messagesUp; - std::unique_ptr<Metrics> _metrics; - StorageComponent _component; - std::unique_ptr<framework::Thread> _thread; - RendezvousState _rendezvous; + std::vector<api::StorageMessage::SP> _messagesDown; + std::vector<api::StorageMessage::SP> _messagesUp; + std::unique_ptr<Metrics> _metrics; + StorageComponent _component; + std::unique_ptr<framework::Thread> _thread; + RendezvousState _rendezvous; mutable std::chrono::steady_clock::time_point _throttle_until_time; - std::chrono::steady_clock::duration _backpressure_duration; - bool _use_dynamic_throttling; - bool _disable_queue_limits_for_chained_merges; - bool _closing; + std::chrono::steady_clock::duration _backpressure_duration; + size_t _active_merge_memory_used_bytes; + size_t _max_merge_memory_usage_bytes; + bool _use_dynamic_throttling; + bool _disable_queue_limits_for_chained_merges; + bool _closing; public: /** * windowSizeIncrement used for allowing unit tests to start out with more @@ -224,6 +226,7 @@ public: const mbus::DynamicThrottlePolicy& getThrottlePolicy() const { return *_throttlePolicy; } mbus::DynamicThrottlePolicy& getThrottlePolicy() { return *_throttlePolicy; } void set_disable_queue_limits_for_chained_merges(bool disable_limits) noexcept; + void set_max_merge_memory_usage_bytes(uint32_t max_memory_bytes) noexcept; // For unit testing only std::mutex& getStateLock() { return _stateLock; } @@ -363,6 +366,7 @@ private: [[nodiscard]] bool backpressure_mode_active_no_lock() const; void backpressure_bounce_all_queued_merges(MessageGuard& guard); [[nodiscard]] bool allow_merge_despite_full_window(const api::MergeBucketCommand& cmd) const noexcept; + [[nodiscard]] bool accepting_merge_is_within_memory_limits(const api::MergeBucketCommand& cmd) const noexcept; [[nodiscard]] bool may_allow_into_queue(const api::MergeBucketCommand& cmd) const noexcept; void sendReply(const api::MergeBucketCommand& cmd, @@ -405,6 +409,7 @@ private: void markActiveMergesAsAborted(uint32_t minimumStateVersion); void update_active_merge_window_size_metric() noexcept; + void update_active_merge_memory_usage_metric() noexcept; // const function, but metrics are mutable void updateOperationMetrics( diff --git a/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto b/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto index 850b5db5c98..a32fbc3e4de 100644 --- a/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto +++ b/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto @@ -33,12 +33,13 @@ message MergeNode { } message MergeBucketRequest { - Bucket bucket = 1; - uint32 cluster_state_version = 2; - uint64 max_timestamp = 3; - repeated MergeNode nodes = 4; - repeated uint32 node_chain = 5; - bool unordered_forwarding = 6; + Bucket bucket = 1; + uint32 cluster_state_version = 2; + uint64 max_timestamp = 3; + repeated MergeNode nodes = 4; + repeated uint32 node_chain = 5; + bool unordered_forwarding = 6; + uint32 estimated_memory_footprint = 7; } message MergeBucketResponse { diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp index af62ec2b418..efbe8c9b42d 100644 --- a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp +++ b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp @@ -801,6 +801,7 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::MergeBucketCommand& req.set_max_timestamp(msg.getMaxTimestamp()); req.set_cluster_state_version(msg.getClusterStateVersion()); req.set_unordered_forwarding(msg.use_unordered_forwarding()); + req.set_estimated_memory_footprint(msg.estimated_memory_footprint()); for (uint16_t chain_node : msg.getChain()) { req.add_node_chain(chain_node); } @@ -823,6 +824,7 @@ api::StorageCommand::UP ProtocolSerialization7::onDecodeMergeBucketCommand(BBuf& } cmd->setChain(std::move(chain)); cmd->set_use_unordered_forwarding(req.unordered_forwarding()); + cmd->set_estimated_memory_footprint(req.estimated_memory_footprint()); return cmd; }); } diff --git a/storage/src/vespa/storageapi/message/bucket.cpp b/storage/src/vespa/storageapi/message/bucket.cpp index 49295f54891..499d2f4abe2 100644 --- a/storage/src/vespa/storageapi/message/bucket.cpp +++ b/storage/src/vespa/storageapi/message/bucket.cpp @@ -107,6 +107,7 @@ MergeBucketCommand::MergeBucketCommand( _nodes(nodes), _maxTimestamp(maxTimestamp), _clusterStateVersion(clusterStateVersion), + _estimated_memory_footprint(0), _chain(chain), _use_unordered_forwarding(false) {} @@ -132,6 +133,9 @@ MergeBucketCommand::print(std::ostream& out, bool verbose, const std::string& in if (_use_unordered_forwarding) { out << " (unordered forwarding)"; } + if (_estimated_memory_footprint > 0) { + out << ", estimated memory footprint: " << _estimated_memory_footprint << " bytes"; + } out << ", reasons to start: " << _reason; out << ")"; if (verbose) { diff --git a/storage/src/vespa/storageapi/message/bucket.h b/storage/src/vespa/storageapi/message/bucket.h index d1fa00619ae..4aa2ff8b0c1 100644 --- a/storage/src/vespa/storageapi/message/bucket.h +++ b/storage/src/vespa/storageapi/message/bucket.h @@ -118,6 +118,7 @@ private: std::vector<Node> _nodes; Timestamp _maxTimestamp; uint32_t _clusterStateVersion; + uint32_t _estimated_memory_footprint; std::vector<uint16_t> _chain; bool _use_unordered_forwarding; @@ -140,6 +141,12 @@ public: } [[nodiscard]] bool use_unordered_forwarding() const noexcept { return _use_unordered_forwarding; } [[nodiscard]] bool from_distributor() const noexcept { return _chain.empty(); } + void set_estimated_memory_footprint(uint32_t footprint_bytes) noexcept { + _estimated_memory_footprint = footprint_bytes; + } + [[nodiscard]] uint32_t estimated_memory_footprint() const noexcept { + return _estimated_memory_footprint; + } void print(std::ostream& out, bool verbose, const std::string& indent) const override; DECLARE_STORAGECOMMAND(MergeBucketCommand, onMergeBucket) }; |