summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp26
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp160
-rw-r--r--storage/src/vespa/storage/config/stor-server.def12
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.cpp59
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.h47
-rw-r--r--storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto13
-rw-r--r--storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp2
-rw-r--r--storage/src/vespa/storageapi/message/bucket.cpp4
-rw-r--r--storage/src/vespa/storageapi/message/bucket.h7
9 files changed, 264 insertions, 66 deletions
diff --git a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp
index addc80e4150..698d8dee573 100644
--- a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp
+++ b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp
@@ -79,7 +79,7 @@ struct StorageProtocolTest : TestWithParam<vespalib::Version> {
_protocol(_docMan.getTypeRepoSP())
{
}
- ~StorageProtocolTest();
+ ~StorageProtocolTest() override;
void set_dummy_bucket_info_reply_fields(BucketInfoReply& reply) {
reply.setBucketInfo(_dummy_bucket_info);
@@ -456,18 +456,12 @@ TEST_P(StorageProtocolTest, delete_bucket) {
TEST_P(StorageProtocolTest, merge_bucket) {
using Node = api::MergeBucketCommand::Node;
- std::vector<Node> nodes;
- nodes.push_back(Node(4, false));
- nodes.push_back(Node(13, true));
- nodes.push_back(Node(26, true));
-
- std::vector<uint16_t> chain;
- // Not a valid chain wrt. the nodes, but just want to have unique values
- chain.push_back(7);
- chain.push_back(14);
+ std::vector<Node> nodes = {{4, false}, {13, true}, {26, true}};
+ std::vector<uint16_t> chain = {7, 14}; // Not a valid chain wrt. the nodes, but just want to have unique values
auto cmd = std::make_shared<MergeBucketCommand>(_bucket, nodes, Timestamp(1234), 567, chain);
cmd->set_use_unordered_forwarding(true);
+ cmd->set_estimated_memory_footprint(123'456'789);
auto cmd2 = copyCommand(cmd);
EXPECT_EQ(_bucket, cmd2->getBucket());
EXPECT_EQ(nodes, cmd2->getNodes());
@@ -475,6 +469,7 @@ TEST_P(StorageProtocolTest, merge_bucket) {
EXPECT_EQ(uint32_t(567), cmd2->getClusterStateVersion());
EXPECT_EQ(chain, cmd2->getChain());
EXPECT_EQ(cmd2->use_unordered_forwarding(), cmd->use_unordered_forwarding());
+ EXPECT_EQ(cmd2->estimated_memory_footprint(), 123'456'789);
auto reply = std::make_shared<MergeBucketReply>(*cmd);
auto reply2 = copyReply(reply);
@@ -485,6 +480,17 @@ TEST_P(StorageProtocolTest, merge_bucket) {
EXPECT_EQ(chain, reply2->getChain());
}
+TEST_P(StorageProtocolTest, merge_bucket_estimated_memory_footprint_is_zero_by_default) {
+ using Node = api::MergeBucketCommand::Node;
+ std::vector<Node> nodes = {{4, false}, {13, true}, {26, true}};
+ std::vector<uint16_t> chain = {7, 14};
+
+ auto cmd = std::make_shared<MergeBucketCommand>(_bucket, nodes, Timestamp(1234), 567, chain);
+ cmd->set_use_unordered_forwarding(true);
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ(cmd2->estimated_memory_footprint(), 0);
+}
+
TEST_P(StorageProtocolTest, split_bucket) {
auto cmd = std::make_shared<SplitBucketCommand>(_bucket);
EXPECT_EQ(0u, cmd->getMinSplitBits());
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 7a7f2551c2d..48e0ab186f2 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -1,17 +1,18 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <tests/common/testhelper.h>
#include <tests/common/dummystoragelink.h>
+#include <tests/common/testhelper.h>
#include <tests/common/teststorageapp.h>
#include <vespa/config/helper/configgetter.hpp>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/messagebus/dynamicthrottlepolicy.h>
-#include <vespa/storage/storageserver/mergethrottler.h>
#include <vespa/storage/persistence/messages.h>
+#include <vespa/storage/storageserver/mergethrottler.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/state.h>
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vespalib/util/size_literals.h>
#include <unordered_set>
#include <memory>
#include <iterator>
@@ -34,18 +35,22 @@ using StorServerConfig = vespa::config::content::core::StorServerConfig;
vespalib::string _storage("storage");
struct MergeBuilder {
- document::BucketId _bucket;
- api::Timestamp _maxTimestamp;
- std::vector<uint16_t> _nodes;
- std::vector<uint16_t> _chain;
+ document::BucketId _bucket;
+ api::Timestamp _maxTimestamp;
+ std::vector<uint16_t> _nodes;
+ std::vector<uint16_t> _chain;
std::unordered_set<uint16_t> _source_only;
- uint64_t _clusterStateVersion;
+ uint64_t _clusterStateVersion;
+ uint32_t _memory_usage;
+ bool _unordered;
- MergeBuilder(const document::BucketId& bucket)
+ explicit MergeBuilder(const document::BucketId& bucket)
: _bucket(bucket),
_maxTimestamp(1234),
_chain(),
- _clusterStateVersion(1)
+ _clusterStateVersion(1),
+ _memory_usage(0),
+ _unordered(false)
{
nodes(0, 1, 2);
}
@@ -100,6 +105,14 @@ struct MergeBuilder {
_source_only.insert(node);
return *this;
}
+ MergeBuilder& memory_usage(uint32_t usage_bytes) {
+ _memory_usage = usage_bytes;
+ return *this;
+ }
+ MergeBuilder& unordered(bool is_unordered) {
+ _unordered = is_unordered;
+ return *this;
+ }
api::MergeBucketCommand::SP create() const {
std::vector<api::MergeBucketCommand::Node> n;
@@ -112,6 +125,8 @@ struct MergeBuilder {
makeDocumentBucket(_bucket), n, _maxTimestamp,
_clusterStateVersion, _chain);
cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, _nodes[0]));
+ cmd->set_estimated_memory_footprint(_memory_usage);
+ cmd->set_use_unordered_forwarding(_unordered);
return cmd;
}
};
@@ -137,20 +152,21 @@ struct MergeThrottlerTest : Test {
std::vector<DummyStorageLink*> _bottomLinks;
MergeThrottlerTest();
- ~MergeThrottlerTest();
+ ~MergeThrottlerTest() override;
void SetUp() override;
void TearDown() override;
api::MergeBucketCommand::SP sendMerge(const MergeBuilder&);
- void sendAndExpectReply(
+ void send_and_expect_reply(
const std::shared_ptr<api::StorageMessage>& msg,
const api::MessageType& expectedReplyType,
api::ReturnCode::Result expectedResultCode);
+ std::shared_ptr<api::StorageMessage> send_and_expect_forwarding(const std::shared_ptr<api::StorageMessage>& msg);
+
void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count);
- void fill_up_throttler_active_window_and_queue(uint16_t node_idx);
void receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd = false);
std::shared_ptr<api::MergeBucketCommand> peek_throttler_queue_top(size_t throttler_idx) {
@@ -180,10 +196,10 @@ MergeThrottlerTest::SetUp()
std::unique_ptr<DummyStorageLink> top;
top = std::make_unique<DummyStorageLink>();
- MergeThrottler* throttler = new MergeThrottler(*config, server->getComponentRegister());
+ auto* throttler = new MergeThrottler(*config, server->getComponentRegister());
// MergeThrottler will be sandwiched in between two dummy links
top->push_back(std::unique_ptr<StorageLink>(throttler));
- DummyStorageLink* bottom = new DummyStorageLink;
+ auto* bottom = new DummyStorageLink;
throttler->push_back(std::unique_ptr<StorageLink>(bottom));
_servers.push_back(std::shared_ptr<TestServiceLayerApp>(server.release()));
@@ -291,6 +307,7 @@ TEST_F(MergeThrottlerTest, chain) {
cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 0));
const uint16_t distributorIndex = 123;
cmd->setSourceIndex(distributorIndex); // Dummy distributor index that must be forwarded
+ cmd->set_estimated_memory_footprint(456'789);
StorageMessage::SP fwd = cmd;
StorageMessage::SP fwdToExec;
@@ -322,10 +339,12 @@ TEST_F(MergeThrottlerTest, chain) {
}
EXPECT_TRUE(checkChain(fwd, chain.begin(), chain.end()));
}
- // Ensure priority, cluster state version and timeout is correctly forwarded
+ // Ensure operation properties are forwarded as expected
EXPECT_EQ(7, static_cast<int>(fwd->getPriority()));
- EXPECT_EQ(123, dynamic_cast<const MergeBucketCommand&>(*fwd).getClusterStateVersion());
- EXPECT_EQ(54321ms, dynamic_cast<const StorageCommand&>(*fwd).getTimeout());
+ auto& as_merge = dynamic_cast<const MergeBucketCommand&>(*fwd);
+ EXPECT_EQ(as_merge.getClusterStateVersion(), 123);
+ EXPECT_EQ(as_merge.getTimeout(), 54321ms);
+ EXPECT_EQ(as_merge.estimated_memory_footprint(), 456'789);
}
_topLinks[lastNodeIdx]->sendDown(fwd);
@@ -1350,7 +1369,7 @@ TEST_F(MergeThrottlerTest, broken_cycle) {
}
void
-MergeThrottlerTest::sendAndExpectReply(
+MergeThrottlerTest::send_and_expect_reply(
const std::shared_ptr<api::StorageMessage>& msg,
const api::MessageType& expectedReplyType,
api::ReturnCode::Result expectedResultCode)
@@ -1362,13 +1381,22 @@ MergeThrottlerTest::sendAndExpectReply(
ASSERT_EQ(expectedResultCode, storageReply.getResult().getResult());
}
+std::shared_ptr<api::StorageMessage>
+MergeThrottlerTest::send_and_expect_forwarding(const std::shared_ptr<api::StorageMessage>& msg)
+{
+ _topLinks[0]->sendDown(msg);
+ _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+ return _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+}
+
TEST_F(MergeThrottlerTest, get_bucket_diff_command_not_in_active_set_is_rejected) {
document::BucketId bucket(16, 1234);
std::vector<api::GetBucketDiffCommand::Node> nodes;
auto getDiffCmd = std::make_shared<api::GetBucketDiffCommand>(
makeDocumentBucket(bucket), nodes, api::Timestamp(1234));
- ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(getDiffCmd,
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ getDiffCmd,
api::MessageType::GETBUCKETDIFF_REPLY,
api::ReturnCode::ABORTED));
ASSERT_EQ(0, _bottomLinks[0]->getNumCommands());
@@ -1379,7 +1407,8 @@ TEST_F(MergeThrottlerTest, apply_bucket_diff_command_not_in_active_set_is_reject
std::vector<api::GetBucketDiffCommand::Node> nodes;
auto applyDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(makeDocumentBucket(bucket), nodes);
- ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(applyDiffCmd,
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ applyDiffCmd,
api::MessageType::APPLYBUCKETDIFF_REPLY,
api::ReturnCode::ABORTED));
ASSERT_EQ(0, _bottomLinks[0]->getNumCommands());
@@ -1411,7 +1440,8 @@ TEST_F(MergeThrottlerTest, new_cluster_state_aborts_all_outdated_active_merges)
auto getDiffCmd = std::make_shared<api::GetBucketDiffCommand>(
makeDocumentBucket(bucket), std::vector<api::GetBucketDiffCommand::Node>(), api::Timestamp(123));
- ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(getDiffCmd,
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ getDiffCmd,
api::MessageType::GETBUCKETDIFF_REPLY,
api::ReturnCode::ABORTED));
}
@@ -1428,7 +1458,8 @@ TEST_F(MergeThrottlerTest, backpressure_busy_bounces_merges_for_configured_durat
EXPECT_EQ(0, _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue());
EXPECT_EQ(uint64_t(0), _throttlers[0]->getMetrics().local.failures.busy.getValue());
- ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(MergeBuilder(bucket).create(),
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ MergeBuilder(bucket).create(),
api::MessageType::MERGEBUCKET_REPLY,
api::ReturnCode::BUSY));
@@ -1480,6 +1511,91 @@ TEST_F(MergeThrottlerTest, backpressure_evicts_all_queued_merges) {
EXPECT_EQ(ReturnCode::BUSY, dynamic_cast<const MergeBucketReply&>(*reply).getResult().getResult());
}
+TEST_F(MergeThrottlerTest, exceeding_memory_soft_limit_rejects_merges_even_with_available_active_window_slots) {
+ ASSERT_GT(throttler_max_merges_pending(0), 1); // Sanity check for the test itself
+
+ _throttlers[0]->set_max_merge_memory_usage_bytes(10_Mi);
+
+ ASSERT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0);
+
+ std::shared_ptr<api::StorageMessage> fwd_cmd;
+ ASSERT_NO_FATAL_FAILURE(fwd_cmd = send_and_expect_forwarding(
+ MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).memory_usage(5_Mi).create()));
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi);
+
+ // Accepting this merge would exceed memory limits. It is sent as part of a forwarded unordered
+ // merge and can therefore NOT be enqueued; it must be bounced immediately.
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ MergeBuilder(document::BucketId(16, 1))
+ .nodes(2, 1, 0).chain(2, 1).unordered(true)
+ .memory_usage(8_Mi).create(),
+ MessageType::MERGEBUCKET_REPLY, ReturnCode::BUSY));
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); // Unchanged
+
+ // Fail the forwarded merge. This shall immediately free up the memory usage, allowing a new merge in.
+ auto fwd_reply = dynamic_cast<api::MergeBucketCommand&>(*fwd_cmd).makeReply();
+ fwd_reply->setResult(ReturnCode(ReturnCode::ABORTED, "node stumbled into a ravine"));
+
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ std::shared_ptr<api::StorageReply>(std::move(fwd_reply)),
+ MessageType::MERGEBUCKET_REPLY, ReturnCode::ABORTED)); // Unwind reply for failed merge
+
+ ASSERT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0);
+
+ // New merge is accepted
+ _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 2)).nodes(0, 1, 2).unordered(true).memory_usage(9_Mi).create());
+ _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); // Forwarded to node 1
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 9_Mi);
+}
+
+TEST_F(MergeThrottlerTest, exceeding_memory_soft_limit_can_enqueue_unordered_merge_sent_directly_from_distributor) {
+ _throttlers[0]->set_max_merge_memory_usage_bytes(10_Mi);
+
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_forwarding(
+ MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).memory_usage(5_Mi).create()));
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi);
+
+ // Accepting this merge would exceed memory limits. It is sent directly from a distributor and
+ // can therefore be enqueued.
+ _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 1)).nodes(0, 1, 2).unordered(true).memory_usage(8_Mi).create());
+ waitUntilMergeQueueIs(*_throttlers[0], 1, _messageWaitTime); // Should end up in queue
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); // Unchanged
+}
+
+TEST_F(MergeThrottlerTest, at_least_one_merge_is_accepted_even_if_exceeding_memory_soft_limit) {
+ _throttlers[0]->set_max_merge_memory_usage_bytes(5_Mi);
+
+ _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).unordered(true).memory_usage(100_Mi).create());
+ _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); // Forwarded, _not_ bounced
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 100_Mi);
+}
+
+TEST_F(MergeThrottlerTest, queued_merges_are_not_counted_towards_memory_usage) {
+ // Our utility function for filling queues uses bucket IDs {16, x} where x is increasing
+ // from 0 to the max pending. Ensure we don't accidentally overlap with the bucket ID we
+ // send for below in the test code.
+ ASSERT_LT(throttler_max_merges_pending(0), 1000);
+
+ _throttlers[0]->set_max_merge_memory_usage_bytes(50_Mi);
+ // Fill up active window on node 0. Note: these merges do not have any associated memory cost.
+ fill_throttler_queue_with_n_commands(0, 0);
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi);
+
+ _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 1000)).nodes(0, 1, 2).unordered(true).memory_usage(10_Mi).create());
+ waitUntilMergeQueueIs(*_throttlers[0], 1, _messageWaitTime); // Should end up in queue
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi);
+}
+
+// TODO define and test auto-deduced max memory usage
+
// TODO test message queue aborting (use rendezvous functionality--make guard)
} // namespace storage
diff --git a/storage/src/vespa/storage/config/stor-server.def b/storage/src/vespa/storage/config/stor-server.def
index dcce3079c68..2f34d5372ab 100644
--- a/storage/src/vespa/storage/config/stor-server.def
+++ b/storage/src/vespa/storage/config/stor-server.def
@@ -46,6 +46,18 @@ merge_throttling_policy.min_window_size int default=16
merge_throttling_policy.max_window_size int default=128
merge_throttling_policy.window_size_increment double default=2.0
+## If positive, nodes enforce a soft limit on the estimated amount of memory that
+## can be used by merges touching a particular content node. If a merge arrives
+## to the node that would violate the soft limit, it will be bounced with BUSY.
+## Note that this also counts merges where the node is part of the source-only set,
+## since these use memory when/if data is read from the local node.
+##
+## Semantics:
+## > 0 explicit limit in bytes
+## == 0 limit automatically derived by content node
+## < 0 unlimited (legacy behavior)
+max_merge_memory_usage_bytes long default=-1
+
## If the persistence provider indicates that it has exhausted one or more
## of its internal resources during a mutating operation, new merges will
## be bounced for this duration. Not allowing further merges helps take
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index 4cc2a7a89ab..4646604a980 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -29,7 +29,7 @@ namespace {
struct NodeComparator {
bool operator()(const api::MergeBucketCommand::Node& a,
- const api::MergeBucketCommand::Node& b) const
+ const api::MergeBucketCommand::Node& b) const noexcept
{
return a.index < b.index;
}
@@ -41,6 +41,7 @@ MergeThrottler::ChainedMergeState::ChainedMergeState()
: _cmd(),
_cmdString(),
_clusterStateVersion(0),
+ _estimated_memory_usage(0),
_inCycle(false),
_executingLocally(false),
_unwinding(false),
@@ -52,6 +53,7 @@ MergeThrottler::ChainedMergeState::ChainedMergeState(const api::StorageMessage::
: _cmd(cmd),
_cmdString(cmd->toString()),
_clusterStateVersion(static_cast<const api::MergeBucketCommand&>(*cmd).getClusterStateVersion()),
+ _estimated_memory_usage(static_cast<const api::MergeBucketCommand&>(*cmd).estimated_memory_footprint()),
_inCycle(false),
_executingLocally(executing),
_unwinding(false),
@@ -65,6 +67,8 @@ MergeThrottler::Metrics::Metrics(metrics::MetricSet* owner)
averageQueueWaitingTime("averagequeuewaitingtime", {}, "Average time a merge spends in the throttler queue", this),
queueSize("queuesize", {}, "Length of merge queue", this),
active_window_size("active_window_size", {}, "Number of merges active within the pending window size", this),
+ estimated_merge_memory_usage("estimated_merge_memory_usage", {}, "An estimated upper bound of the "
+ "memory usage of the merges currently in the active window", this),
bounced_due_to_back_pressure("bounced_due_to_back_pressure", {}, "Number of merges bounced due to resource exhaustion back-pressure", this),
chaining("mergechains", this),
local("locallyexecutedmerges", this)
@@ -196,6 +200,8 @@ MergeThrottler::MergeThrottler(
_rendezvous(RendezvousState::NONE),
_throttle_until_time(),
_backpressure_duration(std::chrono::seconds(30)),
+ _active_merge_memory_used_bytes(0),
+ _max_merge_memory_usage_bytes(-1), // -1 ==> unlimited
_use_dynamic_throttling(false),
_disable_queue_limits_for_chained_merges(false),
_closing(false)
@@ -244,6 +250,12 @@ MergeThrottler::on_configure(const StorServerConfig& new_config)
_backpressure_duration = std::chrono::duration_cast<std::chrono::steady_clock::duration>(
std::chrono::duration<double>(new_config.resourceExhaustionMergeBackPressureDurationSecs));
_disable_queue_limits_for_chained_merges = new_config.disableQueueLimitsForChainedMerges;
+ if (new_config.maxMergeMemoryUsageBytes > 0) {
+ _max_merge_memory_usage_bytes = static_cast<size_t>(new_config.maxMergeMemoryUsageBytes);
+ } else {
+ _max_merge_memory_usage_bytes = 0; // TODO auto-deduce based on local limits
+ }
+
}
MergeThrottler::~MergeThrottler()
@@ -373,16 +385,19 @@ MergeThrottler::forwardCommandToNode(
fwdMerge->setPriority(mergeCmd.getPriority());
fwdMerge->setTimeout(mergeCmd.getTimeout());
fwdMerge->set_use_unordered_forwarding(mergeCmd.use_unordered_forwarding());
+ fwdMerge->set_estimated_memory_footprint(mergeCmd.estimated_memory_footprint());
msgGuard.sendUp(fwdMerge);
}
void
MergeThrottler::removeActiveMerge(ActiveMergeMap::iterator mergeIter)
{
- LOG(debug, "Removed merge for %s from internal state",
- mergeIter->first.toString().c_str());
+ LOG(debug, "Removed merge for %s from internal state", mergeIter->first.toString().c_str());
+ assert(_active_merge_memory_used_bytes >= mergeIter->second._estimated_memory_usage);
+ _active_merge_memory_used_bytes -= mergeIter->second._estimated_memory_usage;
_merges.erase(mergeIter);
update_active_merge_window_size_metric();
+ update_active_merge_memory_usage_metric();
}
api::StorageMessage::SP
@@ -714,6 +729,21 @@ bool MergeThrottler::allow_merge_despite_full_window(const api::MergeBucketComma
return !_use_dynamic_throttling;
}
+bool MergeThrottler::accepting_merge_is_within_memory_limits(const api::MergeBucketCommand& cmd) const noexcept {
+ // Soft-limit on expected memory usage, but always let at least one merge into the active window.
+ if ((_max_merge_memory_usage_bytes > 0) && !_merges.empty()) {
+ size_t future_usage = _active_merge_memory_used_bytes + cmd.estimated_memory_footprint();
+ if (future_usage > _max_merge_memory_usage_bytes) {
+ LOG(spam, "Adding merge with memory footprint %u would exceed node soft limit of %zu. "
+ "Current memory usage is %zu, future usage would have been %zu",
+ cmd.estimated_memory_footprint(), _max_merge_memory_usage_bytes,
+ _active_merge_memory_used_bytes, future_usage);
+ return false;
+ }
+ }
+ return true;
+}
+
bool MergeThrottler::may_allow_into_queue(const api::MergeBucketCommand& cmd) const noexcept {
// We cannot let forwarded unordered merges fall into the queue, as that might lead to a deadlock.
// Consider the following scenario, with two nodes C0 and C1, each with a low window size of 1 (low
@@ -761,7 +791,10 @@ MergeThrottler::handleMessageDown(
if (isMergeAlreadyKnown(msg)) {
processCycledMergeCommand(msg, msgGuard);
- } else if (canProcessNewMerge() || allow_merge_despite_full_window(mergeCmd)) {
+ } else if (accepting_merge_is_within_memory_limits(mergeCmd)
+ && (canProcessNewMerge()
+ || allow_merge_despite_full_window(mergeCmd)))
+ {
processNewMergeCommand(msg, msgGuard);
} else if (may_allow_into_queue(mergeCmd)) {
enqueue_merge_for_later_processing(msg, msgGuard);
@@ -864,9 +897,10 @@ MergeThrottler::processNewMergeCommand(
assert(_merges.find(mergeCmd.getBucket()) == _merges.end());
auto state = _merges.emplace(mergeCmd.getBucket(), ChainedMergeState(msg)).first;
update_active_merge_window_size_metric();
+ _active_merge_memory_used_bytes += mergeCmd.estimated_memory_footprint();
+ update_active_merge_memory_usage_metric();
- LOG(debug, "Added merge %s to internal state",
- mergeCmd.toString().c_str());
+ LOG(debug, "Added merge %s to internal state", mergeCmd.toString().c_str());
DummyMbusRequest dummyMsg;
_throttlePolicy->processMessage(dummyMsg);
@@ -889,7 +923,7 @@ MergeThrottler::processNewMergeCommand(
} else {
if (!nodeSeq.isLastNode()) {
// When we're not the last node and haven't seen the merge before,
- // we cannot possible execute the merge yet. Forward to next.
+ // we cannot possibly execute the merge yet. Forward to next.
uint16_t nextNodeInChain = nodeSeq.getNextNodeInChain();
LOG(debug, "Forwarding merge %s to storage node %u",
mergeCmd.toString().c_str(), nextNodeInChain);
@@ -1297,11 +1331,22 @@ MergeThrottler::set_disable_queue_limits_for_chained_merges(bool disable_limits)
}
void
+MergeThrottler::set_max_merge_memory_usage_bytes(uint32_t max_memory_bytes) noexcept {
+ std::lock_guard lock(_stateLock);
+ _max_merge_memory_usage_bytes = max_memory_bytes;
+}
+
+void
MergeThrottler::update_active_merge_window_size_metric() noexcept {
_metrics->active_window_size.set(static_cast<int64_t>(_merges.size()));
}
void
+MergeThrottler::update_active_merge_memory_usage_metric() noexcept {
+ _metrics->estimated_merge_memory_usage.set(static_cast<int64_t>(_active_merge_memory_used_bytes));
+}
+
+void
MergeThrottler::print(std::ostream& out, bool /*verbose*/,
const std::string& /*indent*/) const
{
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index 5362c2f6df8..8f6c4d62f68 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -71,6 +71,7 @@ public:
metrics::DoubleAverageMetric averageQueueWaitingTime;
metrics::LongValueMetric queueSize;
metrics::LongValueMetric active_window_size;
+ metrics::LongValueMetric estimated_merge_memory_usage;
metrics::LongCountMetric bounced_due_to_back_pressure;
MergeOperationMetrics chaining;
MergeOperationMetrics local;
@@ -113,6 +114,7 @@ private:
api::StorageMessage::SP _cmd;
std::string _cmdString; // For being able to print message even when we don't own it
uint64_t _clusterStateVersion;
+ uint32_t _estimated_memory_usage;
bool _inCycle;
bool _executingLocally;
bool _unwinding;
@@ -154,9 +156,7 @@ private:
// Use a set rather than a priority_queue, since we want to be
// able to iterate over the collection during status rendering
- using MergePriorityQueue = std::set<
- StablePriorityOrderingWrapper<api::StorageMessage::SP>
- >;
+ using MergePriorityQueue = std::set<StablePriorityOrderingWrapper<api::StorageMessage::SP>>;
enum class RendezvousState {
NONE,
@@ -165,26 +165,28 @@ private:
RELEASED
};
- ActiveMergeMap _merges;
- MergePriorityQueue _queue;
- size_t _maxQueueSize;
- std::unique_ptr<mbus::DynamicThrottlePolicy> _throttlePolicy;
- uint64_t _queueSequence; // TODO: move into a stable priority queue class
- mutable std::mutex _messageLock;
- std::condition_variable _messageCond;
- mutable std::mutex _stateLock;
+ ActiveMergeMap _merges;
+ MergePriorityQueue _queue;
+ size_t _maxQueueSize;
+ std::unique_ptr<mbus::DynamicThrottlePolicy> _throttlePolicy;
+ uint64_t _queueSequence; // TODO: move into a stable priority queue class
+ mutable std::mutex _messageLock;
+ std::condition_variable _messageCond;
+ mutable std::mutex _stateLock;
// Messages pending to be processed by the worker thread
- std::vector<api::StorageMessage::SP> _messagesDown;
- std::vector<api::StorageMessage::SP> _messagesUp;
- std::unique_ptr<Metrics> _metrics;
- StorageComponent _component;
- std::unique_ptr<framework::Thread> _thread;
- RendezvousState _rendezvous;
+ std::vector<api::StorageMessage::SP> _messagesDown;
+ std::vector<api::StorageMessage::SP> _messagesUp;
+ std::unique_ptr<Metrics> _metrics;
+ StorageComponent _component;
+ std::unique_ptr<framework::Thread> _thread;
+ RendezvousState _rendezvous;
mutable std::chrono::steady_clock::time_point _throttle_until_time;
- std::chrono::steady_clock::duration _backpressure_duration;
- bool _use_dynamic_throttling;
- bool _disable_queue_limits_for_chained_merges;
- bool _closing;
+ std::chrono::steady_clock::duration _backpressure_duration;
+ size_t _active_merge_memory_used_bytes;
+ size_t _max_merge_memory_usage_bytes;
+ bool _use_dynamic_throttling;
+ bool _disable_queue_limits_for_chained_merges;
+ bool _closing;
public:
/**
* windowSizeIncrement used for allowing unit tests to start out with more
@@ -224,6 +226,7 @@ public:
const mbus::DynamicThrottlePolicy& getThrottlePolicy() const { return *_throttlePolicy; }
mbus::DynamicThrottlePolicy& getThrottlePolicy() { return *_throttlePolicy; }
void set_disable_queue_limits_for_chained_merges(bool disable_limits) noexcept;
+ void set_max_merge_memory_usage_bytes(uint32_t max_memory_bytes) noexcept;
// For unit testing only
std::mutex& getStateLock() { return _stateLock; }
@@ -363,6 +366,7 @@ private:
[[nodiscard]] bool backpressure_mode_active_no_lock() const;
void backpressure_bounce_all_queued_merges(MessageGuard& guard);
[[nodiscard]] bool allow_merge_despite_full_window(const api::MergeBucketCommand& cmd) const noexcept;
+ [[nodiscard]] bool accepting_merge_is_within_memory_limits(const api::MergeBucketCommand& cmd) const noexcept;
[[nodiscard]] bool may_allow_into_queue(const api::MergeBucketCommand& cmd) const noexcept;
void sendReply(const api::MergeBucketCommand& cmd,
@@ -405,6 +409,7 @@ private:
void markActiveMergesAsAborted(uint32_t minimumStateVersion);
void update_active_merge_window_size_metric() noexcept;
+ void update_active_merge_memory_usage_metric() noexcept;
// const function, but metrics are mutable
void updateOperationMetrics(
diff --git a/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto b/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
index 850b5db5c98..a32fbc3e4de 100644
--- a/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
+++ b/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
@@ -33,12 +33,13 @@ message MergeNode {
}
message MergeBucketRequest {
- Bucket bucket = 1;
- uint32 cluster_state_version = 2;
- uint64 max_timestamp = 3;
- repeated MergeNode nodes = 4;
- repeated uint32 node_chain = 5;
- bool unordered_forwarding = 6;
+ Bucket bucket = 1;
+ uint32 cluster_state_version = 2;
+ uint64 max_timestamp = 3;
+ repeated MergeNode nodes = 4;
+ repeated uint32 node_chain = 5;
+ bool unordered_forwarding = 6;
+ uint32 estimated_memory_footprint = 7;
}
message MergeBucketResponse {
diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
index af62ec2b418..efbe8c9b42d 100644
--- a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
+++ b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
@@ -801,6 +801,7 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::MergeBucketCommand&
req.set_max_timestamp(msg.getMaxTimestamp());
req.set_cluster_state_version(msg.getClusterStateVersion());
req.set_unordered_forwarding(msg.use_unordered_forwarding());
+ req.set_estimated_memory_footprint(msg.estimated_memory_footprint());
for (uint16_t chain_node : msg.getChain()) {
req.add_node_chain(chain_node);
}
@@ -823,6 +824,7 @@ api::StorageCommand::UP ProtocolSerialization7::onDecodeMergeBucketCommand(BBuf&
}
cmd->setChain(std::move(chain));
cmd->set_use_unordered_forwarding(req.unordered_forwarding());
+ cmd->set_estimated_memory_footprint(req.estimated_memory_footprint());
return cmd;
});
}
diff --git a/storage/src/vespa/storageapi/message/bucket.cpp b/storage/src/vespa/storageapi/message/bucket.cpp
index 49295f54891..499d2f4abe2 100644
--- a/storage/src/vespa/storageapi/message/bucket.cpp
+++ b/storage/src/vespa/storageapi/message/bucket.cpp
@@ -107,6 +107,7 @@ MergeBucketCommand::MergeBucketCommand(
_nodes(nodes),
_maxTimestamp(maxTimestamp),
_clusterStateVersion(clusterStateVersion),
+ _estimated_memory_footprint(0),
_chain(chain),
_use_unordered_forwarding(false)
{}
@@ -132,6 +133,9 @@ MergeBucketCommand::print(std::ostream& out, bool verbose, const std::string& in
if (_use_unordered_forwarding) {
out << " (unordered forwarding)";
}
+ if (_estimated_memory_footprint > 0) {
+ out << ", estimated memory footprint: " << _estimated_memory_footprint << " bytes";
+ }
out << ", reasons to start: " << _reason;
out << ")";
if (verbose) {
diff --git a/storage/src/vespa/storageapi/message/bucket.h b/storage/src/vespa/storageapi/message/bucket.h
index d1fa00619ae..4aa2ff8b0c1 100644
--- a/storage/src/vespa/storageapi/message/bucket.h
+++ b/storage/src/vespa/storageapi/message/bucket.h
@@ -118,6 +118,7 @@ private:
std::vector<Node> _nodes;
Timestamp _maxTimestamp;
uint32_t _clusterStateVersion;
+ uint32_t _estimated_memory_footprint;
std::vector<uint16_t> _chain;
bool _use_unordered_forwarding;
@@ -140,6 +141,12 @@ public:
}
[[nodiscard]] bool use_unordered_forwarding() const noexcept { return _use_unordered_forwarding; }
[[nodiscard]] bool from_distributor() const noexcept { return _chain.empty(); }
+ void set_estimated_memory_footprint(uint32_t footprint_bytes) noexcept {
+ _estimated_memory_footprint = footprint_bytes;
+ }
+ [[nodiscard]] uint32_t estimated_memory_footprint() const noexcept {
+ return _estimated_memory_footprint;
+ }
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
DECLARE_STORAGECOMMAND(MergeBucketCommand, onMergeBucket)
};