aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@vespa.ai>2023-10-30 14:20:20 +0000
committerTor Brede Vekterli <vekterli@vespa.ai>2023-11-01 15:36:56 +0000
commit6567a48a55f05f2f20c0cc7ae63dc454319c87e1 (patch)
tree81f292dd2c7f3fe9f6d5604bf5de7cca1184f057 /storage/src
parentb04b6184f8d8fe0b9d569caf6ed46c69312c1821 (diff)
Add content node soft limit on max memory used by merges
If configured, the active merge window is limited so that the sum of estimated memory usage for its merges does not go beyond the configured soft memory limit. The window can always fit a minimum of 1 merge regardless of its size to ensure progress in the cluster (thus this is a soft limit, not a hard limit).
Diffstat (limited to 'storage/src')
-rw-r--r--storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp26
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp160
-rw-r--r--storage/src/vespa/storage/config/stor-server.def12
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.cpp59
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.h47
-rw-r--r--storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto13
-rw-r--r--storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp2
-rw-r--r--storage/src/vespa/storageapi/message/bucket.cpp4
-rw-r--r--storage/src/vespa/storageapi/message/bucket.h7
9 files changed, 264 insertions, 66 deletions
diff --git a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp
index addc80e4150..698d8dee573 100644
--- a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp
+++ b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp
@@ -79,7 +79,7 @@ struct StorageProtocolTest : TestWithParam<vespalib::Version> {
_protocol(_docMan.getTypeRepoSP())
{
}
- ~StorageProtocolTest();
+ ~StorageProtocolTest() override;
void set_dummy_bucket_info_reply_fields(BucketInfoReply& reply) {
reply.setBucketInfo(_dummy_bucket_info);
@@ -456,18 +456,12 @@ TEST_P(StorageProtocolTest, delete_bucket) {
TEST_P(StorageProtocolTest, merge_bucket) {
using Node = api::MergeBucketCommand::Node;
- std::vector<Node> nodes;
- nodes.push_back(Node(4, false));
- nodes.push_back(Node(13, true));
- nodes.push_back(Node(26, true));
-
- std::vector<uint16_t> chain;
- // Not a valid chain wrt. the nodes, but just want to have unique values
- chain.push_back(7);
- chain.push_back(14);
+ std::vector<Node> nodes = {{4, false}, {13, true}, {26, true}};
+ std::vector<uint16_t> chain = {7, 14}; // Not a valid chain wrt. the nodes, but just want to have unique values
auto cmd = std::make_shared<MergeBucketCommand>(_bucket, nodes, Timestamp(1234), 567, chain);
cmd->set_use_unordered_forwarding(true);
+ cmd->set_estimated_memory_footprint(123'456'789);
auto cmd2 = copyCommand(cmd);
EXPECT_EQ(_bucket, cmd2->getBucket());
EXPECT_EQ(nodes, cmd2->getNodes());
@@ -475,6 +469,7 @@ TEST_P(StorageProtocolTest, merge_bucket) {
EXPECT_EQ(uint32_t(567), cmd2->getClusterStateVersion());
EXPECT_EQ(chain, cmd2->getChain());
EXPECT_EQ(cmd2->use_unordered_forwarding(), cmd->use_unordered_forwarding());
+ EXPECT_EQ(cmd2->estimated_memory_footprint(), 123'456'789);
auto reply = std::make_shared<MergeBucketReply>(*cmd);
auto reply2 = copyReply(reply);
@@ -485,6 +480,17 @@ TEST_P(StorageProtocolTest, merge_bucket) {
EXPECT_EQ(chain, reply2->getChain());
}
+TEST_P(StorageProtocolTest, merge_bucket_estimated_memory_footprint_is_zero_by_default) {
+ using Node = api::MergeBucketCommand::Node;
+ std::vector<Node> nodes = {{4, false}, {13, true}, {26, true}};
+ std::vector<uint16_t> chain = {7, 14};
+
+ auto cmd = std::make_shared<MergeBucketCommand>(_bucket, nodes, Timestamp(1234), 567, chain);
+ cmd->set_use_unordered_forwarding(true);
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ(cmd2->estimated_memory_footprint(), 0);
+}
+
TEST_P(StorageProtocolTest, split_bucket) {
auto cmd = std::make_shared<SplitBucketCommand>(_bucket);
EXPECT_EQ(0u, cmd->getMinSplitBits());
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 7a7f2551c2d..48e0ab186f2 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -1,17 +1,18 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <tests/common/testhelper.h>
#include <tests/common/dummystoragelink.h>
+#include <tests/common/testhelper.h>
#include <tests/common/teststorageapp.h>
#include <vespa/config/helper/configgetter.hpp>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/messagebus/dynamicthrottlepolicy.h>
-#include <vespa/storage/storageserver/mergethrottler.h>
#include <vespa/storage/persistence/messages.h>
+#include <vespa/storage/storageserver/mergethrottler.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/state.h>
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vespalib/util/size_literals.h>
#include <unordered_set>
#include <memory>
#include <iterator>
@@ -34,18 +35,22 @@ using StorServerConfig = vespa::config::content::core::StorServerConfig;
vespalib::string _storage("storage");
struct MergeBuilder {
- document::BucketId _bucket;
- api::Timestamp _maxTimestamp;
- std::vector<uint16_t> _nodes;
- std::vector<uint16_t> _chain;
+ document::BucketId _bucket;
+ api::Timestamp _maxTimestamp;
+ std::vector<uint16_t> _nodes;
+ std::vector<uint16_t> _chain;
std::unordered_set<uint16_t> _source_only;
- uint64_t _clusterStateVersion;
+ uint64_t _clusterStateVersion;
+ uint32_t _memory_usage;
+ bool _unordered;
- MergeBuilder(const document::BucketId& bucket)
+ explicit MergeBuilder(const document::BucketId& bucket)
: _bucket(bucket),
_maxTimestamp(1234),
_chain(),
- _clusterStateVersion(1)
+ _clusterStateVersion(1),
+ _memory_usage(0),
+ _unordered(false)
{
nodes(0, 1, 2);
}
@@ -100,6 +105,14 @@ struct MergeBuilder {
_source_only.insert(node);
return *this;
}
+ MergeBuilder& memory_usage(uint32_t usage_bytes) {
+ _memory_usage = usage_bytes;
+ return *this;
+ }
+ MergeBuilder& unordered(bool is_unordered) {
+ _unordered = is_unordered;
+ return *this;
+ }
api::MergeBucketCommand::SP create() const {
std::vector<api::MergeBucketCommand::Node> n;
@@ -112,6 +125,8 @@ struct MergeBuilder {
makeDocumentBucket(_bucket), n, _maxTimestamp,
_clusterStateVersion, _chain);
cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, _nodes[0]));
+ cmd->set_estimated_memory_footprint(_memory_usage);
+ cmd->set_use_unordered_forwarding(_unordered);
return cmd;
}
};
@@ -137,20 +152,21 @@ struct MergeThrottlerTest : Test {
std::vector<DummyStorageLink*> _bottomLinks;
MergeThrottlerTest();
- ~MergeThrottlerTest();
+ ~MergeThrottlerTest() override;
void SetUp() override;
void TearDown() override;
api::MergeBucketCommand::SP sendMerge(const MergeBuilder&);
- void sendAndExpectReply(
+ void send_and_expect_reply(
const std::shared_ptr<api::StorageMessage>& msg,
const api::MessageType& expectedReplyType,
api::ReturnCode::Result expectedResultCode);
+ std::shared_ptr<api::StorageMessage> send_and_expect_forwarding(const std::shared_ptr<api::StorageMessage>& msg);
+
void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count);
- void fill_up_throttler_active_window_and_queue(uint16_t node_idx);
void receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd = false);
std::shared_ptr<api::MergeBucketCommand> peek_throttler_queue_top(size_t throttler_idx) {
@@ -180,10 +196,10 @@ MergeThrottlerTest::SetUp()
std::unique_ptr<DummyStorageLink> top;
top = std::make_unique<DummyStorageLink>();
- MergeThrottler* throttler = new MergeThrottler(*config, server->getComponentRegister());
+ auto* throttler = new MergeThrottler(*config, server->getComponentRegister());
// MergeThrottler will be sandwiched in between two dummy links
top->push_back(std::unique_ptr<StorageLink>(throttler));
- DummyStorageLink* bottom = new DummyStorageLink;
+ auto* bottom = new DummyStorageLink;
throttler->push_back(std::unique_ptr<StorageLink>(bottom));
_servers.push_back(std::shared_ptr<TestServiceLayerApp>(server.release()));
@@ -291,6 +307,7 @@ TEST_F(MergeThrottlerTest, chain) {
cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 0));
const uint16_t distributorIndex = 123;
cmd->setSourceIndex(distributorIndex); // Dummy distributor index that must be forwarded
+ cmd->set_estimated_memory_footprint(456'789);
StorageMessage::SP fwd = cmd;
StorageMessage::SP fwdToExec;
@@ -322,10 +339,12 @@ TEST_F(MergeThrottlerTest, chain) {
}
EXPECT_TRUE(checkChain(fwd, chain.begin(), chain.end()));
}
- // Ensure priority, cluster state version and timeout is correctly forwarded
+ // Ensure operation properties are forwarded as expected
EXPECT_EQ(7, static_cast<int>(fwd->getPriority()));
- EXPECT_EQ(123, dynamic_cast<const MergeBucketCommand&>(*fwd).getClusterStateVersion());
- EXPECT_EQ(54321ms, dynamic_cast<const StorageCommand&>(*fwd).getTimeout());
+ auto& as_merge = dynamic_cast<const MergeBucketCommand&>(*fwd);
+ EXPECT_EQ(as_merge.getClusterStateVersion(), 123);
+ EXPECT_EQ(as_merge.getTimeout(), 54321ms);
+ EXPECT_EQ(as_merge.estimated_memory_footprint(), 456'789);
}
_topLinks[lastNodeIdx]->sendDown(fwd);
@@ -1350,7 +1369,7 @@ TEST_F(MergeThrottlerTest, broken_cycle) {
}
void
-MergeThrottlerTest::sendAndExpectReply(
+MergeThrottlerTest::send_and_expect_reply(
const std::shared_ptr<api::StorageMessage>& msg,
const api::MessageType& expectedReplyType,
api::ReturnCode::Result expectedResultCode)
@@ -1362,13 +1381,22 @@ MergeThrottlerTest::sendAndExpectReply(
ASSERT_EQ(expectedResultCode, storageReply.getResult().getResult());
}
+std::shared_ptr<api::StorageMessage>
+MergeThrottlerTest::send_and_expect_forwarding(const std::shared_ptr<api::StorageMessage>& msg)
+{
+ _topLinks[0]->sendDown(msg);
+ _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+ return _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+}
+
TEST_F(MergeThrottlerTest, get_bucket_diff_command_not_in_active_set_is_rejected) {
document::BucketId bucket(16, 1234);
std::vector<api::GetBucketDiffCommand::Node> nodes;
auto getDiffCmd = std::make_shared<api::GetBucketDiffCommand>(
makeDocumentBucket(bucket), nodes, api::Timestamp(1234));
- ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(getDiffCmd,
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ getDiffCmd,
api::MessageType::GETBUCKETDIFF_REPLY,
api::ReturnCode::ABORTED));
ASSERT_EQ(0, _bottomLinks[0]->getNumCommands());
@@ -1379,7 +1407,8 @@ TEST_F(MergeThrottlerTest, apply_bucket_diff_command_not_in_active_set_is_reject
std::vector<api::GetBucketDiffCommand::Node> nodes;
auto applyDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(makeDocumentBucket(bucket), nodes);
- ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(applyDiffCmd,
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ applyDiffCmd,
api::MessageType::APPLYBUCKETDIFF_REPLY,
api::ReturnCode::ABORTED));
ASSERT_EQ(0, _bottomLinks[0]->getNumCommands());
@@ -1411,7 +1440,8 @@ TEST_F(MergeThrottlerTest, new_cluster_state_aborts_all_outdated_active_merges)
auto getDiffCmd = std::make_shared<api::GetBucketDiffCommand>(
makeDocumentBucket(bucket), std::vector<api::GetBucketDiffCommand::Node>(), api::Timestamp(123));
- ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(getDiffCmd,
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ getDiffCmd,
api::MessageType::GETBUCKETDIFF_REPLY,
api::ReturnCode::ABORTED));
}
@@ -1428,7 +1458,8 @@ TEST_F(MergeThrottlerTest, backpressure_busy_bounces_merges_for_configured_durat
EXPECT_EQ(0, _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue());
EXPECT_EQ(uint64_t(0), _throttlers[0]->getMetrics().local.failures.busy.getValue());
- ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(MergeBuilder(bucket).create(),
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ MergeBuilder(bucket).create(),
api::MessageType::MERGEBUCKET_REPLY,
api::ReturnCode::BUSY));
@@ -1480,6 +1511,91 @@ TEST_F(MergeThrottlerTest, backpressure_evicts_all_queued_merges) {
EXPECT_EQ(ReturnCode::BUSY, dynamic_cast<const MergeBucketReply&>(*reply).getResult().getResult());
}
+TEST_F(MergeThrottlerTest, exceeding_memory_soft_limit_rejects_merges_even_with_available_active_window_slots) {
+ ASSERT_GT(throttler_max_merges_pending(0), 1); // Sanity check for the test itself
+
+ _throttlers[0]->set_max_merge_memory_usage_bytes(10_Mi);
+
+ ASSERT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0);
+
+ std::shared_ptr<api::StorageMessage> fwd_cmd;
+ ASSERT_NO_FATAL_FAILURE(fwd_cmd = send_and_expect_forwarding(
+ MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).memory_usage(5_Mi).create()));
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi);
+
+ // Accepting this merge would exceed memory limits. It is sent as part of a forwarded unordered
+ // merge and can therefore NOT be enqueued; it must be bounced immediately.
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ MergeBuilder(document::BucketId(16, 1))
+ .nodes(2, 1, 0).chain(2, 1).unordered(true)
+ .memory_usage(8_Mi).create(),
+ MessageType::MERGEBUCKET_REPLY, ReturnCode::BUSY));
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); // Unchanged
+
+ // Fail the forwarded merge. This shall immediately free up the memory usage, allowing a new merge in.
+ auto fwd_reply = dynamic_cast<api::MergeBucketCommand&>(*fwd_cmd).makeReply();
+ fwd_reply->setResult(ReturnCode(ReturnCode::ABORTED, "node stumbled into a ravine"));
+
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ std::shared_ptr<api::StorageReply>(std::move(fwd_reply)),
+ MessageType::MERGEBUCKET_REPLY, ReturnCode::ABORTED)); // Unwind reply for failed merge
+
+ ASSERT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0);
+
+ // New merge is accepted
+ _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 2)).nodes(0, 1, 2).unordered(true).memory_usage(9_Mi).create());
+ _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); // Forwarded to node 1
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 9_Mi);
+}
+
+TEST_F(MergeThrottlerTest, exceeding_memory_soft_limit_can_enqueue_unordered_merge_sent_directly_from_distributor) {
+ _throttlers[0]->set_max_merge_memory_usage_bytes(10_Mi);
+
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_forwarding(
+ MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).memory_usage(5_Mi).create()));
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi);
+
+ // Accepting this merge would exceed memory limits. It is sent directly from a distributor and
+ // can therefore be enqueued.
+ _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 1)).nodes(0, 1, 2).unordered(true).memory_usage(8_Mi).create());
+ waitUntilMergeQueueIs(*_throttlers[0], 1, _messageWaitTime); // Should end up in queue
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); // Unchanged
+}
+
+TEST_F(MergeThrottlerTest, at_least_one_merge_is_accepted_even_if_exceeding_memory_soft_limit) {
+ _throttlers[0]->set_max_merge_memory_usage_bytes(5_Mi);
+
+ _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).unordered(true).memory_usage(100_Mi).create());
+ _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); // Forwarded, _not_ bounced
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 100_Mi);
+}
+
+TEST_F(MergeThrottlerTest, queued_merges_are_not_counted_towards_memory_usage) {
+ // Our utility function for filling queues uses bucket IDs {16, x} where x is increasing
+ // from 0 to the max pending. Ensure we don't accidentally overlap with the bucket ID we
+ // send for below in the test code.
+ ASSERT_LT(throttler_max_merges_pending(0), 1000);
+
+ _throttlers[0]->set_max_merge_memory_usage_bytes(50_Mi);
+ // Fill up active window on node 0. Note: these merges do not have any associated memory cost.
+ fill_throttler_queue_with_n_commands(0, 0);
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi);
+
+ _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 1000)).nodes(0, 1, 2).unordered(true).memory_usage(10_Mi).create());
+ waitUntilMergeQueueIs(*_throttlers[0], 1, _messageWaitTime); // Should end up in queue
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi);
+}
+
+// TODO define and test auto-deduced max memory usage
+
// TODO test message queue aborting (use rendezvous functionality--make guard)
} // namespace storage
diff --git a/storage/src/vespa/storage/config/stor-server.def b/storage/src/vespa/storage/config/stor-server.def
index dcce3079c68..2f34d5372ab 100644
--- a/storage/src/vespa/storage/config/stor-server.def
+++ b/storage/src/vespa/storage/config/stor-server.def
@@ -46,6 +46,18 @@ merge_throttling_policy.min_window_size int default=16
merge_throttling_policy.max_window_size int default=128
merge_throttling_policy.window_size_increment double default=2.0
+## If positive, nodes enforce a soft limit on the estimated amount of memory that
+## can be used by merges touching a particular content node. If a merge arrives
+## to the node that would violate the soft limit, it will be bounced with BUSY.
+## Note that this also counts merges where the node is part of the source-only set,
+## since these use memory when/if data is read from the local node.
+##
+## Semantics:
+## > 0 explicit limit in bytes
+## == 0 limit automatically derived by content node
+## < 0 unlimited (legacy behavior)
+max_merge_memory_usage_bytes long default=-1
+
## If the persistence provider indicates that it has exhausted one or more
## of its internal resources during a mutating operation, new merges will
## be bounced for this duration. Not allowing further merges helps take
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index 4cc2a7a89ab..4646604a980 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -29,7 +29,7 @@ namespace {
struct NodeComparator {
bool operator()(const api::MergeBucketCommand::Node& a,
- const api::MergeBucketCommand::Node& b) const
+ const api::MergeBucketCommand::Node& b) const noexcept
{
return a.index < b.index;
}
@@ -41,6 +41,7 @@ MergeThrottler::ChainedMergeState::ChainedMergeState()
: _cmd(),
_cmdString(),
_clusterStateVersion(0),
+ _estimated_memory_usage(0),
_inCycle(false),
_executingLocally(false),
_unwinding(false),
@@ -52,6 +53,7 @@ MergeThrottler::ChainedMergeState::ChainedMergeState(const api::StorageMessage::
: _cmd(cmd),
_cmdString(cmd->toString()),
_clusterStateVersion(static_cast<const api::MergeBucketCommand&>(*cmd).getClusterStateVersion()),
+ _estimated_memory_usage(static_cast<const api::MergeBucketCommand&>(*cmd).estimated_memory_footprint()),
_inCycle(false),
_executingLocally(executing),
_unwinding(false),
@@ -65,6 +67,8 @@ MergeThrottler::Metrics::Metrics(metrics::MetricSet* owner)
averageQueueWaitingTime("averagequeuewaitingtime", {}, "Average time a merge spends in the throttler queue", this),
queueSize("queuesize", {}, "Length of merge queue", this),
active_window_size("active_window_size", {}, "Number of merges active within the pending window size", this),
+ estimated_merge_memory_usage("estimated_merge_memory_usage", {}, "An estimated upper bound of the "
+ "memory usage of the merges currently in the active window", this),
bounced_due_to_back_pressure("bounced_due_to_back_pressure", {}, "Number of merges bounced due to resource exhaustion back-pressure", this),
chaining("mergechains", this),
local("locallyexecutedmerges", this)
@@ -196,6 +200,8 @@ MergeThrottler::MergeThrottler(
_rendezvous(RendezvousState::NONE),
_throttle_until_time(),
_backpressure_duration(std::chrono::seconds(30)),
+ _active_merge_memory_used_bytes(0),
+ _max_merge_memory_usage_bytes(-1), // -1 ==> unlimited
_use_dynamic_throttling(false),
_disable_queue_limits_for_chained_merges(false),
_closing(false)
@@ -244,6 +250,12 @@ MergeThrottler::on_configure(const StorServerConfig& new_config)
_backpressure_duration = std::chrono::duration_cast<std::chrono::steady_clock::duration>(
std::chrono::duration<double>(new_config.resourceExhaustionMergeBackPressureDurationSecs));
_disable_queue_limits_for_chained_merges = new_config.disableQueueLimitsForChainedMerges;
+ if (new_config.maxMergeMemoryUsageBytes > 0) {
+ _max_merge_memory_usage_bytes = static_cast<size_t>(new_config.maxMergeMemoryUsageBytes);
+ } else {
+ _max_merge_memory_usage_bytes = 0; // TODO auto-deduce based on local limits
+ }
+
}
MergeThrottler::~MergeThrottler()
@@ -373,16 +385,19 @@ MergeThrottler::forwardCommandToNode(
fwdMerge->setPriority(mergeCmd.getPriority());
fwdMerge->setTimeout(mergeCmd.getTimeout());
fwdMerge->set_use_unordered_forwarding(mergeCmd.use_unordered_forwarding());
+ fwdMerge->set_estimated_memory_footprint(mergeCmd.estimated_memory_footprint());
msgGuard.sendUp(fwdMerge);
}
void
MergeThrottler::removeActiveMerge(ActiveMergeMap::iterator mergeIter)
{
- LOG(debug, "Removed merge for %s from internal state",
- mergeIter->first.toString().c_str());
+ LOG(debug, "Removed merge for %s from internal state", mergeIter->first.toString().c_str());
+ assert(_active_merge_memory_used_bytes >= mergeIter->second._estimated_memory_usage);
+ _active_merge_memory_used_bytes -= mergeIter->second._estimated_memory_usage;
_merges.erase(mergeIter);
update_active_merge_window_size_metric();
+ update_active_merge_memory_usage_metric();
}
api::StorageMessage::SP
@@ -714,6 +729,21 @@ bool MergeThrottler::allow_merge_despite_full_window(const api::MergeBucketComma
return !_use_dynamic_throttling;
}
+bool MergeThrottler::accepting_merge_is_within_memory_limits(const api::MergeBucketCommand& cmd) const noexcept {
+ // Soft-limit on expected memory usage, but always let at least one merge into the active window.
+ if ((_max_merge_memory_usage_bytes > 0) && !_merges.empty()) {
+ size_t future_usage = _active_merge_memory_used_bytes + cmd.estimated_memory_footprint();
+ if (future_usage > _max_merge_memory_usage_bytes) {
+ LOG(spam, "Adding merge with memory footprint %u would exceed node soft limit of %zu. "
+ "Current memory usage is %zu, future usage would have been %zu",
+ cmd.estimated_memory_footprint(), _max_merge_memory_usage_bytes,
+ _active_merge_memory_used_bytes, future_usage);
+ return false;
+ }
+ }
+ return true;
+}
+
bool MergeThrottler::may_allow_into_queue(const api::MergeBucketCommand& cmd) const noexcept {
// We cannot let forwarded unordered merges fall into the queue, as that might lead to a deadlock.
// Consider the following scenario, with two nodes C0 and C1, each with a low window size of 1 (low
@@ -761,7 +791,10 @@ MergeThrottler::handleMessageDown(
if (isMergeAlreadyKnown(msg)) {
processCycledMergeCommand(msg, msgGuard);
- } else if (canProcessNewMerge() || allow_merge_despite_full_window(mergeCmd)) {
+ } else if (accepting_merge_is_within_memory_limits(mergeCmd)
+ && (canProcessNewMerge()
+ || allow_merge_despite_full_window(mergeCmd)))
+ {
processNewMergeCommand(msg, msgGuard);
} else if (may_allow_into_queue(mergeCmd)) {
enqueue_merge_for_later_processing(msg, msgGuard);
@@ -864,9 +897,10 @@ MergeThrottler::processNewMergeCommand(
assert(_merges.find(mergeCmd.getBucket()) == _merges.end());
auto state = _merges.emplace(mergeCmd.getBucket(), ChainedMergeState(msg)).first;
update_active_merge_window_size_metric();
+ _active_merge_memory_used_bytes += mergeCmd.estimated_memory_footprint();
+ update_active_merge_memory_usage_metric();
- LOG(debug, "Added merge %s to internal state",
- mergeCmd.toString().c_str());
+ LOG(debug, "Added merge %s to internal state", mergeCmd.toString().c_str());
DummyMbusRequest dummyMsg;
_throttlePolicy->processMessage(dummyMsg);
@@ -889,7 +923,7 @@ MergeThrottler::processNewMergeCommand(
} else {
if (!nodeSeq.isLastNode()) {
// When we're not the last node and haven't seen the merge before,
- // we cannot possible execute the merge yet. Forward to next.
+ // we cannot possibly execute the merge yet. Forward to next.
uint16_t nextNodeInChain = nodeSeq.getNextNodeInChain();
LOG(debug, "Forwarding merge %s to storage node %u",
mergeCmd.toString().c_str(), nextNodeInChain);
@@ -1297,11 +1331,22 @@ MergeThrottler::set_disable_queue_limits_for_chained_merges(bool disable_limits)
}
void
+MergeThrottler::set_max_merge_memory_usage_bytes(uint32_t max_memory_bytes) noexcept {
+ std::lock_guard lock(_stateLock);
+ _max_merge_memory_usage_bytes = max_memory_bytes;
+}
+
+void
MergeThrottler::update_active_merge_window_size_metric() noexcept {
_metrics->active_window_size.set(static_cast<int64_t>(_merges.size()));
}
void
+MergeThrottler::update_active_merge_memory_usage_metric() noexcept {
+ _metrics->estimated_merge_memory_usage.set(static_cast<int64_t>(_active_merge_memory_used_bytes));
+}
+
+void
MergeThrottler::print(std::ostream& out, bool /*verbose*/,
const std::string& /*indent*/) const
{
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index 5362c2f6df8..8f6c4d62f68 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -71,6 +71,7 @@ public:
metrics::DoubleAverageMetric averageQueueWaitingTime;
metrics::LongValueMetric queueSize;
metrics::LongValueMetric active_window_size;
+ metrics::LongValueMetric estimated_merge_memory_usage;
metrics::LongCountMetric bounced_due_to_back_pressure;
MergeOperationMetrics chaining;
MergeOperationMetrics local;
@@ -113,6 +114,7 @@ private:
api::StorageMessage::SP _cmd;
std::string _cmdString; // For being able to print message even when we don't own it
uint64_t _clusterStateVersion;
+ uint32_t _estimated_memory_usage;
bool _inCycle;
bool _executingLocally;
bool _unwinding;
@@ -154,9 +156,7 @@ private:
// Use a set rather than a priority_queue, since we want to be
// able to iterate over the collection during status rendering
- using MergePriorityQueue = std::set<
- StablePriorityOrderingWrapper<api::StorageMessage::SP>
- >;
+ using MergePriorityQueue = std::set<StablePriorityOrderingWrapper<api::StorageMessage::SP>>;
enum class RendezvousState {
NONE,
@@ -165,26 +165,28 @@ private:
RELEASED
};
- ActiveMergeMap _merges;
- MergePriorityQueue _queue;
- size_t _maxQueueSize;
- std::unique_ptr<mbus::DynamicThrottlePolicy> _throttlePolicy;
- uint64_t _queueSequence; // TODO: move into a stable priority queue class
- mutable std::mutex _messageLock;
- std::condition_variable _messageCond;
- mutable std::mutex _stateLock;
+ ActiveMergeMap _merges;
+ MergePriorityQueue _queue;
+ size_t _maxQueueSize;
+ std::unique_ptr<mbus::DynamicThrottlePolicy> _throttlePolicy;
+ uint64_t _queueSequence; // TODO: move into a stable priority queue class
+ mutable std::mutex _messageLock;
+ std::condition_variable _messageCond;
+ mutable std::mutex _stateLock;
// Messages pending to be processed by the worker thread
- std::vector<api::StorageMessage::SP> _messagesDown;
- std::vector<api::StorageMessage::SP> _messagesUp;
- std::unique_ptr<Metrics> _metrics;
- StorageComponent _component;
- std::unique_ptr<framework::Thread> _thread;
- RendezvousState _rendezvous;
+ std::vector<api::StorageMessage::SP> _messagesDown;
+ std::vector<api::StorageMessage::SP> _messagesUp;
+ std::unique_ptr<Metrics> _metrics;
+ StorageComponent _component;
+ std::unique_ptr<framework::Thread> _thread;
+ RendezvousState _rendezvous;
mutable std::chrono::steady_clock::time_point _throttle_until_time;
- std::chrono::steady_clock::duration _backpressure_duration;
- bool _use_dynamic_throttling;
- bool _disable_queue_limits_for_chained_merges;
- bool _closing;
+ std::chrono::steady_clock::duration _backpressure_duration;
+ size_t _active_merge_memory_used_bytes;
+ size_t _max_merge_memory_usage_bytes;
+ bool _use_dynamic_throttling;
+ bool _disable_queue_limits_for_chained_merges;
+ bool _closing;
public:
/**
* windowSizeIncrement used for allowing unit tests to start out with more
@@ -224,6 +226,7 @@ public:
const mbus::DynamicThrottlePolicy& getThrottlePolicy() const { return *_throttlePolicy; }
mbus::DynamicThrottlePolicy& getThrottlePolicy() { return *_throttlePolicy; }
void set_disable_queue_limits_for_chained_merges(bool disable_limits) noexcept;
+ void set_max_merge_memory_usage_bytes(uint32_t max_memory_bytes) noexcept;
// For unit testing only
std::mutex& getStateLock() { return _stateLock; }
@@ -363,6 +366,7 @@ private:
[[nodiscard]] bool backpressure_mode_active_no_lock() const;
void backpressure_bounce_all_queued_merges(MessageGuard& guard);
[[nodiscard]] bool allow_merge_despite_full_window(const api::MergeBucketCommand& cmd) const noexcept;
+ [[nodiscard]] bool accepting_merge_is_within_memory_limits(const api::MergeBucketCommand& cmd) const noexcept;
[[nodiscard]] bool may_allow_into_queue(const api::MergeBucketCommand& cmd) const noexcept;
void sendReply(const api::MergeBucketCommand& cmd,
@@ -405,6 +409,7 @@ private:
void markActiveMergesAsAborted(uint32_t minimumStateVersion);
void update_active_merge_window_size_metric() noexcept;
+ void update_active_merge_memory_usage_metric() noexcept;
// const function, but metrics are mutable
void updateOperationMetrics(
diff --git a/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto b/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
index 850b5db5c98..a32fbc3e4de 100644
--- a/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
+++ b/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
@@ -33,12 +33,13 @@ message MergeNode {
}
message MergeBucketRequest {
- Bucket bucket = 1;
- uint32 cluster_state_version = 2;
- uint64 max_timestamp = 3;
- repeated MergeNode nodes = 4;
- repeated uint32 node_chain = 5;
- bool unordered_forwarding = 6;
+ Bucket bucket = 1;
+ uint32 cluster_state_version = 2;
+ uint64 max_timestamp = 3;
+ repeated MergeNode nodes = 4;
+ repeated uint32 node_chain = 5;
+ bool unordered_forwarding = 6;
+ uint32 estimated_memory_footprint = 7;
}
message MergeBucketResponse {
diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
index af62ec2b418..efbe8c9b42d 100644
--- a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
+++ b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
@@ -801,6 +801,7 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::MergeBucketCommand&
req.set_max_timestamp(msg.getMaxTimestamp());
req.set_cluster_state_version(msg.getClusterStateVersion());
req.set_unordered_forwarding(msg.use_unordered_forwarding());
+ req.set_estimated_memory_footprint(msg.estimated_memory_footprint());
for (uint16_t chain_node : msg.getChain()) {
req.add_node_chain(chain_node);
}
@@ -823,6 +824,7 @@ api::StorageCommand::UP ProtocolSerialization7::onDecodeMergeBucketCommand(BBuf&
}
cmd->setChain(std::move(chain));
cmd->set_use_unordered_forwarding(req.unordered_forwarding());
+ cmd->set_estimated_memory_footprint(req.estimated_memory_footprint());
return cmd;
});
}
diff --git a/storage/src/vespa/storageapi/message/bucket.cpp b/storage/src/vespa/storageapi/message/bucket.cpp
index 49295f54891..499d2f4abe2 100644
--- a/storage/src/vespa/storageapi/message/bucket.cpp
+++ b/storage/src/vespa/storageapi/message/bucket.cpp
@@ -107,6 +107,7 @@ MergeBucketCommand::MergeBucketCommand(
_nodes(nodes),
_maxTimestamp(maxTimestamp),
_clusterStateVersion(clusterStateVersion),
+ _estimated_memory_footprint(0),
_chain(chain),
_use_unordered_forwarding(false)
{}
@@ -132,6 +133,9 @@ MergeBucketCommand::print(std::ostream& out, bool verbose, const std::string& in
if (_use_unordered_forwarding) {
out << " (unordered forwarding)";
}
+ if (_estimated_memory_footprint > 0) {
+ out << ", estimated memory footprint: " << _estimated_memory_footprint << " bytes";
+ }
out << ", reasons to start: " << _reason;
out << ")";
if (verbose) {
diff --git a/storage/src/vespa/storageapi/message/bucket.h b/storage/src/vespa/storageapi/message/bucket.h
index d1fa00619ae..4aa2ff8b0c1 100644
--- a/storage/src/vespa/storageapi/message/bucket.h
+++ b/storage/src/vespa/storageapi/message/bucket.h
@@ -118,6 +118,7 @@ private:
std::vector<Node> _nodes;
Timestamp _maxTimestamp;
uint32_t _clusterStateVersion;
+ uint32_t _estimated_memory_footprint;
std::vector<uint16_t> _chain;
bool _use_unordered_forwarding;
@@ -140,6 +141,12 @@ public:
}
[[nodiscard]] bool use_unordered_forwarding() const noexcept { return _use_unordered_forwarding; }
[[nodiscard]] bool from_distributor() const noexcept { return _chain.empty(); }
+ void set_estimated_memory_footprint(uint32_t footprint_bytes) noexcept {
+ _estimated_memory_footprint = footprint_bytes;
+ }
+ [[nodiscard]] uint32_t estimated_memory_footprint() const noexcept {
+ return _estimated_memory_footprint;
+ }
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
DECLARE_STORAGECOMMAND(MergeBucketCommand, onMergeBucket)
};