summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@vespa.ai>2023-10-31 15:30:31 +0000
committerTor Brede Vekterli <vekterli@vespa.ai>2023-11-02 11:17:24 +0000
commit10abf9db2a47091e151ca903e3675fae29c4ed09 (patch)
treeb10feaaf88b3e5acd09dba87ed723a19befa9a31 /storage
parentc87ae9ba1b1f1745191e75edefcaf436af3ab0c6 (diff)
Wire HwInfo into MergeThrottler and use for auto-deduction of memory limits
Add config for min/max capping of deduced limit, as well as a scaling factor based on the memory available to the process. Defaults have been chosen based on empirical observations over many years, but having these as config means we can tune things live if it should ever be required.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp129
-rw-r--r--storage/src/tests/storageserver/service_layer_error_listener_test.cpp3
-rw-r--r--storage/src/vespa/storage/config/stor-server.def24
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.cpp47
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.h40
-rw-r--r--storage/src/vespa/storage/storageserver/servicelayernode.cpp6
-rw-r--r--storage/src/vespa/storage/storageserver/servicelayernode.h5
7 files changed, 196 insertions, 58 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 48e0ab186f2..6f80ffe0727 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -31,9 +31,16 @@ namespace storage {
namespace {
using StorServerConfig = vespa::config::content::core::StorServerConfig;
+using StorServerConfigBuilder = vespa::config::content::core::StorServerConfigBuilder;
vespalib::string _storage("storage");
+std::unique_ptr<StorServerConfig> default_server_config() {
+ vdstestlib::DirConfig dir_config(getStandardConfig(true));
+ auto cfg_uri = ::config::ConfigUri(dir_config.getConfigId());
+ return config_from<StorServerConfig>(cfg_uri);
+}
+
struct MergeBuilder {
document::BucketId _bucket;
api::Timestamp _maxTimestamp;
@@ -157,6 +164,11 @@ struct MergeThrottlerTest : Test {
void SetUp() override;
void TearDown() override;
+ MergeThrottler& throttler(size_t idx) noexcept {
+ assert(idx < _throttlers.size());
+ return *_throttlers[idx];
+ }
+
api::MergeBucketCommand::SP sendMerge(const MergeBuilder&);
void send_and_expect_reply(
@@ -186,17 +198,14 @@ MergeThrottlerTest::~MergeThrottlerTest() = default;
void
MergeThrottlerTest::SetUp()
{
- vdstestlib::DirConfig dir_config(getStandardConfig(true));
- auto cfg_uri = ::config::ConfigUri(dir_config.getConfigId());
- auto config = config_from<StorServerConfig>(cfg_uri);
-
+ auto config = default_server_config();
for (int i = 0; i < _storageNodeCount; ++i) {
auto server = std::make_unique<TestServiceLayerApp>(NodeIndex(i));
server->setClusterState(lib::ClusterState("distributor:100 storage:100 version:1"));
std::unique_ptr<DummyStorageLink> top;
top = std::make_unique<DummyStorageLink>();
- auto* throttler = new MergeThrottler(*config, server->getComponentRegister());
+ auto* throttler = new MergeThrottler(*config, server->getComponentRegister(), vespalib::HwInfo());
// MergeThrottler will be sandwiched in between two dummy links
top->push_back(std::unique_ptr<StorageLink>(throttler));
auto* bottom = new DummyStorageLink;
@@ -1228,7 +1237,7 @@ void
MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd)
{
// Note: uses node with index 1 to not be the first node in chain
- _throttlers[1]->set_disable_queue_limits_for_chained_merges(disable_queue_limits);
+ _throttlers[1]->set_disable_queue_limits_for_chained_merges_locking(disable_queue_limits);
size_t max_pending = throttler_max_merges_pending(1);
size_t max_enqueued = _throttlers[1]->getMaxQueueSize();
for (size_t i = 0; i < max_pending + max_enqueued; ++i) {
@@ -1514,15 +1523,15 @@ TEST_F(MergeThrottlerTest, backpressure_evicts_all_queued_merges) {
TEST_F(MergeThrottlerTest, exceeding_memory_soft_limit_rejects_merges_even_with_available_active_window_slots) {
ASSERT_GT(throttler_max_merges_pending(0), 1); // Sanity check for the test itself
- _throttlers[0]->set_max_merge_memory_usage_bytes(10_Mi);
+ throttler(0).set_max_merge_memory_usage_bytes_locking(10_Mi);
- ASSERT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0);
+ ASSERT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 0);
std::shared_ptr<api::StorageMessage> fwd_cmd;
ASSERT_NO_FATAL_FAILURE(fwd_cmd = send_and_expect_forwarding(
MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).memory_usage(5_Mi).create()));
- EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi);
+ EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi);
// Accepting this merge would exceed memory limits. It is sent as part of a forwarded unordered
// merge and can therefore NOT be enqueued; it must be bounced immediately.
@@ -1532,7 +1541,7 @@ TEST_F(MergeThrottlerTest, exceeding_memory_soft_limit_rejects_merges_even_with_
.memory_usage(8_Mi).create(),
MessageType::MERGEBUCKET_REPLY, ReturnCode::BUSY));
- EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); // Unchanged
+ EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); // Unchanged
// Fail the forwarded merge. This shall immediately free up the memory usage, allowing a new merge in.
auto fwd_reply = dynamic_cast<api::MergeBucketCommand&>(*fwd_cmd).makeReply();
@@ -1542,38 +1551,38 @@ TEST_F(MergeThrottlerTest, exceeding_memory_soft_limit_rejects_merges_even_with_
std::shared_ptr<api::StorageReply>(std::move(fwd_reply)),
MessageType::MERGEBUCKET_REPLY, ReturnCode::ABORTED)); // Unwind reply for failed merge
- ASSERT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0);
+ ASSERT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 0);
- // New merge is accepted
- _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 2)).nodes(0, 1, 2).unordered(true).memory_usage(9_Mi).create());
- _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); // Forwarded to node 1
+ // New merge is accepted and forwarded
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_forwarding(
+ MergeBuilder(document::BucketId(16, 2)).nodes(0, 1, 2).unordered(true).memory_usage(9_Mi).create()));
- EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 9_Mi);
+ EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 9_Mi);
}
TEST_F(MergeThrottlerTest, exceeding_memory_soft_limit_can_enqueue_unordered_merge_sent_directly_from_distributor) {
- _throttlers[0]->set_max_merge_memory_usage_bytes(10_Mi);
+ throttler(0).set_max_merge_memory_usage_bytes_locking(10_Mi);
ASSERT_NO_FATAL_FAILURE(send_and_expect_forwarding(
MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).memory_usage(5_Mi).create()));
- EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi);
+ EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi);
// Accepting this merge would exceed memory limits. It is sent directly from a distributor and
// can therefore be enqueued.
_topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 1)).nodes(0, 1, 2).unordered(true).memory_usage(8_Mi).create());
- waitUntilMergeQueueIs(*_throttlers[0], 1, _messageWaitTime); // Should end up in queue
+ waitUntilMergeQueueIs(throttler(0), 1, _messageWaitTime); // Should end up in queue
- EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); // Unchanged
+ EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); // Unchanged
}
TEST_F(MergeThrottlerTest, at_least_one_merge_is_accepted_even_if_exceeding_memory_soft_limit) {
- _throttlers[0]->set_max_merge_memory_usage_bytes(5_Mi);
+ throttler(0).set_max_merge_memory_usage_bytes_locking(5_Mi);
_topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).unordered(true).memory_usage(100_Mi).create());
_topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); // Forwarded, _not_ bounced
- EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 100_Mi);
+ EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 100_Mi);
}
TEST_F(MergeThrottlerTest, queued_merges_are_not_counted_towards_memory_usage) {
@@ -1582,19 +1591,87 @@ TEST_F(MergeThrottlerTest, queued_merges_are_not_counted_towards_memory_usage) {
// send for below in the test code.
ASSERT_LT(throttler_max_merges_pending(0), 1000);
- _throttlers[0]->set_max_merge_memory_usage_bytes(50_Mi);
+ throttler(0).set_max_merge_memory_usage_bytes_locking(50_Mi);
// Fill up active window on node 0. Note: these merges do not have any associated memory cost.
fill_throttler_queue_with_n_commands(0, 0);
- EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi);
+ EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi);
_topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 1000)).nodes(0, 1, 2).unordered(true).memory_usage(10_Mi).create());
- waitUntilMergeQueueIs(*_throttlers[0], 1, _messageWaitTime); // Should end up in queue
+ waitUntilMergeQueueIs(throttler(0), 1, _messageWaitTime); // Should end up in queue
+
+ EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi);
+}
+
+namespace {
- EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi);
+vespalib::HwInfo make_mem_info(uint64_t mem_size) {
+ return {{0, false, false}, {mem_size}, {1}};
}
-// TODO define and test auto-deduced max memory usage
+}
+
+TEST_F(MergeThrottlerTest, memory_limit_can_be_auto_deduced_from_hw_info) {
+ StorServerConfigBuilder cfg(*default_server_config());
+ auto& cfg_limit = cfg.mergeThrottlingMemoryLimit;
+ auto& mt = throttler(0);
+
+ // Enable auto-deduction of limits
+ cfg_limit.maxUsageBytes = 0;
+
+ cfg_limit.autoLowerBoundBytes = 100'000;
+ cfg_limit.autoUpperBoundBytes = 750'000;
+ cfg_limit.autoPhysMemScaleFactor = 0.5;
+
+ mt.set_hw_info_locking(make_mem_info(1'000'000));
+ mt.on_configure(cfg);
+ EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 500'000);
+ EXPECT_EQ(throttler(0).getMetrics().merge_memory_limit.getLast(), 500'000);
+
+ cfg_limit.autoPhysMemScaleFactor = 0.75;
+ mt.on_configure(cfg);
+ EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 750'000);
+ EXPECT_EQ(throttler(0).getMetrics().merge_memory_limit.getLast(), 750'000);
+
+ cfg_limit.autoPhysMemScaleFactor = 0.25;
+ mt.on_configure(cfg);
+ EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 250'000);
+
+ // Min-capped
+ cfg_limit.autoPhysMemScaleFactor = 0.05;
+ mt.on_configure(cfg);
+ EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 100'000);
+
+ // Max-capped
+ cfg_limit.autoPhysMemScaleFactor = 0.90;
+ mt.on_configure(cfg);
+ EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 750'000);
+}
+
+TEST_F(MergeThrottlerTest, memory_limit_can_be_set_explicitly) {
+ StorServerConfigBuilder cfg(*default_server_config());
+ auto& cfg_limit = cfg.mergeThrottlingMemoryLimit;
+ auto& mt = throttler(0);
+
+ cfg_limit.maxUsageBytes = 1'234'567;
+ mt.set_hw_info_locking(make_mem_info(1'000'000));
+ mt.on_configure(cfg);
+ EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 1'234'567);
+ EXPECT_EQ(throttler(0).getMetrics().merge_memory_limit.getLast(), 1'234'567);
+}
+
+TEST_F(MergeThrottlerTest, memory_limit_can_be_set_to_unlimited) {
+ StorServerConfigBuilder cfg(*default_server_config());
+ auto& cfg_limit = cfg.mergeThrottlingMemoryLimit;
+ auto& mt = throttler(0);
+
+ cfg_limit.maxUsageBytes = -1;
+ mt.set_hw_info_locking(make_mem_info(1'000'000));
+ mt.on_configure(cfg);
+ // Zero implies infinity
+ EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 0);
+ EXPECT_EQ(throttler(0).getMetrics().merge_memory_limit.getLast(), 0);
+}
// TODO test message queue aborting (use rendezvous functionality--make guard)
diff --git a/storage/src/tests/storageserver/service_layer_error_listener_test.cpp b/storage/src/tests/storageserver/service_layer_error_listener_test.cpp
index edb13eea5af..63d8eec6dc3 100644
--- a/storage/src/tests/storageserver/service_layer_error_listener_test.cpp
+++ b/storage/src/tests/storageserver/service_layer_error_listener_test.cpp
@@ -40,7 +40,8 @@ struct Fixture {
vdstestlib::DirConfig config{getStandardConfig(true)};
TestServiceLayerApp app;
ServiceLayerComponent component{app.getComponentRegister(), "dummy"};
- MergeThrottler merge_throttler{*config_from<StorServerConfig>(config::ConfigUri(config.getConfigId())), app.getComponentRegister()};
+ MergeThrottler merge_throttler{*config_from<StorServerConfig>(config::ConfigUri(config.getConfigId())),
+ app.getComponentRegister(), vespalib::HwInfo()};
TestShutdownListener shutdown_listener;
ServiceLayerErrorListener error_listener{component, merge_throttler};
diff --git a/storage/src/vespa/storage/config/stor-server.def b/storage/src/vespa/storage/config/stor-server.def
index 2f34d5372ab..0d877d33277 100644
--- a/storage/src/vespa/storage/config/stor-server.def
+++ b/storage/src/vespa/storage/config/stor-server.def
@@ -54,9 +54,29 @@ merge_throttling_policy.window_size_increment double default=2.0
##
## Semantics:
## > 0 explicit limit in bytes
-## == 0 limit automatically derived by content node
+## == 0 limit automatically deduced by content node
## < 0 unlimited (legacy behavior)
-max_merge_memory_usage_bytes long default=-1
+merge_throttling_memory_limit.max_usage_bytes long default=-1
+
+## If merge_throttling_memory_limit.max_usage_bytes == 0, this factor is used
+## as a multiplier to automatically deduce a memory limit for merges on the
+## content node. Note that the result of this multiplication is capped at both
+## ends by the auto_(lower|upper)_bound_bytes config values.
+##
+## Default: 3% of physical memory
+merge_throttling_memory_limit.auto_phys_mem_scale_factor double default=0.03
+
+## The absolute minimum memory limit that can be set when automatically
+## deducing the limit from physical memory on the node.
+##
+## Default: 128MiB
+merge_throttling_memory_limit.auto_lower_bound_bytes long default=134217728
+
+## The absolute maximum memory limit that can be set when automatically
+## deducing the limit from physical memory on the node.
+##
+## Default: 2GiB
+merge_throttling_memory_limit.auto_upper_bound_bytes long default=2147483648
## If the persistence provider indicates that it has exhausted one or more
## of its internal resources during a mutating operation, new merges will
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index 4646604a980..82bd5ff7ace 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -69,6 +69,7 @@ MergeThrottler::Metrics::Metrics(metrics::MetricSet* owner)
active_window_size("active_window_size", {}, "Number of merges active within the pending window size", this),
estimated_merge_memory_usage("estimated_merge_memory_usage", {}, "An estimated upper bound of the "
"memory usage of the merges currently in the active window", this),
+ merge_memory_limit("merge_memory_limit", {}, "The active soft limit for memory used by merge operations on this node", this),
bounced_due_to_back_pressure("bounced_due_to_back_pressure", {}, "Number of merges bounced due to resource exhaustion back-pressure", this),
chaining("mergechains", this),
local("locallyexecutedmerges", this)
@@ -184,9 +185,11 @@ MergeThrottler::MergeNodeSequence::chain_contains_this_node() const noexcept
MergeThrottler::MergeThrottler(
const StorServerConfig& bootstrap_config,
- StorageComponentRegister& compReg)
+ StorageComponentRegister& comp_reg,
+ const vespalib::HwInfo& hw_info)
: StorageLink("Merge Throttler"),
framework::HtmlStatusReporter("merges", "Merge Throttler"),
+ _hw_info(hw_info),
_merges(),
_queue(),
_maxQueueSize(1024),
@@ -195,13 +198,13 @@ MergeThrottler::MergeThrottler(
_messageLock(),
_stateLock(),
_metrics(std::make_unique<Metrics>()),
- _component(compReg, "mergethrottler"),
+ _component(comp_reg, "mergethrottler"),
_thread(),
_rendezvous(RendezvousState::NONE),
_throttle_until_time(),
_backpressure_duration(std::chrono::seconds(30)),
_active_merge_memory_used_bytes(0),
- _max_merge_memory_usage_bytes(-1), // -1 ==> unlimited
+ _max_merge_memory_usage_bytes(0), // 0 ==> unlimited
_use_dynamic_throttling(false),
_disable_queue_limits_for_chained_merges(false),
_closing(false)
@@ -250,12 +253,14 @@ MergeThrottler::on_configure(const StorServerConfig& new_config)
_backpressure_duration = std::chrono::duration_cast<std::chrono::steady_clock::duration>(
std::chrono::duration<double>(new_config.resourceExhaustionMergeBackPressureDurationSecs));
_disable_queue_limits_for_chained_merges = new_config.disableQueueLimitsForChainedMerges;
- if (new_config.maxMergeMemoryUsageBytes > 0) {
- _max_merge_memory_usage_bytes = static_cast<size_t>(new_config.maxMergeMemoryUsageBytes);
+ if (new_config.mergeThrottlingMemoryLimit.maxUsageBytes > 0) {
+ _max_merge_memory_usage_bytes = static_cast<size_t>(new_config.mergeThrottlingMemoryLimit.maxUsageBytes);
+ } else if ((new_config.mergeThrottlingMemoryLimit.maxUsageBytes == 0) && (_hw_info.memory().sizeBytes() > 0)) {
+ _max_merge_memory_usage_bytes = deduced_memory_limit(new_config);
} else {
- _max_merge_memory_usage_bytes = 0; // TODO auto-deduce based on local limits
+ _max_merge_memory_usage_bytes = 0; // Implies unlimited
}
-
+ _metrics->merge_memory_limit.set(static_cast<int64_t>(_max_merge_memory_usage_bytes));
}
MergeThrottler::~MergeThrottler()
@@ -1325,17 +1330,41 @@ MergeThrottler::markActiveMergesAsAborted(uint32_t minimumStateVersion)
}
void
-MergeThrottler::set_disable_queue_limits_for_chained_merges(bool disable_limits) noexcept {
+MergeThrottler::set_disable_queue_limits_for_chained_merges_locking(bool disable_limits) noexcept {
std::lock_guard lock(_stateLock);
_disable_queue_limits_for_chained_merges = disable_limits;
}
void
-MergeThrottler::set_max_merge_memory_usage_bytes(uint32_t max_memory_bytes) noexcept {
+MergeThrottler::set_max_merge_memory_usage_bytes_locking(uint32_t max_memory_bytes) noexcept {
std::lock_guard lock(_stateLock);
_max_merge_memory_usage_bytes = max_memory_bytes;
}
+uint32_t
+MergeThrottler::max_merge_memory_usage_bytes_locking() const noexcept {
+ std::lock_guard lock(_stateLock);
+ return _max_merge_memory_usage_bytes;
+}
+
+void
+MergeThrottler::set_hw_info_locking(const vespalib::HwInfo& hw_info) {
+ std::lock_guard lock(_stateLock);
+ _hw_info = hw_info;
+}
+
+size_t
+MergeThrottler::deduced_memory_limit(const StorServerConfig& cfg) const noexcept {
+ const auto min_limit = static_cast<size_t>(std::max(cfg.mergeThrottlingMemoryLimit.autoLowerBoundBytes, 1L));
+ const auto max_limit = std::max(static_cast<size_t>(std::max(cfg.mergeThrottlingMemoryLimit.autoUpperBoundBytes, 1L)), min_limit);
+ const auto mem_scale_factor = std::max(cfg.mergeThrottlingMemoryLimit.autoPhysMemScaleFactor, 0.0);
+
+ const auto node_mem = static_cast<double>(_hw_info.memory().sizeBytes());
+ const auto scaled_mem = static_cast<size_t>(node_mem * mem_scale_factor);
+
+ return std::min(std::max(scaled_mem, min_limit), max_limit);
+}
+
void
MergeThrottler::update_active_merge_window_size_metric() noexcept {
_metrics->active_window_size.set(static_cast<int64_t>(_merges.size()));
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index 8f6c4d62f68..a5559c159bf 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -1,26 +1,24 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
- * @class storage::MergeThrottler
- * @ingroup storageserver
- *
- * @brief Throttler and forwarder of merge commands
+ * Throttler and forwarder of merge commands
*/
#pragma once
-#include <vespa/storage/config/config-stor-server.h>
-#include <vespa/storage/common/message_guard.h>
-#include <vespa/storage/common/storagelink.h>
-#include <vespa/storage/common/storagecomponent.h>
-#include <vespa/storageframework/generic/status/htmlstatusreporter.h>
-#include <vespa/storageframework/generic/thread/runnable.h>
-#include <vespa/storageapi/message/bucket.h>
+#include <vespa/config/helper/ifetchercallback.h>
#include <vespa/document/bucket/bucket.h>
+#include <vespa/metrics/countmetric.h>
#include <vespa/metrics/metricset.h>
+#include <vespa/metrics/metrictimer.h>
#include <vespa/metrics/summetric.h>
-#include <vespa/metrics/countmetric.h>
#include <vespa/metrics/valuemetric.h>
-#include <vespa/metrics/metrictimer.h>
-#include <vespa/config/helper/ifetchercallback.h>
+#include <vespa/storage/common/message_guard.h>
+#include <vespa/storage/common/storagecomponent.h>
+#include <vespa/storage/common/storagelink.h>
+#include <vespa/storage/config/config-stor-server.h>
+#include <vespa/storageapi/message/bucket.h>
+#include <vespa/storageframework/generic/status/htmlstatusreporter.h>
+#include <vespa/storageframework/generic/thread/runnable.h>
+#include <vespa/vespalib/util/hw_info.h>
#include <chrono>
@@ -72,6 +70,7 @@ public:
metrics::LongValueMetric queueSize;
metrics::LongValueMetric active_window_size;
metrics::LongValueMetric estimated_merge_memory_usage;
+ metrics::LongValueMetric merge_memory_limit;
metrics::LongCountMetric bounced_due_to_back_pressure;
MergeOperationMetrics chaining;
MergeOperationMetrics local;
@@ -165,6 +164,7 @@ private:
RELEASED
};
+ vespalib::HwInfo _hw_info;
ActiveMergeMap _merges;
MergePriorityQueue _queue;
size_t _maxQueueSize;
@@ -192,7 +192,9 @@ public:
* windowSizeIncrement used for allowing unit tests to start out with more
* than 1 as their window size.
*/
- MergeThrottler(const StorServerConfig& bootstrap_config, StorageComponentRegister&);
+ MergeThrottler(const StorServerConfig& bootstrap_config,
+ StorageComponentRegister& comp_reg,
+ const vespalib::HwInfo& hw_info);
~MergeThrottler() override;
/** Implements document::Runnable::run */
@@ -225,8 +227,10 @@ public:
// For unit testing only
const mbus::DynamicThrottlePolicy& getThrottlePolicy() const { return *_throttlePolicy; }
mbus::DynamicThrottlePolicy& getThrottlePolicy() { return *_throttlePolicy; }
- void set_disable_queue_limits_for_chained_merges(bool disable_limits) noexcept;
- void set_max_merge_memory_usage_bytes(uint32_t max_memory_bytes) noexcept;
+ void set_disable_queue_limits_for_chained_merges_locking(bool disable_limits) noexcept;
+ void set_max_merge_memory_usage_bytes_locking(uint32_t max_memory_bytes) noexcept;
+ [[nodiscard]] uint32_t max_merge_memory_usage_bytes_locking() const noexcept;
+ void set_hw_info_locking(const vespalib::HwInfo& hw_info);
// For unit testing only
std::mutex& getStateLock() { return _stateLock; }
@@ -408,6 +412,8 @@ private:
void rejectOperationsInThreadQueue(MessageGuard&, uint32_t minimumStateVersion);
void markActiveMergesAsAborted(uint32_t minimumStateVersion);
+ [[nodiscard]] size_t deduced_memory_limit(const StorServerConfig& cfg) const noexcept;
+
void update_active_merge_window_size_metric() noexcept;
void update_active_merge_memory_usage_metric() noexcept;
diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
index 0cce2c27e95..7da75225b6c 100644
--- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp
+++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
@@ -30,8 +30,9 @@ ServiceLayerNode::ServiceLayerBootstrapConfigs::ServiceLayerBootstrapConfigs(Ser
ServiceLayerNode::ServiceLayerBootstrapConfigs&
ServiceLayerNode::ServiceLayerBootstrapConfigs::operator=(ServiceLayerBootstrapConfigs&&) noexcept = default;
-ServiceLayerNode::ServiceLayerNode(const config::ConfigUri & configUri,
+ServiceLayerNode::ServiceLayerNode(const config::ConfigUri& configUri,
ServiceLayerNodeContext& context,
+ const vespalib::HwInfo& hw_info,
ServiceLayerBootstrapConfigs bootstrap_configs,
ApplicationGenerationFetcher& generationFetcher,
spi::PersistenceProvider& persistenceProvider,
@@ -41,6 +42,7 @@ ServiceLayerNode::ServiceLayerNode(const config::ConfigUri & configUri,
_context(context),
_persistenceProvider(persistenceProvider),
_externalVisitors(externalVisitors),
+ _hw_info(hw_info),
_persistence_bootstrap_config(std::move(bootstrap_configs.persistence_cfg)),
_visitor_bootstrap_config(std::move(bootstrap_configs.visitor_cfg)),
_filestor_bootstrap_config(std::move(bootstrap_configs.filestor_cfg)),
@@ -172,7 +174,7 @@ ServiceLayerNode::createChain(IStorageChainBuilder &builder)
auto bouncer = std::make_unique<Bouncer>(compReg, bouncer_config());
_bouncer = bouncer.get();
builder.add(std::move(bouncer));
- auto merge_throttler_up = std::make_unique<MergeThrottler>(server_config(), compReg);
+ auto merge_throttler_up = std::make_unique<MergeThrottler>(server_config(), compReg, _hw_info);
_merge_throttler = merge_throttler_up.get();
builder.add(std::move(merge_throttler_up));
auto bucket_ownership_handler = std::make_unique<ChangedBucketOwnershipHandler>(*_persistence_bootstrap_config, compReg);
diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.h b/storage/src/vespa/storage/storageserver/servicelayernode.h
index ae39bb0805e..bea09a1c9ce 100644
--- a/storage/src/vespa/storage/storageserver/servicelayernode.h
+++ b/storage/src/vespa/storage/storageserver/servicelayernode.h
@@ -12,6 +12,7 @@
#include <vespa/storage/common/visitorfactory.h>
#include <vespa/storage/visiting/config-stor-visitor.h>
#include <vespa/storage/visiting/visitormessagesessionfactory.h>
+#include <vespa/vespalib/util/hw_info.h>
namespace storage {
@@ -39,6 +40,7 @@ private:
ServiceLayerNodeContext& _context;
spi::PersistenceProvider& _persistenceProvider;
VisitorFactory::Map _externalVisitors;
+ vespalib::HwInfo _hw_info;
std::unique_ptr<PersistenceConfig> _persistence_bootstrap_config;
std::unique_ptr<StorVisitorConfig> _visitor_bootstrap_config;
std::unique_ptr<StorFilestorConfig> _filestor_bootstrap_config;
@@ -66,8 +68,9 @@ public:
ServiceLayerBootstrapConfigs& operator=(ServiceLayerBootstrapConfigs&&) noexcept;
};
- ServiceLayerNode(const config::ConfigUri & configUri,
+ ServiceLayerNode(const config::ConfigUri& configUri,
ServiceLayerNodeContext& context,
+ const vespalib::HwInfo& hw_info,
ServiceLayerBootstrapConfigs bootstrap_configs,
ApplicationGenerationFetcher& generationFetcher,
spi::PersistenceProvider& persistenceProvider,