summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2023-11-02 15:33:22 +0100
committerGitHub <noreply@github.com>2023-11-02 15:33:22 +0100
commit756ac9de004347d2d5b8893b43c37eb6ac930560 (patch)
treef0ce3b5db120b40bfa264593627b010fe7a309cc
parent875422da13b5236fe368709d66f4696f06fca3f2 (diff)
parentf2983f6f300630f6aec7538d75eed6356fe12da6 (diff)
Merge pull request #29194 from vespa-engine/vekterli/merge-memory-usage-soft-limiting
Add configurable soft limiting of memory used by merge operations on a content node
-rw-r--r--searchcore/src/apps/proton/proton.cpp12
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp2
-rw-r--r--storage/src/tests/distributor/mergeoperationtest.cpp71
-rw-r--r--storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp26
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp247
-rw-r--r--storage/src/tests/storageserver/service_layer_error_listener_test.cpp3
-rw-r--r--storage/src/vespa/storage/config/stor-server.def32
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp40
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h5
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.cpp94
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.h85
-rw-r--r--storage/src/vespa/storage/storageserver/servicelayernode.cpp6
-rw-r--r--storage/src/vespa/storage/storageserver/servicelayernode.h5
-rw-r--r--storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto13
-rw-r--r--storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp2
-rw-r--r--storage/src/vespa/storageapi/message/bucket.cpp4
-rw-r--r--storage/src/vespa/storageapi/message/bucket.h7
-rw-r--r--storageserver/src/vespa/storageserver/app/dummyservicelayerprocess.cpp2
-rw-r--r--storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp6
-rw-r--r--storageserver/src/vespa/storageserver/app/servicelayerprocess.h6
20 files changed, 553 insertions, 115 deletions
diff --git a/searchcore/src/apps/proton/proton.cpp b/searchcore/src/apps/proton/proton.cpp
index 129091606b3..4c20c40b406 100644
--- a/searchcore/src/apps/proton/proton.cpp
+++ b/searchcore/src/apps/proton/proton.cpp
@@ -109,7 +109,8 @@ class ProtonServiceLayerProcess : public storage::ServiceLayerProcess {
public:
ProtonServiceLayerProcess(const config::ConfigUri & configUri,
proton::Proton & proton, FNET_Transport& transport,
- const vespalib::string& file_distributor_connection_spec);
+ const vespalib::string& file_distributor_connection_spec,
+ const vespalib::HwInfo& hw_info);
~ProtonServiceLayerProcess() override { shutdown(); }
void shutdown() override;
@@ -130,8 +131,9 @@ public:
ProtonServiceLayerProcess::ProtonServiceLayerProcess(const config::ConfigUri & configUri,
proton::Proton & proton, FNET_Transport& transport,
- const vespalib::string& file_distributor_connection_spec)
- : ServiceLayerProcess(configUri),
+ const vespalib::string& file_distributor_connection_spec,
+ const vespalib::HwInfo& hw_info)
+ : ServiceLayerProcess(configUri, hw_info),
_proton(proton),
_transport(transport),
_file_distributor_connection_spec(file_distributor_connection_spec),
@@ -259,18 +261,18 @@ App::startAndRun(FNET_Transport & transport, int argc, char **argv) {
proton.init(configSnapshot);
}
vespalib::string file_distributor_connection_spec = configSnapshot->getFiledistributorrpcConfig().connectionspec;
- configSnapshot.reset();
std::unique_ptr<ProtonServiceLayerProcess> spiProton;
if ( ! params.serviceidentity.empty()) {
spiProton = std::make_unique<ProtonServiceLayerProcess>(identityUri.createWithNewId(params.serviceidentity), proton, transport,
- file_distributor_connection_spec);
+ file_distributor_connection_spec, configSnapshot->getHwInfo());
spiProton->setupConfig(subscribeTimeout);
spiProton->createNode();
EV_STARTED("servicelayer");
} else {
proton.getMetricManager().init(identityUri);
}
+ configSnapshot.reset();
EV_STARTED("proton");
while (!(SIG::INT.check() || SIG::TERM.check() || (spiProton && spiProton->getNode().attemptedStopped()))) {
std::this_thread::sleep_for(1000ms);
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
index 9a0a2968c69..808747034ac 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
@@ -243,7 +243,7 @@ public:
MyServiceLayerProcess::MyServiceLayerProcess(const config::ConfigUri& configUri,
PersistenceProvider& provider,
std::unique_ptr<storage::IStorageChainBuilder> chain_builder)
- : ServiceLayerProcess(configUri),
+ : ServiceLayerProcess(configUri, vespalib::HwInfo()),
_provider(provider)
{
if (chain_builder) {
diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp
index 0773958e535..6ed05e14519 100644
--- a/storage/src/tests/distributor/mergeoperationtest.cpp
+++ b/storage/src/tests/distributor/mergeoperationtest.cpp
@@ -45,6 +45,7 @@ struct MergeOperationTest : Test, DistributorStripeTestUtil {
void assert_simple_merge_bucket_command();
void assert_simple_delete_bucket_command();
MergeBucketMetricSet& get_merge_metrics();
+ [[nodiscard]] uint32_t merge_footprint(const std::string& db_state);
};
std::shared_ptr<MergeOperation>
@@ -86,7 +87,7 @@ MergeOperationTest::assert_simple_merge_bucket_command()
{
ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, "
"cluster state version: 0, nodes: [0, 2, 1 (source only)], chain: [], "
- "reasons to start: ) => 0",
+ "estimated memory footprint: 2 bytes, reasons to start: ) => 0",
_sender.getLastCommand(true));
}
@@ -295,7 +296,7 @@ TEST_F(MergeOperationTest, do_not_remove_copies_with_pending_messages) {
std::string merge("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, "
"cluster state version: 0, nodes: [0, 2, 1 (source only)], chain: [], "
- "reasons to start: ) => 0");
+ "estimated memory footprint: 2 bytes, reasons to start: ) => 0");
ASSERT_EQ(merge, _sender.getLastCommand(true));
@@ -356,8 +357,8 @@ TEST_F(MergeOperationTest, allow_deleting_active_source_only_replica) {
std::string merge(
"MergeBucketCommand(BucketId(0x4000000000000001), to time "
- "10000000, cluster state version: 0, nodes: [0, 2, 1 "
- "(source only)], chain: [], reasons to start: ) => 0");
+ "10000000, cluster state version: 0, nodes: [0, 2, 1 (source only)], chain: [], "
+ "estimated memory footprint: 2 bytes, reasons to start: ) => 0");
ASSERT_EQ(merge, _sender.getLastCommand(true));
sendReply(op);
@@ -580,14 +581,14 @@ TEST_F(MergeOperationTest, unordered_merges_only_sent_iff_config_enabled_and_all
setup_simple_merge_op({1, 2, 3}); // Note: these will be re-ordered in ideal state order internally
ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, "
"cluster state version: 0, nodes: [2, 1, 3], chain: [], "
- "reasons to start: ) => 1",
+ "estimated memory footprint: 2 bytes, reasons to start: ) => 1",
_sender.getLastCommand(true));
// All involved nodes support unordered merging; merges should be unordered (sent to ideal node 2)
setup_simple_merge_op({1, 2});
ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000001, "
"cluster state version: 0, nodes: [2, 1], chain: [] (unordered forwarding), "
- "reasons to start: ) => 2",
+ "estimated memory footprint: 2 bytes, reasons to start: ) => 2",
_sender.getLastCommand(true));
_sender.clear();
@@ -600,7 +601,7 @@ TEST_F(MergeOperationTest, unordered_merges_only_sent_iff_config_enabled_and_all
setup_simple_merge_op({2, 1});
ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000002, "
"cluster state version: 0, nodes: [2, 1], chain: [], "
- "reasons to start: ) => 1",
+ "estimated memory footprint: 2 bytes, reasons to start: ) => 1",
_sender.getLastCommand(true));
}
@@ -644,4 +645,60 @@ TEST_F(MergeOperationTest, no_delete_bucket_ops_sent_if_node_subset_cancelled) {
EXPECT_FALSE(op->ok());
}
+uint32_t MergeOperationTest::merge_footprint(const std::string& db_state) {
+ getClock().setAbsoluteTimeInSeconds(10);
+ addNodesToBucketDB(document::BucketId(16, 1), db_state);
+ enable_cluster_state("distributor:1 storage:3");
+ MergeOperation op(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2)));
+ op.setIdealStateManager(&getIdealStateManager());
+
+ _sender.clear();
+ op.start(_sender);
+ assert(!_sender.commands().empty());
+ auto cmd_as_merge = std::dynamic_pointer_cast<api::MergeBucketCommand>(_sender.commands()[0]);
+ assert(cmd_as_merge);
+ return cmd_as_merge->estimated_memory_footprint();
+}
+
+TEST_F(MergeOperationTest, memory_footprint_is_computed_from_replica_state) {
+ // Reminder of syntax: "index=checksum/doc count/doc size"
+ // {0,2} in sync, {1} out of sync; footprint is sum across "sync-ness groups"
+ EXPECT_EQ(merge_footprint("0=10/100/3000,1=20/200/7000,2=10/100/3000"), 10'000);
+ EXPECT_EQ(merge_footprint("0=10/100/7000,1=20/200/3000,2=10/100/7000"), 10'000);
+ // All replicas mutually out of sync
+ EXPECT_EQ(merge_footprint("0=10/100/3000,1=20/200/7000,2=30/100/5000"), 15'000);
+ // One replica empty
+ EXPECT_EQ(merge_footprint("0=20/200/4000,1=20/200/4000,2=1/0/0"), 4'000);
+}
+
+TEST_F(MergeOperationTest, memory_footprint_is_bounded_by_max_expected_merge_chunk_size) {
+ auto cfg = make_config();
+ cfg->setSplitSize(20'000); // proxy for max merge chunk size
+ configure_stripe(cfg);
+
+ EXPECT_EQ(merge_footprint("0=10/100/5000,1=20/200/5000,2=30/100/9999"), 19'999);
+ EXPECT_EQ(merge_footprint("0=10/100/5000,1=20/200/5000,2=30/100/10000"), 20'000);
+ EXPECT_EQ(merge_footprint("0=10/100/5000,1=20/200/5000,2=30/100/10001"), 20'000);
+ EXPECT_EQ(merge_footprint("0=10/100/6000,1=20/200/7000,2=30/100/20000"), 20'000);
+}
+
+TEST_F(MergeOperationTest, memory_footprint_with_single_doc_replica_can_be_greater_than_max_expected_bucket_size) {
+ auto cfg = make_config();
+ cfg->setSplitSize(20'000);
+ configure_stripe(cfg);
+
+ EXPECT_EQ(merge_footprint("0=10/100/5000,1=20/200/5000,2=30/1/50000"), 50'000);
+ EXPECT_EQ(merge_footprint("0=10/100/5000,1=20/1/60000,2=30/1/50000"), 60'000);
+}
+
+TEST_F(MergeOperationTest, memory_footprint_estimation_saturates_instead_of_overflowing_u32_limits) {
+ auto cfg = make_config();
+ cfg->setSplitSize(1'234'567);
+ configure_stripe(cfg);
+ // Here we massively _undercount_ our estimate, but this is a wildly unrealistic replica state
+ // just for testing correctness of arithmetic ops.
+ // UINT32_MAX/3 * 3 + 1 will overflow to 0 if unchecked. Must be saturated instead.
+ EXPECT_EQ(merge_footprint("0=10/10/1431655765,1=20/10/1431655765,2=30/10/1431655766"), 1'234'567);
+}
+
} // storage::distributor
diff --git a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp
index addc80e4150..698d8dee573 100644
--- a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp
+++ b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp
@@ -79,7 +79,7 @@ struct StorageProtocolTest : TestWithParam<vespalib::Version> {
_protocol(_docMan.getTypeRepoSP())
{
}
- ~StorageProtocolTest();
+ ~StorageProtocolTest() override;
void set_dummy_bucket_info_reply_fields(BucketInfoReply& reply) {
reply.setBucketInfo(_dummy_bucket_info);
@@ -456,18 +456,12 @@ TEST_P(StorageProtocolTest, delete_bucket) {
TEST_P(StorageProtocolTest, merge_bucket) {
using Node = api::MergeBucketCommand::Node;
- std::vector<Node> nodes;
- nodes.push_back(Node(4, false));
- nodes.push_back(Node(13, true));
- nodes.push_back(Node(26, true));
-
- std::vector<uint16_t> chain;
- // Not a valid chain wrt. the nodes, but just want to have unique values
- chain.push_back(7);
- chain.push_back(14);
+ std::vector<Node> nodes = {{4, false}, {13, true}, {26, true}};
+ std::vector<uint16_t> chain = {7, 14}; // Not a valid chain wrt. the nodes, but just want to have unique values
auto cmd = std::make_shared<MergeBucketCommand>(_bucket, nodes, Timestamp(1234), 567, chain);
cmd->set_use_unordered_forwarding(true);
+ cmd->set_estimated_memory_footprint(123'456'789);
auto cmd2 = copyCommand(cmd);
EXPECT_EQ(_bucket, cmd2->getBucket());
EXPECT_EQ(nodes, cmd2->getNodes());
@@ -475,6 +469,7 @@ TEST_P(StorageProtocolTest, merge_bucket) {
EXPECT_EQ(uint32_t(567), cmd2->getClusterStateVersion());
EXPECT_EQ(chain, cmd2->getChain());
EXPECT_EQ(cmd2->use_unordered_forwarding(), cmd->use_unordered_forwarding());
+ EXPECT_EQ(cmd2->estimated_memory_footprint(), 123'456'789);
auto reply = std::make_shared<MergeBucketReply>(*cmd);
auto reply2 = copyReply(reply);
@@ -485,6 +480,17 @@ TEST_P(StorageProtocolTest, merge_bucket) {
EXPECT_EQ(chain, reply2->getChain());
}
+TEST_P(StorageProtocolTest, merge_bucket_estimated_memory_footprint_is_zero_by_default) {
+ using Node = api::MergeBucketCommand::Node;
+ std::vector<Node> nodes = {{4, false}, {13, true}, {26, true}};
+ std::vector<uint16_t> chain = {7, 14};
+
+ auto cmd = std::make_shared<MergeBucketCommand>(_bucket, nodes, Timestamp(1234), 567, chain);
+ cmd->set_use_unordered_forwarding(true);
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ(cmd2->estimated_memory_footprint(), 0);
+}
+
TEST_P(StorageProtocolTest, split_bucket) {
auto cmd = std::make_shared<SplitBucketCommand>(_bucket);
EXPECT_EQ(0u, cmd->getMinSplitBits());
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 7a7f2551c2d..6f80ffe0727 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -1,17 +1,18 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <tests/common/testhelper.h>
#include <tests/common/dummystoragelink.h>
+#include <tests/common/testhelper.h>
#include <tests/common/teststorageapp.h>
#include <vespa/config/helper/configgetter.hpp>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/messagebus/dynamicthrottlepolicy.h>
-#include <vespa/storage/storageserver/mergethrottler.h>
#include <vespa/storage/persistence/messages.h>
+#include <vespa/storage/storageserver/mergethrottler.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/state.h>
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/vespalib/util/size_literals.h>
#include <unordered_set>
#include <memory>
#include <iterator>
@@ -30,22 +31,33 @@ namespace storage {
namespace {
using StorServerConfig = vespa::config::content::core::StorServerConfig;
+using StorServerConfigBuilder = vespa::config::content::core::StorServerConfigBuilder;
vespalib::string _storage("storage");
+std::unique_ptr<StorServerConfig> default_server_config() {
+ vdstestlib::DirConfig dir_config(getStandardConfig(true));
+ auto cfg_uri = ::config::ConfigUri(dir_config.getConfigId());
+ return config_from<StorServerConfig>(cfg_uri);
+}
+
struct MergeBuilder {
- document::BucketId _bucket;
- api::Timestamp _maxTimestamp;
- std::vector<uint16_t> _nodes;
- std::vector<uint16_t> _chain;
+ document::BucketId _bucket;
+ api::Timestamp _maxTimestamp;
+ std::vector<uint16_t> _nodes;
+ std::vector<uint16_t> _chain;
std::unordered_set<uint16_t> _source_only;
- uint64_t _clusterStateVersion;
+ uint64_t _clusterStateVersion;
+ uint32_t _memory_usage;
+ bool _unordered;
- MergeBuilder(const document::BucketId& bucket)
+ explicit MergeBuilder(const document::BucketId& bucket)
: _bucket(bucket),
_maxTimestamp(1234),
_chain(),
- _clusterStateVersion(1)
+ _clusterStateVersion(1),
+ _memory_usage(0),
+ _unordered(false)
{
nodes(0, 1, 2);
}
@@ -100,6 +112,14 @@ struct MergeBuilder {
_source_only.insert(node);
return *this;
}
+ MergeBuilder& memory_usage(uint32_t usage_bytes) {
+ _memory_usage = usage_bytes;
+ return *this;
+ }
+ MergeBuilder& unordered(bool is_unordered) {
+ _unordered = is_unordered;
+ return *this;
+ }
api::MergeBucketCommand::SP create() const {
std::vector<api::MergeBucketCommand::Node> n;
@@ -112,6 +132,8 @@ struct MergeBuilder {
makeDocumentBucket(_bucket), n, _maxTimestamp,
_clusterStateVersion, _chain);
cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, _nodes[0]));
+ cmd->set_estimated_memory_footprint(_memory_usage);
+ cmd->set_use_unordered_forwarding(_unordered);
return cmd;
}
};
@@ -137,20 +159,26 @@ struct MergeThrottlerTest : Test {
std::vector<DummyStorageLink*> _bottomLinks;
MergeThrottlerTest();
- ~MergeThrottlerTest();
+ ~MergeThrottlerTest() override;
void SetUp() override;
void TearDown() override;
+ MergeThrottler& throttler(size_t idx) noexcept {
+ assert(idx < _throttlers.size());
+ return *_throttlers[idx];
+ }
+
api::MergeBucketCommand::SP sendMerge(const MergeBuilder&);
- void sendAndExpectReply(
+ void send_and_expect_reply(
const std::shared_ptr<api::StorageMessage>& msg,
const api::MessageType& expectedReplyType,
api::ReturnCode::Result expectedResultCode);
+ std::shared_ptr<api::StorageMessage> send_and_expect_forwarding(const std::shared_ptr<api::StorageMessage>& msg);
+
void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count);
- void fill_up_throttler_active_window_and_queue(uint16_t node_idx);
void receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd = false);
std::shared_ptr<api::MergeBucketCommand> peek_throttler_queue_top(size_t throttler_idx) {
@@ -170,20 +198,17 @@ MergeThrottlerTest::~MergeThrottlerTest() = default;
void
MergeThrottlerTest::SetUp()
{
- vdstestlib::DirConfig dir_config(getStandardConfig(true));
- auto cfg_uri = ::config::ConfigUri(dir_config.getConfigId());
- auto config = config_from<StorServerConfig>(cfg_uri);
-
+ auto config = default_server_config();
for (int i = 0; i < _storageNodeCount; ++i) {
auto server = std::make_unique<TestServiceLayerApp>(NodeIndex(i));
server->setClusterState(lib::ClusterState("distributor:100 storage:100 version:1"));
std::unique_ptr<DummyStorageLink> top;
top = std::make_unique<DummyStorageLink>();
- MergeThrottler* throttler = new MergeThrottler(*config, server->getComponentRegister());
+ auto* throttler = new MergeThrottler(*config, server->getComponentRegister(), vespalib::HwInfo());
// MergeThrottler will be sandwiched in between two dummy links
top->push_back(std::unique_ptr<StorageLink>(throttler));
- DummyStorageLink* bottom = new DummyStorageLink;
+ auto* bottom = new DummyStorageLink;
throttler->push_back(std::unique_ptr<StorageLink>(bottom));
_servers.push_back(std::shared_ptr<TestServiceLayerApp>(server.release()));
@@ -291,6 +316,7 @@ TEST_F(MergeThrottlerTest, chain) {
cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 0));
const uint16_t distributorIndex = 123;
cmd->setSourceIndex(distributorIndex); // Dummy distributor index that must be forwarded
+ cmd->set_estimated_memory_footprint(456'789);
StorageMessage::SP fwd = cmd;
StorageMessage::SP fwdToExec;
@@ -322,10 +348,12 @@ TEST_F(MergeThrottlerTest, chain) {
}
EXPECT_TRUE(checkChain(fwd, chain.begin(), chain.end()));
}
- // Ensure priority, cluster state version and timeout is correctly forwarded
+ // Ensure operation properties are forwarded as expected
EXPECT_EQ(7, static_cast<int>(fwd->getPriority()));
- EXPECT_EQ(123, dynamic_cast<const MergeBucketCommand&>(*fwd).getClusterStateVersion());
- EXPECT_EQ(54321ms, dynamic_cast<const StorageCommand&>(*fwd).getTimeout());
+ auto& as_merge = dynamic_cast<const MergeBucketCommand&>(*fwd);
+ EXPECT_EQ(as_merge.getClusterStateVersion(), 123);
+ EXPECT_EQ(as_merge.getTimeout(), 54321ms);
+ EXPECT_EQ(as_merge.estimated_memory_footprint(), 456'789);
}
_topLinks[lastNodeIdx]->sendDown(fwd);
@@ -1209,7 +1237,7 @@ void
MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd)
{
// Note: uses node with index 1 to not be the first node in chain
- _throttlers[1]->set_disable_queue_limits_for_chained_merges(disable_queue_limits);
+ _throttlers[1]->set_disable_queue_limits_for_chained_merges_locking(disable_queue_limits);
size_t max_pending = throttler_max_merges_pending(1);
size_t max_enqueued = _throttlers[1]->getMaxQueueSize();
for (size_t i = 0; i < max_pending + max_enqueued; ++i) {
@@ -1350,7 +1378,7 @@ TEST_F(MergeThrottlerTest, broken_cycle) {
}
void
-MergeThrottlerTest::sendAndExpectReply(
+MergeThrottlerTest::send_and_expect_reply(
const std::shared_ptr<api::StorageMessage>& msg,
const api::MessageType& expectedReplyType,
api::ReturnCode::Result expectedResultCode)
@@ -1362,13 +1390,22 @@ MergeThrottlerTest::sendAndExpectReply(
ASSERT_EQ(expectedResultCode, storageReply.getResult().getResult());
}
+std::shared_ptr<api::StorageMessage>
+MergeThrottlerTest::send_and_expect_forwarding(const std::shared_ptr<api::StorageMessage>& msg)
+{
+ _topLinks[0]->sendDown(msg);
+ _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
+ return _topLinks[0]->getAndRemoveMessage(MessageType::MERGEBUCKET);
+}
+
TEST_F(MergeThrottlerTest, get_bucket_diff_command_not_in_active_set_is_rejected) {
document::BucketId bucket(16, 1234);
std::vector<api::GetBucketDiffCommand::Node> nodes;
auto getDiffCmd = std::make_shared<api::GetBucketDiffCommand>(
makeDocumentBucket(bucket), nodes, api::Timestamp(1234));
- ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(getDiffCmd,
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ getDiffCmd,
api::MessageType::GETBUCKETDIFF_REPLY,
api::ReturnCode::ABORTED));
ASSERT_EQ(0, _bottomLinks[0]->getNumCommands());
@@ -1379,7 +1416,8 @@ TEST_F(MergeThrottlerTest, apply_bucket_diff_command_not_in_active_set_is_reject
std::vector<api::GetBucketDiffCommand::Node> nodes;
auto applyDiffCmd = std::make_shared<api::ApplyBucketDiffCommand>(makeDocumentBucket(bucket), nodes);
- ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(applyDiffCmd,
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ applyDiffCmd,
api::MessageType::APPLYBUCKETDIFF_REPLY,
api::ReturnCode::ABORTED));
ASSERT_EQ(0, _bottomLinks[0]->getNumCommands());
@@ -1411,7 +1449,8 @@ TEST_F(MergeThrottlerTest, new_cluster_state_aborts_all_outdated_active_merges)
auto getDiffCmd = std::make_shared<api::GetBucketDiffCommand>(
makeDocumentBucket(bucket), std::vector<api::GetBucketDiffCommand::Node>(), api::Timestamp(123));
- ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(getDiffCmd,
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ getDiffCmd,
api::MessageType::GETBUCKETDIFF_REPLY,
api::ReturnCode::ABORTED));
}
@@ -1428,7 +1467,8 @@ TEST_F(MergeThrottlerTest, backpressure_busy_bounces_merges_for_configured_durat
EXPECT_EQ(0, _throttlers[0]->getMetrics().bounced_due_to_back_pressure.getValue());
EXPECT_EQ(uint64_t(0), _throttlers[0]->getMetrics().local.failures.busy.getValue());
- ASSERT_NO_FATAL_FAILURE(sendAndExpectReply(MergeBuilder(bucket).create(),
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ MergeBuilder(bucket).create(),
api::MessageType::MERGEBUCKET_REPLY,
api::ReturnCode::BUSY));
@@ -1480,6 +1520,159 @@ TEST_F(MergeThrottlerTest, backpressure_evicts_all_queued_merges) {
EXPECT_EQ(ReturnCode::BUSY, dynamic_cast<const MergeBucketReply&>(*reply).getResult().getResult());
}
+TEST_F(MergeThrottlerTest, exceeding_memory_soft_limit_rejects_merges_even_with_available_active_window_slots) {
+ ASSERT_GT(throttler_max_merges_pending(0), 1); // Sanity check for the test itself
+
+ throttler(0).set_max_merge_memory_usage_bytes_locking(10_Mi);
+
+ ASSERT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 0);
+
+ std::shared_ptr<api::StorageMessage> fwd_cmd;
+ ASSERT_NO_FATAL_FAILURE(fwd_cmd = send_and_expect_forwarding(
+ MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).memory_usage(5_Mi).create()));
+
+ EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi);
+
+ // Accepting this merge would exceed memory limits. It is sent as part of a forwarded unordered
+ // merge and can therefore NOT be enqueued; it must be bounced immediately.
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ MergeBuilder(document::BucketId(16, 1))
+ .nodes(2, 1, 0).chain(2, 1).unordered(true)
+ .memory_usage(8_Mi).create(),
+ MessageType::MERGEBUCKET_REPLY, ReturnCode::BUSY));
+
+ EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); // Unchanged
+
+ // Fail the forwarded merge. This shall immediately free up the memory usage, allowing a new merge in.
+ auto fwd_reply = dynamic_cast<api::MergeBucketCommand&>(*fwd_cmd).makeReply();
+ fwd_reply->setResult(ReturnCode(ReturnCode::ABORTED, "node stumbled into a ravine"));
+
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_reply(
+ std::shared_ptr<api::StorageReply>(std::move(fwd_reply)),
+ MessageType::MERGEBUCKET_REPLY, ReturnCode::ABORTED)); // Unwind reply for failed merge
+
+ ASSERT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 0);
+
+ // New merge is accepted and forwarded
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_forwarding(
+ MergeBuilder(document::BucketId(16, 2)).nodes(0, 1, 2).unordered(true).memory_usage(9_Mi).create()));
+
+ EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 9_Mi);
+}
+
+TEST_F(MergeThrottlerTest, exceeding_memory_soft_limit_can_enqueue_unordered_merge_sent_directly_from_distributor) {
+ throttler(0).set_max_merge_memory_usage_bytes_locking(10_Mi);
+
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_forwarding(
+ MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).memory_usage(5_Mi).create()));
+
+ EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi);
+
+ // Accepting this merge would exceed memory limits. It is sent directly from a distributor and
+ // can therefore be enqueued.
+ _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 1)).nodes(0, 1, 2).unordered(true).memory_usage(8_Mi).create());
+ waitUntilMergeQueueIs(throttler(0), 1, _messageWaitTime); // Should end up in queue
+
+ EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); // Unchanged
+}
+
+TEST_F(MergeThrottlerTest, at_least_one_merge_is_accepted_even_if_exceeding_memory_soft_limit) {
+ throttler(0).set_max_merge_memory_usage_bytes_locking(5_Mi);
+
+ _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).unordered(true).memory_usage(100_Mi).create());
+ _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); // Forwarded, _not_ bounced
+
+ EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 100_Mi);
+}
+
+TEST_F(MergeThrottlerTest, queued_merges_are_not_counted_towards_memory_usage) {
+ // Our utility function for filling queues uses bucket IDs {16, x} where x is increasing
+ // from 0 to the max pending. Ensure we don't accidentally overlap with the bucket ID we
+ // send for below in the test code.
+ ASSERT_LT(throttler_max_merges_pending(0), 1000);
+
+ throttler(0).set_max_merge_memory_usage_bytes_locking(50_Mi);
+ // Fill up active window on node 0. Note: these merges do not have any associated memory cost.
+ fill_throttler_queue_with_n_commands(0, 0);
+
+ EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi);
+
+ _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 1000)).nodes(0, 1, 2).unordered(true).memory_usage(10_Mi).create());
+ waitUntilMergeQueueIs(throttler(0), 1, _messageWaitTime); // Should end up in queue
+
+ EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi);
+}
+
+namespace {
+
+vespalib::HwInfo make_mem_info(uint64_t mem_size) {
+ return {{0, false, false}, {mem_size}, {1}};
+}
+
+}
+
+TEST_F(MergeThrottlerTest, memory_limit_can_be_auto_deduced_from_hw_info) {
+ StorServerConfigBuilder cfg(*default_server_config());
+ auto& cfg_limit = cfg.mergeThrottlingMemoryLimit;
+ auto& mt = throttler(0);
+
+ // Enable auto-deduction of limits
+ cfg_limit.maxUsageBytes = 0;
+
+ cfg_limit.autoLowerBoundBytes = 100'000;
+ cfg_limit.autoUpperBoundBytes = 750'000;
+ cfg_limit.autoPhysMemScaleFactor = 0.5;
+
+ mt.set_hw_info_locking(make_mem_info(1'000'000));
+ mt.on_configure(cfg);
+ EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 500'000);
+ EXPECT_EQ(throttler(0).getMetrics().merge_memory_limit.getLast(), 500'000);
+
+ cfg_limit.autoPhysMemScaleFactor = 0.75;
+ mt.on_configure(cfg);
+ EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 750'000);
+ EXPECT_EQ(throttler(0).getMetrics().merge_memory_limit.getLast(), 750'000);
+
+ cfg_limit.autoPhysMemScaleFactor = 0.25;
+ mt.on_configure(cfg);
+ EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 250'000);
+
+ // Min-capped
+ cfg_limit.autoPhysMemScaleFactor = 0.05;
+ mt.on_configure(cfg);
+ EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 100'000);
+
+ // Max-capped
+ cfg_limit.autoPhysMemScaleFactor = 0.90;
+ mt.on_configure(cfg);
+ EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 750'000);
+}
+
+TEST_F(MergeThrottlerTest, memory_limit_can_be_set_explicitly) {
+ StorServerConfigBuilder cfg(*default_server_config());
+ auto& cfg_limit = cfg.mergeThrottlingMemoryLimit;
+ auto& mt = throttler(0);
+
+ cfg_limit.maxUsageBytes = 1'234'567;
+ mt.set_hw_info_locking(make_mem_info(1'000'000));
+ mt.on_configure(cfg);
+ EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 1'234'567);
+ EXPECT_EQ(throttler(0).getMetrics().merge_memory_limit.getLast(), 1'234'567);
+}
+
+TEST_F(MergeThrottlerTest, memory_limit_can_be_set_to_unlimited) {
+ StorServerConfigBuilder cfg(*default_server_config());
+ auto& cfg_limit = cfg.mergeThrottlingMemoryLimit;
+ auto& mt = throttler(0);
+
+ cfg_limit.maxUsageBytes = -1;
+ mt.set_hw_info_locking(make_mem_info(1'000'000));
+ mt.on_configure(cfg);
+ // Zero implies infinity
+ EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 0);
+ EXPECT_EQ(throttler(0).getMetrics().merge_memory_limit.getLast(), 0);
+}
+
// TODO test message queue aborting (use rendezvous functionality--make guard)
} // namespace storage
diff --git a/storage/src/tests/storageserver/service_layer_error_listener_test.cpp b/storage/src/tests/storageserver/service_layer_error_listener_test.cpp
index edb13eea5af..63d8eec6dc3 100644
--- a/storage/src/tests/storageserver/service_layer_error_listener_test.cpp
+++ b/storage/src/tests/storageserver/service_layer_error_listener_test.cpp
@@ -40,7 +40,8 @@ struct Fixture {
vdstestlib::DirConfig config{getStandardConfig(true)};
TestServiceLayerApp app;
ServiceLayerComponent component{app.getComponentRegister(), "dummy"};
- MergeThrottler merge_throttler{*config_from<StorServerConfig>(config::ConfigUri(config.getConfigId())), app.getComponentRegister()};
+ MergeThrottler merge_throttler{*config_from<StorServerConfig>(config::ConfigUri(config.getConfigId())),
+ app.getComponentRegister(), vespalib::HwInfo()};
TestShutdownListener shutdown_listener;
ServiceLayerErrorListener error_listener{component, merge_throttler};
diff --git a/storage/src/vespa/storage/config/stor-server.def b/storage/src/vespa/storage/config/stor-server.def
index dcce3079c68..0d877d33277 100644
--- a/storage/src/vespa/storage/config/stor-server.def
+++ b/storage/src/vespa/storage/config/stor-server.def
@@ -46,6 +46,38 @@ merge_throttling_policy.min_window_size int default=16
merge_throttling_policy.max_window_size int default=128
merge_throttling_policy.window_size_increment double default=2.0
+## If positive, nodes enforce a soft limit on the estimated amount of memory that
+## can be used by merges touching a particular content node. If a merge arriving
+## at the node would violate the soft limit, it will be bounced with BUSY.
+## Note that this also counts merges where the node is part of the source-only set,
+## since these use memory when/if data is read from the local node.
+##
+## Semantics:
+## > 0 explicit limit in bytes
+## == 0 limit automatically deduced by content node
+## < 0 unlimited (legacy behavior)
+merge_throttling_memory_limit.max_usage_bytes long default=-1
+
+## If merge_throttling_memory_limit.max_usage_bytes == 0, this factor is used
+## as a multiplier to automatically deduce a memory limit for merges on the
+## content node. Note that the result of this multiplication is capped at both
+## ends by the auto_(lower|upper)_bound_bytes config values.
+##
+## Default: 3% of physical memory
+merge_throttling_memory_limit.auto_phys_mem_scale_factor double default=0.03
+
+## The absolute minimum memory limit that can be set when automatically
+## deducing the limit from physical memory on the node.
+##
+## Default: 128MiB
+merge_throttling_memory_limit.auto_lower_bound_bytes long default=134217728
+
+## The absolute maximum memory limit that can be set when automatically
+## deducing the limit from physical memory on the node.
+##
+## Default: 2GiB
+merge_throttling_memory_limit.auto_upper_bound_bytes long default=2147483648
+
## If the persistence provider indicates that it has exhausted one or more
## of its internal resources during a mutating operation, new merges will
## be bounced for this duration. Not allowing further merges helps take
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
index 7ce034abfee..68f305fe94e 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
@@ -9,6 +9,7 @@
#include <vespa/storageframework/generic/clock/clock.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vdslib/state/clusterstate.h>
+#include <vespa/vespalib/stllike/hash_set.h>
#include <array>
#include <vespa/log/bufferedlogger.h>
@@ -121,7 +122,7 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender)
}
const lib::ClusterState& clusterState(_bucketSpace->getClusterState());
- std::vector<std::unique_ptr<BucketCopy> > newCopies;
+ std::vector<std::unique_ptr<BucketCopy>> newCopies;
std::vector<MergeMetaData> nodes;
for (uint16_t node : getNodes()) {
@@ -139,6 +140,8 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender)
_mnodes.emplace_back(node._nodeIndex, node._sourceOnly);
}
+ const auto estimated_memory_footprint = estimate_merge_memory_footprint_upper_bound(nodes);
+
if (_mnodes.size() > 1) {
auto msg = std::make_shared<api::MergeBucketCommand>(getBucket(), _mnodes,
_manager->operation_context().generate_unique_timestamp(),
@@ -153,6 +156,7 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender)
} else {
msg->set_use_unordered_forwarding(true);
}
+ msg->set_estimated_memory_footprint(estimated_memory_footprint);
LOG(debug, "Sending %s to storage node %u", msg->toString().c_str(), _mnodes[0].index);
@@ -367,6 +371,40 @@ bool MergeOperation::all_involved_nodes_support_unordered_merge_chaining() const
return true;
}
+uint32_t MergeOperation::estimate_merge_memory_footprint_upper_bound(const std::vector<MergeMetaData>& nodes) const noexcept {
+ vespalib::hash_set<uint32_t> seen_checksums;
+ uint32_t worst_case_footprint_across_nodes = 0;
+ uint32_t largest_single_doc_contribution = 0;
+ for (const auto& node : nodes) {
+ if (!seen_checksums.contains(node.checksum())) {
+ seen_checksums.insert(node.checksum());
+ const uint32_t replica_size = node._copy->getUsedFileSize();
+ // We don't know the overlap of document sets across replicas, so we have to assume the
+ // worst and treat the replicas as entirely disjoint. In this case, the _sum_ of all disjoint
+ // replica group footprints gives us the upper bound.
+ // Note: saturate-on-overflow check requires all types to be _unsigned_ to work.
+ if (worst_case_footprint_across_nodes + replica_size >= worst_case_footprint_across_nodes) {
+ worst_case_footprint_across_nodes += replica_size;
+ } else {
+ worst_case_footprint_across_nodes = UINT32_MAX;
+ }
+ // Special case: a single massive-doc replica must not be clamped down to the
+ // max configured bucket size.
+ if (node._copy->getDocumentCount() == 1) {
+ largest_single_doc_contribution = std::max(replica_size, largest_single_doc_contribution);
+ }
+ }
+ }
+ // We know that simply adding up replica sizes is likely to massively over-count in the common
+ // case (due to the intersection set between replicas rarely being empty), so we cap it by the
+ // max expected merge chunk size (which is expected to be configured equal to the split limit).
+ // _Except_ if we have single-doc replicas, as these are known not to overlap, and we know that
+ // the worst case must be the max of the chunk size and the biggest single doc size.
+ const uint32_t expected_max_merge_chunk_size = _manager->operation_context().distributor_config().getSplitSize();
+ return std::max(std::min(worst_case_footprint_across_nodes, expected_max_merge_chunk_size),
+ largest_single_doc_contribution);
+}
+
MergeBucketMetricSet*
MergeOperation::get_merge_metrics()
{
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
index ff21e3d1594..8f54aea33be 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
@@ -63,8 +63,9 @@ private:
void deleteSourceOnlyNodes(const BucketDatabase::Entry& currentState,
DistributorStripeMessageSender& sender);
- bool is_global_bucket_merge() const noexcept;
- bool all_involved_nodes_support_unordered_merge_chaining() const noexcept;
+ [[nodiscard]] bool is_global_bucket_merge() const noexcept;
+ [[nodiscard]] bool all_involved_nodes_support_unordered_merge_chaining() const noexcept;
+ [[nodiscard]] uint32_t estimate_merge_memory_footprint_upper_bound(const std::vector<MergeMetaData>& nodes) const noexcept;
MergeBucketMetricSet* get_merge_metrics();
};
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index 4cc2a7a89ab..82bd5ff7ace 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -29,7 +29,7 @@ namespace {
struct NodeComparator {
bool operator()(const api::MergeBucketCommand::Node& a,
- const api::MergeBucketCommand::Node& b) const
+ const api::MergeBucketCommand::Node& b) const noexcept
{
return a.index < b.index;
}
@@ -41,6 +41,7 @@ MergeThrottler::ChainedMergeState::ChainedMergeState()
: _cmd(),
_cmdString(),
_clusterStateVersion(0),
+ _estimated_memory_usage(0),
_inCycle(false),
_executingLocally(false),
_unwinding(false),
@@ -52,6 +53,7 @@ MergeThrottler::ChainedMergeState::ChainedMergeState(const api::StorageMessage::
: _cmd(cmd),
_cmdString(cmd->toString()),
_clusterStateVersion(static_cast<const api::MergeBucketCommand&>(*cmd).getClusterStateVersion()),
+ _estimated_memory_usage(static_cast<const api::MergeBucketCommand&>(*cmd).estimated_memory_footprint()),
_inCycle(false),
_executingLocally(executing),
_unwinding(false),
@@ -65,6 +67,9 @@ MergeThrottler::Metrics::Metrics(metrics::MetricSet* owner)
averageQueueWaitingTime("averagequeuewaitingtime", {}, "Average time a merge spends in the throttler queue", this),
queueSize("queuesize", {}, "Length of merge queue", this),
active_window_size("active_window_size", {}, "Number of merges active within the pending window size", this),
+ estimated_merge_memory_usage("estimated_merge_memory_usage", {}, "An estimated upper bound of the "
+ "memory usage of the merges currently in the active window", this),
+ merge_memory_limit("merge_memory_limit", {}, "The active soft limit for memory used by merge operations on this node", this),
bounced_due_to_back_pressure("bounced_due_to_back_pressure", {}, "Number of merges bounced due to resource exhaustion back-pressure", this),
chaining("mergechains", this),
local("locallyexecutedmerges", this)
@@ -180,9 +185,11 @@ MergeThrottler::MergeNodeSequence::chain_contains_this_node() const noexcept
MergeThrottler::MergeThrottler(
const StorServerConfig& bootstrap_config,
- StorageComponentRegister& compReg)
+ StorageComponentRegister& comp_reg,
+ const vespalib::HwInfo& hw_info)
: StorageLink("Merge Throttler"),
framework::HtmlStatusReporter("merges", "Merge Throttler"),
+ _hw_info(hw_info),
_merges(),
_queue(),
_maxQueueSize(1024),
@@ -191,11 +198,13 @@ MergeThrottler::MergeThrottler(
_messageLock(),
_stateLock(),
_metrics(std::make_unique<Metrics>()),
- _component(compReg, "mergethrottler"),
+ _component(comp_reg, "mergethrottler"),
_thread(),
_rendezvous(RendezvousState::NONE),
_throttle_until_time(),
_backpressure_duration(std::chrono::seconds(30)),
+ _active_merge_memory_used_bytes(0),
+ _max_merge_memory_usage_bytes(0), // 0 ==> unlimited
_use_dynamic_throttling(false),
_disable_queue_limits_for_chained_merges(false),
_closing(false)
@@ -244,6 +253,14 @@ MergeThrottler::on_configure(const StorServerConfig& new_config)
_backpressure_duration = std::chrono::duration_cast<std::chrono::steady_clock::duration>(
std::chrono::duration<double>(new_config.resourceExhaustionMergeBackPressureDurationSecs));
_disable_queue_limits_for_chained_merges = new_config.disableQueueLimitsForChainedMerges;
+ if (new_config.mergeThrottlingMemoryLimit.maxUsageBytes > 0) {
+ _max_merge_memory_usage_bytes = static_cast<size_t>(new_config.mergeThrottlingMemoryLimit.maxUsageBytes);
+ } else if ((new_config.mergeThrottlingMemoryLimit.maxUsageBytes == 0) && (_hw_info.memory().sizeBytes() > 0)) {
+ _max_merge_memory_usage_bytes = deduced_memory_limit(new_config);
+ } else {
+ _max_merge_memory_usage_bytes = 0; // Implies unlimited
+ }
+ _metrics->merge_memory_limit.set(static_cast<int64_t>(_max_merge_memory_usage_bytes));
}
MergeThrottler::~MergeThrottler()
@@ -373,16 +390,19 @@ MergeThrottler::forwardCommandToNode(
fwdMerge->setPriority(mergeCmd.getPriority());
fwdMerge->setTimeout(mergeCmd.getTimeout());
fwdMerge->set_use_unordered_forwarding(mergeCmd.use_unordered_forwarding());
+ fwdMerge->set_estimated_memory_footprint(mergeCmd.estimated_memory_footprint());
msgGuard.sendUp(fwdMerge);
}
void
MergeThrottler::removeActiveMerge(ActiveMergeMap::iterator mergeIter)
{
- LOG(debug, "Removed merge for %s from internal state",
- mergeIter->first.toString().c_str());
+ LOG(debug, "Removed merge for %s from internal state", mergeIter->first.toString().c_str());
+ assert(_active_merge_memory_used_bytes >= mergeIter->second._estimated_memory_usage);
+ _active_merge_memory_used_bytes -= mergeIter->second._estimated_memory_usage;
_merges.erase(mergeIter);
update_active_merge_window_size_metric();
+ update_active_merge_memory_usage_metric();
}
api::StorageMessage::SP
@@ -714,6 +734,21 @@ bool MergeThrottler::allow_merge_despite_full_window(const api::MergeBucketComma
return !_use_dynamic_throttling;
}
+bool MergeThrottler::accepting_merge_is_within_memory_limits(const api::MergeBucketCommand& cmd) const noexcept {
+ // Soft-limit on expected memory usage, but always let at least one merge into the active window.
+ if ((_max_merge_memory_usage_bytes > 0) && !_merges.empty()) {
+ size_t future_usage = _active_merge_memory_used_bytes + cmd.estimated_memory_footprint();
+ if (future_usage > _max_merge_memory_usage_bytes) {
+ LOG(spam, "Adding merge with memory footprint %u would exceed node soft limit of %zu. "
+ "Current memory usage is %zu, future usage would have been %zu",
+ cmd.estimated_memory_footprint(), _max_merge_memory_usage_bytes,
+ _active_merge_memory_used_bytes, future_usage);
+ return false;
+ }
+ }
+ return true;
+}
+
bool MergeThrottler::may_allow_into_queue(const api::MergeBucketCommand& cmd) const noexcept {
// We cannot let forwarded unordered merges fall into the queue, as that might lead to a deadlock.
// Consider the following scenario, with two nodes C0 and C1, each with a low window size of 1 (low
@@ -761,7 +796,10 @@ MergeThrottler::handleMessageDown(
if (isMergeAlreadyKnown(msg)) {
processCycledMergeCommand(msg, msgGuard);
- } else if (canProcessNewMerge() || allow_merge_despite_full_window(mergeCmd)) {
+ } else if (accepting_merge_is_within_memory_limits(mergeCmd)
+ && (canProcessNewMerge()
+ || allow_merge_despite_full_window(mergeCmd)))
+ {
processNewMergeCommand(msg, msgGuard);
} else if (may_allow_into_queue(mergeCmd)) {
enqueue_merge_for_later_processing(msg, msgGuard);
@@ -864,9 +902,10 @@ MergeThrottler::processNewMergeCommand(
assert(_merges.find(mergeCmd.getBucket()) == _merges.end());
auto state = _merges.emplace(mergeCmd.getBucket(), ChainedMergeState(msg)).first;
update_active_merge_window_size_metric();
+ _active_merge_memory_used_bytes += mergeCmd.estimated_memory_footprint();
+ update_active_merge_memory_usage_metric();
- LOG(debug, "Added merge %s to internal state",
- mergeCmd.toString().c_str());
+ LOG(debug, "Added merge %s to internal state", mergeCmd.toString().c_str());
DummyMbusRequest dummyMsg;
_throttlePolicy->processMessage(dummyMsg);
@@ -889,7 +928,7 @@ MergeThrottler::processNewMergeCommand(
} else {
if (!nodeSeq.isLastNode()) {
// When we're not the last node and haven't seen the merge before,
- // we cannot possible execute the merge yet. Forward to next.
+ // we cannot possibly execute the merge yet. Forward to next.
uint16_t nextNodeInChain = nodeSeq.getNextNodeInChain();
LOG(debug, "Forwarding merge %s to storage node %u",
mergeCmd.toString().c_str(), nextNodeInChain);
@@ -1291,17 +1330,52 @@ MergeThrottler::markActiveMergesAsAborted(uint32_t minimumStateVersion)
}
void
-MergeThrottler::set_disable_queue_limits_for_chained_merges(bool disable_limits) noexcept {
+MergeThrottler::set_disable_queue_limits_for_chained_merges_locking(bool disable_limits) noexcept {
std::lock_guard lock(_stateLock);
_disable_queue_limits_for_chained_merges = disable_limits;
}
void
+MergeThrottler::set_max_merge_memory_usage_bytes_locking(uint32_t max_memory_bytes) noexcept {
+ std::lock_guard lock(_stateLock);
+ _max_merge_memory_usage_bytes = max_memory_bytes;
+}
+
+uint32_t
+MergeThrottler::max_merge_memory_usage_bytes_locking() const noexcept {
+ std::lock_guard lock(_stateLock);
+ return _max_merge_memory_usage_bytes;
+}
+
+void
+MergeThrottler::set_hw_info_locking(const vespalib::HwInfo& hw_info) {
+ std::lock_guard lock(_stateLock);
+ _hw_info = hw_info;
+}
+
+size_t
+MergeThrottler::deduced_memory_limit(const StorServerConfig& cfg) const noexcept {
+ const auto min_limit = static_cast<size_t>(std::max(cfg.mergeThrottlingMemoryLimit.autoLowerBoundBytes, 1L));
+ const auto max_limit = std::max(static_cast<size_t>(std::max(cfg.mergeThrottlingMemoryLimit.autoUpperBoundBytes, 1L)), min_limit);
+ const auto mem_scale_factor = std::max(cfg.mergeThrottlingMemoryLimit.autoPhysMemScaleFactor, 0.0);
+
+ const auto node_mem = static_cast<double>(_hw_info.memory().sizeBytes());
+ const auto scaled_mem = static_cast<size_t>(node_mem * mem_scale_factor);
+
+ return std::min(std::max(scaled_mem, min_limit), max_limit);
+}
+
+void
MergeThrottler::update_active_merge_window_size_metric() noexcept {
_metrics->active_window_size.set(static_cast<int64_t>(_merges.size()));
}
void
+MergeThrottler::update_active_merge_memory_usage_metric() noexcept {
+ _metrics->estimated_merge_memory_usage.set(static_cast<int64_t>(_active_merge_memory_used_bytes));
+}
+
+void
MergeThrottler::print(std::ostream& out, bool /*verbose*/,
const std::string& /*indent*/) const
{
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index 5362c2f6df8..a5559c159bf 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -1,26 +1,24 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
- * @class storage::MergeThrottler
- * @ingroup storageserver
- *
- * @brief Throttler and forwarder of merge commands
+ * Throttler and forwarder of merge commands
*/
#pragma once
-#include <vespa/storage/config/config-stor-server.h>
-#include <vespa/storage/common/message_guard.h>
-#include <vespa/storage/common/storagelink.h>
-#include <vespa/storage/common/storagecomponent.h>
-#include <vespa/storageframework/generic/status/htmlstatusreporter.h>
-#include <vespa/storageframework/generic/thread/runnable.h>
-#include <vespa/storageapi/message/bucket.h>
+#include <vespa/config/helper/ifetchercallback.h>
#include <vespa/document/bucket/bucket.h>
+#include <vespa/metrics/countmetric.h>
#include <vespa/metrics/metricset.h>
+#include <vespa/metrics/metrictimer.h>
#include <vespa/metrics/summetric.h>
-#include <vespa/metrics/countmetric.h>
#include <vespa/metrics/valuemetric.h>
-#include <vespa/metrics/metrictimer.h>
-#include <vespa/config/helper/ifetchercallback.h>
+#include <vespa/storage/common/message_guard.h>
+#include <vespa/storage/common/storagecomponent.h>
+#include <vespa/storage/common/storagelink.h>
+#include <vespa/storage/config/config-stor-server.h>
+#include <vespa/storageapi/message/bucket.h>
+#include <vespa/storageframework/generic/status/htmlstatusreporter.h>
+#include <vespa/storageframework/generic/thread/runnable.h>
+#include <vespa/vespalib/util/hw_info.h>
#include <chrono>
@@ -71,6 +69,8 @@ public:
metrics::DoubleAverageMetric averageQueueWaitingTime;
metrics::LongValueMetric queueSize;
metrics::LongValueMetric active_window_size;
+ metrics::LongValueMetric estimated_merge_memory_usage;
+ metrics::LongValueMetric merge_memory_limit;
metrics::LongCountMetric bounced_due_to_back_pressure;
MergeOperationMetrics chaining;
MergeOperationMetrics local;
@@ -113,6 +113,7 @@ private:
api::StorageMessage::SP _cmd;
std::string _cmdString; // For being able to print message even when we don't own it
uint64_t _clusterStateVersion;
+ uint32_t _estimated_memory_usage;
bool _inCycle;
bool _executingLocally;
bool _unwinding;
@@ -154,9 +155,7 @@ private:
// Use a set rather than a priority_queue, since we want to be
// able to iterate over the collection during status rendering
- using MergePriorityQueue = std::set<
- StablePriorityOrderingWrapper<api::StorageMessage::SP>
- >;
+ using MergePriorityQueue = std::set<StablePriorityOrderingWrapper<api::StorageMessage::SP>>;
enum class RendezvousState {
NONE,
@@ -165,32 +164,37 @@ private:
RELEASED
};
- ActiveMergeMap _merges;
- MergePriorityQueue _queue;
- size_t _maxQueueSize;
- std::unique_ptr<mbus::DynamicThrottlePolicy> _throttlePolicy;
- uint64_t _queueSequence; // TODO: move into a stable priority queue class
- mutable std::mutex _messageLock;
- std::condition_variable _messageCond;
- mutable std::mutex _stateLock;
+ vespalib::HwInfo _hw_info;
+ ActiveMergeMap _merges;
+ MergePriorityQueue _queue;
+ size_t _maxQueueSize;
+ std::unique_ptr<mbus::DynamicThrottlePolicy> _throttlePolicy;
+ uint64_t _queueSequence; // TODO: move into a stable priority queue class
+ mutable std::mutex _messageLock;
+ std::condition_variable _messageCond;
+ mutable std::mutex _stateLock;
// Messages pending to be processed by the worker thread
- std::vector<api::StorageMessage::SP> _messagesDown;
- std::vector<api::StorageMessage::SP> _messagesUp;
- std::unique_ptr<Metrics> _metrics;
- StorageComponent _component;
- std::unique_ptr<framework::Thread> _thread;
- RendezvousState _rendezvous;
+ std::vector<api::StorageMessage::SP> _messagesDown;
+ std::vector<api::StorageMessage::SP> _messagesUp;
+ std::unique_ptr<Metrics> _metrics;
+ StorageComponent _component;
+ std::unique_ptr<framework::Thread> _thread;
+ RendezvousState _rendezvous;
mutable std::chrono::steady_clock::time_point _throttle_until_time;
- std::chrono::steady_clock::duration _backpressure_duration;
- bool _use_dynamic_throttling;
- bool _disable_queue_limits_for_chained_merges;
- bool _closing;
+ std::chrono::steady_clock::duration _backpressure_duration;
+ size_t _active_merge_memory_used_bytes;
+ size_t _max_merge_memory_usage_bytes;
+ bool _use_dynamic_throttling;
+ bool _disable_queue_limits_for_chained_merges;
+ bool _closing;
public:
/**
* windowSizeIncrement used for allowing unit tests to start out with more
* than 1 as their window size.
*/
- MergeThrottler(const StorServerConfig& bootstrap_config, StorageComponentRegister&);
+ MergeThrottler(const StorServerConfig& bootstrap_config,
+ StorageComponentRegister& comp_reg,
+ const vespalib::HwInfo& hw_info);
~MergeThrottler() override;
/** Implements document::Runnable::run */
@@ -223,7 +227,10 @@ public:
// For unit testing only
const mbus::DynamicThrottlePolicy& getThrottlePolicy() const { return *_throttlePolicy; }
mbus::DynamicThrottlePolicy& getThrottlePolicy() { return *_throttlePolicy; }
- void set_disable_queue_limits_for_chained_merges(bool disable_limits) noexcept;
+ void set_disable_queue_limits_for_chained_merges_locking(bool disable_limits) noexcept;
+ void set_max_merge_memory_usage_bytes_locking(uint32_t max_memory_bytes) noexcept;
+ [[nodiscard]] uint32_t max_merge_memory_usage_bytes_locking() const noexcept;
+ void set_hw_info_locking(const vespalib::HwInfo& hw_info);
// For unit testing only
std::mutex& getStateLock() { return _stateLock; }
@@ -363,6 +370,7 @@ private:
[[nodiscard]] bool backpressure_mode_active_no_lock() const;
void backpressure_bounce_all_queued_merges(MessageGuard& guard);
[[nodiscard]] bool allow_merge_despite_full_window(const api::MergeBucketCommand& cmd) const noexcept;
+ [[nodiscard]] bool accepting_merge_is_within_memory_limits(const api::MergeBucketCommand& cmd) const noexcept;
[[nodiscard]] bool may_allow_into_queue(const api::MergeBucketCommand& cmd) const noexcept;
void sendReply(const api::MergeBucketCommand& cmd,
@@ -404,7 +412,10 @@ private:
void rejectOperationsInThreadQueue(MessageGuard&, uint32_t minimumStateVersion);
void markActiveMergesAsAborted(uint32_t minimumStateVersion);
+ [[nodiscard]] size_t deduced_memory_limit(const StorServerConfig& cfg) const noexcept;
+
void update_active_merge_window_size_metric() noexcept;
+ void update_active_merge_memory_usage_metric() noexcept;
// const function, but metrics are mutable
void updateOperationMetrics(
diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
index 0cce2c27e95..7da75225b6c 100644
--- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp
+++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
@@ -30,8 +30,9 @@ ServiceLayerNode::ServiceLayerBootstrapConfigs::ServiceLayerBootstrapConfigs(Ser
ServiceLayerNode::ServiceLayerBootstrapConfigs&
ServiceLayerNode::ServiceLayerBootstrapConfigs::operator=(ServiceLayerBootstrapConfigs&&) noexcept = default;
-ServiceLayerNode::ServiceLayerNode(const config::ConfigUri & configUri,
+ServiceLayerNode::ServiceLayerNode(const config::ConfigUri& configUri,
ServiceLayerNodeContext& context,
+ const vespalib::HwInfo& hw_info,
ServiceLayerBootstrapConfigs bootstrap_configs,
ApplicationGenerationFetcher& generationFetcher,
spi::PersistenceProvider& persistenceProvider,
@@ -41,6 +42,7 @@ ServiceLayerNode::ServiceLayerNode(const config::ConfigUri & configUri,
_context(context),
_persistenceProvider(persistenceProvider),
_externalVisitors(externalVisitors),
+ _hw_info(hw_info),
_persistence_bootstrap_config(std::move(bootstrap_configs.persistence_cfg)),
_visitor_bootstrap_config(std::move(bootstrap_configs.visitor_cfg)),
_filestor_bootstrap_config(std::move(bootstrap_configs.filestor_cfg)),
@@ -172,7 +174,7 @@ ServiceLayerNode::createChain(IStorageChainBuilder &builder)
auto bouncer = std::make_unique<Bouncer>(compReg, bouncer_config());
_bouncer = bouncer.get();
builder.add(std::move(bouncer));
- auto merge_throttler_up = std::make_unique<MergeThrottler>(server_config(), compReg);
+ auto merge_throttler_up = std::make_unique<MergeThrottler>(server_config(), compReg, _hw_info);
_merge_throttler = merge_throttler_up.get();
builder.add(std::move(merge_throttler_up));
auto bucket_ownership_handler = std::make_unique<ChangedBucketOwnershipHandler>(*_persistence_bootstrap_config, compReg);
diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.h b/storage/src/vespa/storage/storageserver/servicelayernode.h
index ae39bb0805e..bea09a1c9ce 100644
--- a/storage/src/vespa/storage/storageserver/servicelayernode.h
+++ b/storage/src/vespa/storage/storageserver/servicelayernode.h
@@ -12,6 +12,7 @@
#include <vespa/storage/common/visitorfactory.h>
#include <vespa/storage/visiting/config-stor-visitor.h>
#include <vespa/storage/visiting/visitormessagesessionfactory.h>
+#include <vespa/vespalib/util/hw_info.h>
namespace storage {
@@ -39,6 +40,7 @@ private:
ServiceLayerNodeContext& _context;
spi::PersistenceProvider& _persistenceProvider;
VisitorFactory::Map _externalVisitors;
+ vespalib::HwInfo _hw_info;
std::unique_ptr<PersistenceConfig> _persistence_bootstrap_config;
std::unique_ptr<StorVisitorConfig> _visitor_bootstrap_config;
std::unique_ptr<StorFilestorConfig> _filestor_bootstrap_config;
@@ -66,8 +68,9 @@ public:
ServiceLayerBootstrapConfigs& operator=(ServiceLayerBootstrapConfigs&&) noexcept;
};
- ServiceLayerNode(const config::ConfigUri & configUri,
+ ServiceLayerNode(const config::ConfigUri& configUri,
ServiceLayerNodeContext& context,
+ const vespalib::HwInfo& hw_info,
ServiceLayerBootstrapConfigs bootstrap_configs,
ApplicationGenerationFetcher& generationFetcher,
spi::PersistenceProvider& persistenceProvider,
diff --git a/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto b/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
index 850b5db5c98..a32fbc3e4de 100644
--- a/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
+++ b/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
@@ -33,12 +33,13 @@ message MergeNode {
}
message MergeBucketRequest {
- Bucket bucket = 1;
- uint32 cluster_state_version = 2;
- uint64 max_timestamp = 3;
- repeated MergeNode nodes = 4;
- repeated uint32 node_chain = 5;
- bool unordered_forwarding = 6;
+ Bucket bucket = 1;
+ uint32 cluster_state_version = 2;
+ uint64 max_timestamp = 3;
+ repeated MergeNode nodes = 4;
+ repeated uint32 node_chain = 5;
+ bool unordered_forwarding = 6;
+ uint32 estimated_memory_footprint = 7;
}
message MergeBucketResponse {
diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
index af62ec2b418..efbe8c9b42d 100644
--- a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
+++ b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
@@ -801,6 +801,7 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::MergeBucketCommand&
req.set_max_timestamp(msg.getMaxTimestamp());
req.set_cluster_state_version(msg.getClusterStateVersion());
req.set_unordered_forwarding(msg.use_unordered_forwarding());
+ req.set_estimated_memory_footprint(msg.estimated_memory_footprint());
for (uint16_t chain_node : msg.getChain()) {
req.add_node_chain(chain_node);
}
@@ -823,6 +824,7 @@ api::StorageCommand::UP ProtocolSerialization7::onDecodeMergeBucketCommand(BBuf&
}
cmd->setChain(std::move(chain));
cmd->set_use_unordered_forwarding(req.unordered_forwarding());
+ cmd->set_estimated_memory_footprint(req.estimated_memory_footprint());
return cmd;
});
}
diff --git a/storage/src/vespa/storageapi/message/bucket.cpp b/storage/src/vespa/storageapi/message/bucket.cpp
index 49295f54891..499d2f4abe2 100644
--- a/storage/src/vespa/storageapi/message/bucket.cpp
+++ b/storage/src/vespa/storageapi/message/bucket.cpp
@@ -107,6 +107,7 @@ MergeBucketCommand::MergeBucketCommand(
_nodes(nodes),
_maxTimestamp(maxTimestamp),
_clusterStateVersion(clusterStateVersion),
+ _estimated_memory_footprint(0),
_chain(chain),
_use_unordered_forwarding(false)
{}
@@ -132,6 +133,9 @@ MergeBucketCommand::print(std::ostream& out, bool verbose, const std::string& in
if (_use_unordered_forwarding) {
out << " (unordered forwarding)";
}
+ if (_estimated_memory_footprint > 0) {
+ out << ", estimated memory footprint: " << _estimated_memory_footprint << " bytes";
+ }
out << ", reasons to start: " << _reason;
out << ")";
if (verbose) {
diff --git a/storage/src/vespa/storageapi/message/bucket.h b/storage/src/vespa/storageapi/message/bucket.h
index d1fa00619ae..4aa2ff8b0c1 100644
--- a/storage/src/vespa/storageapi/message/bucket.h
+++ b/storage/src/vespa/storageapi/message/bucket.h
@@ -118,6 +118,7 @@ private:
std::vector<Node> _nodes;
Timestamp _maxTimestamp;
uint32_t _clusterStateVersion;
+ uint32_t _estimated_memory_footprint;
std::vector<uint16_t> _chain;
bool _use_unordered_forwarding;
@@ -140,6 +141,12 @@ public:
}
[[nodiscard]] bool use_unordered_forwarding() const noexcept { return _use_unordered_forwarding; }
[[nodiscard]] bool from_distributor() const noexcept { return _chain.empty(); }
+ void set_estimated_memory_footprint(uint32_t footprint_bytes) noexcept {
+ _estimated_memory_footprint = footprint_bytes;
+ }
+ [[nodiscard]] uint32_t estimated_memory_footprint() const noexcept {
+ return _estimated_memory_footprint;
+ }
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
DECLARE_STORAGECOMMAND(MergeBucketCommand, onMergeBucket)
};
diff --git a/storageserver/src/vespa/storageserver/app/dummyservicelayerprocess.cpp b/storageserver/src/vespa/storageserver/app/dummyservicelayerprocess.cpp
index 8940c2a320e..245afb1c774 100644
--- a/storageserver/src/vespa/storageserver/app/dummyservicelayerprocess.cpp
+++ b/storageserver/src/vespa/storageserver/app/dummyservicelayerprocess.cpp
@@ -7,7 +7,7 @@ namespace storage {
// DummyServiceLayerProcess implementation
DummyServiceLayerProcess::DummyServiceLayerProcess(const config::ConfigUri & configUri)
- : ServiceLayerProcess(configUri)
+ : ServiceLayerProcess(configUri, vespalib::HwInfo())
{
}
diff --git a/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp b/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp
index bb284bfc108..ebf320352eb 100644
--- a/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp
+++ b/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp
@@ -31,7 +31,7 @@ bucket_db_options_from_config(const config::ConfigUri& config_uri) {
}
-ServiceLayerProcess::ServiceLayerProcess(const config::ConfigUri& configUri)
+ServiceLayerProcess::ServiceLayerProcess(const config::ConfigUri& configUri, const vespalib::HwInfo& hw_info)
: Process(configUri),
_externalVisitors(),
_persistence_cfg_handle(),
@@ -39,6 +39,7 @@ ServiceLayerProcess::ServiceLayerProcess(const config::ConfigUri& configUri)
_filestor_cfg_handle(),
_node(),
_storage_chain_builder(),
+ _hw_info(hw_info),
_context(std::make_unique<framework::defaultimplementation::RealClock>(),
bucket_db_options_from_config(configUri))
{
@@ -106,7 +107,8 @@ ServiceLayerProcess::createNode()
sbc.visitor_cfg = _visitor_cfg_handle->getConfig();
sbc.filestor_cfg = _filestor_cfg_handle->getConfig();
- _node = std::make_unique<ServiceLayerNode>(_configUri, _context, std::move(sbc), *this, getProvider(), _externalVisitors);
+ _node = std::make_unique<ServiceLayerNode>(_configUri, _context, _hw_info, std::move(sbc),
+ *this, getProvider(), _externalVisitors);
if (_storage_chain_builder) {
_node->set_storage_chain_builder(std::move(_storage_chain_builder));
}
diff --git a/storageserver/src/vespa/storageserver/app/servicelayerprocess.h b/storageserver/src/vespa/storageserver/app/servicelayerprocess.h
index dcc56f373c4..add5a38ca9d 100644
--- a/storageserver/src/vespa/storageserver/app/servicelayerprocess.h
+++ b/storageserver/src/vespa/storageserver/app/servicelayerprocess.h
@@ -7,6 +7,7 @@
#include <vespa/storage/common/visitorfactory.h>
#include <vespa/storage/storageserver/servicelayernodecontext.h>
#include <vespa/storage/visiting/config-stor-visitor.h>
+#include <vespa/vespalib/util/hw_info.h>
namespace config { class ConfigUri; }
@@ -29,14 +30,15 @@ private:
std::unique_ptr<config::ConfigHandle<StorVisitorConfig>> _visitor_cfg_handle;
std::unique_ptr<config::ConfigHandle<StorFilestorConfig>> _filestor_cfg_handle;
- std::unique_ptr<ServiceLayerNode> _node;
+ std::unique_ptr<ServiceLayerNode> _node;
std::unique_ptr<IStorageChainBuilder> _storage_chain_builder;
protected:
+ vespalib::HwInfo _hw_info;
ServiceLayerNodeContext _context;
public:
- explicit ServiceLayerProcess(const config::ConfigUri & configUri);
+ ServiceLayerProcess(const config::ConfigUri & configUri, const vespalib::HwInfo& hw_info);
~ServiceLayerProcess() override;
void shutdown() override;