summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--searchcore/src/apps/proton/proton.cpp12
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp2
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp129
-rw-r--r--storage/src/tests/storageserver/service_layer_error_listener_test.cpp3
-rw-r--r--storage/src/vespa/storage/config/stor-server.def24
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.cpp47
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.h40
-rw-r--r--storage/src/vespa/storage/storageserver/servicelayernode.cpp6
-rw-r--r--storage/src/vespa/storage/storageserver/servicelayernode.h5
-rw-r--r--storageserver/src/vespa/storageserver/app/dummyservicelayerprocess.cpp2
-rw-r--r--storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp6
-rw-r--r--storageserver/src/vespa/storageserver/app/servicelayerprocess.h6
12 files changed, 213 insertions, 69 deletions
diff --git a/searchcore/src/apps/proton/proton.cpp b/searchcore/src/apps/proton/proton.cpp
index 129091606b3..4c20c40b406 100644
--- a/searchcore/src/apps/proton/proton.cpp
+++ b/searchcore/src/apps/proton/proton.cpp
@@ -109,7 +109,8 @@ class ProtonServiceLayerProcess : public storage::ServiceLayerProcess {
public:
ProtonServiceLayerProcess(const config::ConfigUri & configUri,
proton::Proton & proton, FNET_Transport& transport,
- const vespalib::string& file_distributor_connection_spec);
+ const vespalib::string& file_distributor_connection_spec,
+ const vespalib::HwInfo& hw_info);
~ProtonServiceLayerProcess() override { shutdown(); }
void shutdown() override;
@@ -130,8 +131,9 @@ public:
ProtonServiceLayerProcess::ProtonServiceLayerProcess(const config::ConfigUri & configUri,
proton::Proton & proton, FNET_Transport& transport,
- const vespalib::string& file_distributor_connection_spec)
- : ServiceLayerProcess(configUri),
+ const vespalib::string& file_distributor_connection_spec,
+ const vespalib::HwInfo& hw_info)
+ : ServiceLayerProcess(configUri, hw_info),
_proton(proton),
_transport(transport),
_file_distributor_connection_spec(file_distributor_connection_spec),
@@ -259,18 +261,18 @@ App::startAndRun(FNET_Transport & transport, int argc, char **argv) {
proton.init(configSnapshot);
}
vespalib::string file_distributor_connection_spec = configSnapshot->getFiledistributorrpcConfig().connectionspec;
- configSnapshot.reset();
std::unique_ptr<ProtonServiceLayerProcess> spiProton;
if ( ! params.serviceidentity.empty()) {
spiProton = std::make_unique<ProtonServiceLayerProcess>(identityUri.createWithNewId(params.serviceidentity), proton, transport,
- file_distributor_connection_spec);
+ file_distributor_connection_spec, configSnapshot->getHwInfo());
spiProton->setupConfig(subscribeTimeout);
spiProton->createNode();
EV_STARTED("servicelayer");
} else {
proton.getMetricManager().init(identityUri);
}
+ configSnapshot.reset();
EV_STARTED("proton");
while (!(SIG::INT.check() || SIG::TERM.check() || (spiProton && spiProton->getNode().attemptedStopped()))) {
std::this_thread::sleep_for(1000ms);
diff --git a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
index 9a0a2968c69..808747034ac 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/bm_node.cpp
@@ -243,7 +243,7 @@ public:
MyServiceLayerProcess::MyServiceLayerProcess(const config::ConfigUri& configUri,
PersistenceProvider& provider,
std::unique_ptr<storage::IStorageChainBuilder> chain_builder)
- : ServiceLayerProcess(configUri),
+ : ServiceLayerProcess(configUri, vespalib::HwInfo()),
_provider(provider)
{
if (chain_builder) {
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 48e0ab186f2..6f80ffe0727 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -31,9 +31,16 @@ namespace storage {
namespace {
using StorServerConfig = vespa::config::content::core::StorServerConfig;
+using StorServerConfigBuilder = vespa::config::content::core::StorServerConfigBuilder;
vespalib::string _storage("storage");
+std::unique_ptr<StorServerConfig> default_server_config() {
+ vdstestlib::DirConfig dir_config(getStandardConfig(true));
+ auto cfg_uri = ::config::ConfigUri(dir_config.getConfigId());
+ return config_from<StorServerConfig>(cfg_uri);
+}
+
struct MergeBuilder {
document::BucketId _bucket;
api::Timestamp _maxTimestamp;
@@ -157,6 +164,11 @@ struct MergeThrottlerTest : Test {
void SetUp() override;
void TearDown() override;
+ MergeThrottler& throttler(size_t idx) noexcept {
+ assert(idx < _throttlers.size());
+ return *_throttlers[idx];
+ }
+
api::MergeBucketCommand::SP sendMerge(const MergeBuilder&);
void send_and_expect_reply(
@@ -186,17 +198,14 @@ MergeThrottlerTest::~MergeThrottlerTest() = default;
void
MergeThrottlerTest::SetUp()
{
- vdstestlib::DirConfig dir_config(getStandardConfig(true));
- auto cfg_uri = ::config::ConfigUri(dir_config.getConfigId());
- auto config = config_from<StorServerConfig>(cfg_uri);
-
+ auto config = default_server_config();
for (int i = 0; i < _storageNodeCount; ++i) {
auto server = std::make_unique<TestServiceLayerApp>(NodeIndex(i));
server->setClusterState(lib::ClusterState("distributor:100 storage:100 version:1"));
std::unique_ptr<DummyStorageLink> top;
top = std::make_unique<DummyStorageLink>();
- auto* throttler = new MergeThrottler(*config, server->getComponentRegister());
+ auto* throttler = new MergeThrottler(*config, server->getComponentRegister(), vespalib::HwInfo());
// MergeThrottler will be sandwiched in between two dummy links
top->push_back(std::unique_ptr<StorageLink>(throttler));
auto* bottom = new DummyStorageLink;
@@ -1228,7 +1237,7 @@ void
MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd)
{
// Note: uses node with index 1 to not be the first node in chain
- _throttlers[1]->set_disable_queue_limits_for_chained_merges(disable_queue_limits);
+ _throttlers[1]->set_disable_queue_limits_for_chained_merges_locking(disable_queue_limits);
size_t max_pending = throttler_max_merges_pending(1);
size_t max_enqueued = _throttlers[1]->getMaxQueueSize();
for (size_t i = 0; i < max_pending + max_enqueued; ++i) {
@@ -1514,15 +1523,15 @@ TEST_F(MergeThrottlerTest, backpressure_evicts_all_queued_merges) {
TEST_F(MergeThrottlerTest, exceeding_memory_soft_limit_rejects_merges_even_with_available_active_window_slots) {
ASSERT_GT(throttler_max_merges_pending(0), 1); // Sanity check for the test itself
- _throttlers[0]->set_max_merge_memory_usage_bytes(10_Mi);
+ throttler(0).set_max_merge_memory_usage_bytes_locking(10_Mi);
- ASSERT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0);
+ ASSERT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 0);
std::shared_ptr<api::StorageMessage> fwd_cmd;
ASSERT_NO_FATAL_FAILURE(fwd_cmd = send_and_expect_forwarding(
MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).memory_usage(5_Mi).create()));
- EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi);
+ EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi);
// Accepting this merge would exceed memory limits. It is sent as part of a forwarded unordered
// merge and can therefore NOT be enqueued; it must be bounced immediately.
@@ -1532,7 +1541,7 @@ TEST_F(MergeThrottlerTest, exceeding_memory_soft_limit_rejects_merges_even_with_
.memory_usage(8_Mi).create(),
MessageType::MERGEBUCKET_REPLY, ReturnCode::BUSY));
- EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); // Unchanged
+ EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); // Unchanged
// Fail the forwarded merge. This shall immediately free up the memory usage, allowing a new merge in.
auto fwd_reply = dynamic_cast<api::MergeBucketCommand&>(*fwd_cmd).makeReply();
@@ -1542,38 +1551,38 @@ TEST_F(MergeThrottlerTest, exceeding_memory_soft_limit_rejects_merges_even_with_
std::shared_ptr<api::StorageReply>(std::move(fwd_reply)),
MessageType::MERGEBUCKET_REPLY, ReturnCode::ABORTED)); // Unwind reply for failed merge
- ASSERT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0);
+ ASSERT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 0);
- // New merge is accepted
- _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 2)).nodes(0, 1, 2).unordered(true).memory_usage(9_Mi).create());
- _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); // Forwarded to node 1
+ // New merge is accepted and forwarded
+ ASSERT_NO_FATAL_FAILURE(send_and_expect_forwarding(
+ MergeBuilder(document::BucketId(16, 2)).nodes(0, 1, 2).unordered(true).memory_usage(9_Mi).create()));
- EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 9_Mi);
+ EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 9_Mi);
}
TEST_F(MergeThrottlerTest, exceeding_memory_soft_limit_can_enqueue_unordered_merge_sent_directly_from_distributor) {
- _throttlers[0]->set_max_merge_memory_usage_bytes(10_Mi);
+ throttler(0).set_max_merge_memory_usage_bytes_locking(10_Mi);
ASSERT_NO_FATAL_FAILURE(send_and_expect_forwarding(
MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).memory_usage(5_Mi).create()));
- EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi);
+ EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi);
// Accepting this merge would exceed memory limits. It is sent directly from a distributor and
// can therefore be enqueued.
_topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 1)).nodes(0, 1, 2).unordered(true).memory_usage(8_Mi).create());
- waitUntilMergeQueueIs(*_throttlers[0], 1, _messageWaitTime); // Should end up in queue
+ waitUntilMergeQueueIs(throttler(0), 1, _messageWaitTime); // Should end up in queue
- EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); // Unchanged
+ EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); // Unchanged
}
TEST_F(MergeThrottlerTest, at_least_one_merge_is_accepted_even_if_exceeding_memory_soft_limit) {
- _throttlers[0]->set_max_merge_memory_usage_bytes(5_Mi);
+ throttler(0).set_max_merge_memory_usage_bytes_locking(5_Mi);
_topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).unordered(true).memory_usage(100_Mi).create());
_topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); // Forwarded, _not_ bounced
- EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 100_Mi);
+ EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 100_Mi);
}
TEST_F(MergeThrottlerTest, queued_merges_are_not_counted_towards_memory_usage) {
@@ -1582,19 +1591,87 @@ TEST_F(MergeThrottlerTest, queued_merges_are_not_counted_towards_memory_usage) {
// send for below in the test code.
ASSERT_LT(throttler_max_merges_pending(0), 1000);
- _throttlers[0]->set_max_merge_memory_usage_bytes(50_Mi);
+ throttler(0).set_max_merge_memory_usage_bytes_locking(50_Mi);
// Fill up active window on node 0. Note: these merges do not have any associated memory cost.
fill_throttler_queue_with_n_commands(0, 0);
- EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi);
+ EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi);
_topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 1000)).nodes(0, 1, 2).unordered(true).memory_usage(10_Mi).create());
- waitUntilMergeQueueIs(*_throttlers[0], 1, _messageWaitTime); // Should end up in queue
+ waitUntilMergeQueueIs(throttler(0), 1, _messageWaitTime); // Should end up in queue
+
+ EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi);
+}
+
+namespace {
- EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi);
+vespalib::HwInfo make_mem_info(uint64_t mem_size) {
+ return {{0, false, false}, {mem_size}, {1}};
}
-// TODO define and test auto-deduced max memory usage
+}
+
+TEST_F(MergeThrottlerTest, memory_limit_can_be_auto_deduced_from_hw_info) {
+ StorServerConfigBuilder cfg(*default_server_config());
+ auto& cfg_limit = cfg.mergeThrottlingMemoryLimit;
+ auto& mt = throttler(0);
+
+ // Enable auto-deduction of limits
+ cfg_limit.maxUsageBytes = 0;
+
+ cfg_limit.autoLowerBoundBytes = 100'000;
+ cfg_limit.autoUpperBoundBytes = 750'000;
+ cfg_limit.autoPhysMemScaleFactor = 0.5;
+
+ mt.set_hw_info_locking(make_mem_info(1'000'000));
+ mt.on_configure(cfg);
+ EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 500'000);
+ EXPECT_EQ(throttler(0).getMetrics().merge_memory_limit.getLast(), 500'000);
+
+ cfg_limit.autoPhysMemScaleFactor = 0.75;
+ mt.on_configure(cfg);
+ EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 750'000);
+ EXPECT_EQ(throttler(0).getMetrics().merge_memory_limit.getLast(), 750'000);
+
+ cfg_limit.autoPhysMemScaleFactor = 0.25;
+ mt.on_configure(cfg);
+ EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 250'000);
+
+ // Min-capped
+ cfg_limit.autoPhysMemScaleFactor = 0.05;
+ mt.on_configure(cfg);
+ EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 100'000);
+
+ // Max-capped
+ cfg_limit.autoPhysMemScaleFactor = 0.90;
+ mt.on_configure(cfg);
+ EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 750'000);
+}
+
+TEST_F(MergeThrottlerTest, memory_limit_can_be_set_explicitly) {
+ StorServerConfigBuilder cfg(*default_server_config());
+ auto& cfg_limit = cfg.mergeThrottlingMemoryLimit;
+ auto& mt = throttler(0);
+
+ cfg_limit.maxUsageBytes = 1'234'567;
+ mt.set_hw_info_locking(make_mem_info(1'000'000));
+ mt.on_configure(cfg);
+ EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 1'234'567);
+ EXPECT_EQ(throttler(0).getMetrics().merge_memory_limit.getLast(), 1'234'567);
+}
+
+TEST_F(MergeThrottlerTest, memory_limit_can_be_set_to_unlimited) {
+ StorServerConfigBuilder cfg(*default_server_config());
+ auto& cfg_limit = cfg.mergeThrottlingMemoryLimit;
+ auto& mt = throttler(0);
+
+ cfg_limit.maxUsageBytes = -1;
+ mt.set_hw_info_locking(make_mem_info(1'000'000));
+ mt.on_configure(cfg);
+ // Zero implies infinity
+ EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 0);
+ EXPECT_EQ(throttler(0).getMetrics().merge_memory_limit.getLast(), 0);
+}
// TODO test message queue aborting (use rendezvous functionality--make guard)
diff --git a/storage/src/tests/storageserver/service_layer_error_listener_test.cpp b/storage/src/tests/storageserver/service_layer_error_listener_test.cpp
index edb13eea5af..63d8eec6dc3 100644
--- a/storage/src/tests/storageserver/service_layer_error_listener_test.cpp
+++ b/storage/src/tests/storageserver/service_layer_error_listener_test.cpp
@@ -40,7 +40,8 @@ struct Fixture {
vdstestlib::DirConfig config{getStandardConfig(true)};
TestServiceLayerApp app;
ServiceLayerComponent component{app.getComponentRegister(), "dummy"};
- MergeThrottler merge_throttler{*config_from<StorServerConfig>(config::ConfigUri(config.getConfigId())), app.getComponentRegister()};
+ MergeThrottler merge_throttler{*config_from<StorServerConfig>(config::ConfigUri(config.getConfigId())),
+ app.getComponentRegister(), vespalib::HwInfo()};
TestShutdownListener shutdown_listener;
ServiceLayerErrorListener error_listener{component, merge_throttler};
diff --git a/storage/src/vespa/storage/config/stor-server.def b/storage/src/vespa/storage/config/stor-server.def
index 2f34d5372ab..0d877d33277 100644
--- a/storage/src/vespa/storage/config/stor-server.def
+++ b/storage/src/vespa/storage/config/stor-server.def
@@ -54,9 +54,29 @@ merge_throttling_policy.window_size_increment double default=2.0
##
## Semantics:
## > 0 explicit limit in bytes
-## == 0 limit automatically derived by content node
+## == 0 limit automatically deduced by content node
## < 0 unlimited (legacy behavior)
-max_merge_memory_usage_bytes long default=-1
+merge_throttling_memory_limit.max_usage_bytes long default=-1
+
+## If merge_throttling_memory_limit.max_usage_bytes == 0, this factor is used
+## as a multiplier to automatically deduce a memory limit for merges on the
+## content node. Note that the result of this multiplication is capped at both
+## ends by the auto_(lower|upper)_bound_bytes config values.
+##
+## Default: 3% of physical memory
+merge_throttling_memory_limit.auto_phys_mem_scale_factor double default=0.03
+
+## The absolute minimum memory limit that can be set when automatically
+## deducing the limit from physical memory on the node.
+##
+## Default: 128MiB
+merge_throttling_memory_limit.auto_lower_bound_bytes long default=134217728
+
+## The absolute maximum memory limit that can be set when automatically
+## deducing the limit from physical memory on the node.
+##
+## Default: 2GiB
+merge_throttling_memory_limit.auto_upper_bound_bytes long default=2147483648
## If the persistence provider indicates that it has exhausted one or more
## of its internal resources during a mutating operation, new merges will
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index 4646604a980..82bd5ff7ace 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -69,6 +69,7 @@ MergeThrottler::Metrics::Metrics(metrics::MetricSet* owner)
active_window_size("active_window_size", {}, "Number of merges active within the pending window size", this),
estimated_merge_memory_usage("estimated_merge_memory_usage", {}, "An estimated upper bound of the "
"memory usage of the merges currently in the active window", this),
+ merge_memory_limit("merge_memory_limit", {}, "The active soft limit for memory used by merge operations on this node", this),
bounced_due_to_back_pressure("bounced_due_to_back_pressure", {}, "Number of merges bounced due to resource exhaustion back-pressure", this),
chaining("mergechains", this),
local("locallyexecutedmerges", this)
@@ -184,9 +185,11 @@ MergeThrottler::MergeNodeSequence::chain_contains_this_node() const noexcept
MergeThrottler::MergeThrottler(
const StorServerConfig& bootstrap_config,
- StorageComponentRegister& compReg)
+ StorageComponentRegister& comp_reg,
+ const vespalib::HwInfo& hw_info)
: StorageLink("Merge Throttler"),
framework::HtmlStatusReporter("merges", "Merge Throttler"),
+ _hw_info(hw_info),
_merges(),
_queue(),
_maxQueueSize(1024),
@@ -195,13 +198,13 @@ MergeThrottler::MergeThrottler(
_messageLock(),
_stateLock(),
_metrics(std::make_unique<Metrics>()),
- _component(compReg, "mergethrottler"),
+ _component(comp_reg, "mergethrottler"),
_thread(),
_rendezvous(RendezvousState::NONE),
_throttle_until_time(),
_backpressure_duration(std::chrono::seconds(30)),
_active_merge_memory_used_bytes(0),
- _max_merge_memory_usage_bytes(-1), // -1 ==> unlimited
+ _max_merge_memory_usage_bytes(0), // 0 ==> unlimited
_use_dynamic_throttling(false),
_disable_queue_limits_for_chained_merges(false),
_closing(false)
@@ -250,12 +253,14 @@ MergeThrottler::on_configure(const StorServerConfig& new_config)
_backpressure_duration = std::chrono::duration_cast<std::chrono::steady_clock::duration>(
std::chrono::duration<double>(new_config.resourceExhaustionMergeBackPressureDurationSecs));
_disable_queue_limits_for_chained_merges = new_config.disableQueueLimitsForChainedMerges;
- if (new_config.maxMergeMemoryUsageBytes > 0) {
- _max_merge_memory_usage_bytes = static_cast<size_t>(new_config.maxMergeMemoryUsageBytes);
+ if (new_config.mergeThrottlingMemoryLimit.maxUsageBytes > 0) {
+ _max_merge_memory_usage_bytes = static_cast<size_t>(new_config.mergeThrottlingMemoryLimit.maxUsageBytes);
+ } else if ((new_config.mergeThrottlingMemoryLimit.maxUsageBytes == 0) && (_hw_info.memory().sizeBytes() > 0)) {
+ _max_merge_memory_usage_bytes = deduced_memory_limit(new_config);
} else {
- _max_merge_memory_usage_bytes = 0; // TODO auto-deduce based on local limits
+ _max_merge_memory_usage_bytes = 0; // Implies unlimited
}
-
+ _metrics->merge_memory_limit.set(static_cast<int64_t>(_max_merge_memory_usage_bytes));
}
MergeThrottler::~MergeThrottler()
@@ -1325,17 +1330,41 @@ MergeThrottler::markActiveMergesAsAborted(uint32_t minimumStateVersion)
}
void
-MergeThrottler::set_disable_queue_limits_for_chained_merges(bool disable_limits) noexcept {
+MergeThrottler::set_disable_queue_limits_for_chained_merges_locking(bool disable_limits) noexcept {
std::lock_guard lock(_stateLock);
_disable_queue_limits_for_chained_merges = disable_limits;
}
void
-MergeThrottler::set_max_merge_memory_usage_bytes(uint32_t max_memory_bytes) noexcept {
+MergeThrottler::set_max_merge_memory_usage_bytes_locking(uint32_t max_memory_bytes) noexcept {
std::lock_guard lock(_stateLock);
_max_merge_memory_usage_bytes = max_memory_bytes;
}
+uint32_t
+MergeThrottler::max_merge_memory_usage_bytes_locking() const noexcept {
+ std::lock_guard lock(_stateLock);
+ return _max_merge_memory_usage_bytes;
+}
+
+void
+MergeThrottler::set_hw_info_locking(const vespalib::HwInfo& hw_info) {
+ std::lock_guard lock(_stateLock);
+ _hw_info = hw_info;
+}
+
+size_t
+MergeThrottler::deduced_memory_limit(const StorServerConfig& cfg) const noexcept {
+ const auto min_limit = static_cast<size_t>(std::max(cfg.mergeThrottlingMemoryLimit.autoLowerBoundBytes, 1L));
+ const auto max_limit = std::max(static_cast<size_t>(std::max(cfg.mergeThrottlingMemoryLimit.autoUpperBoundBytes, 1L)), min_limit);
+ const auto mem_scale_factor = std::max(cfg.mergeThrottlingMemoryLimit.autoPhysMemScaleFactor, 0.0);
+
+ const auto node_mem = static_cast<double>(_hw_info.memory().sizeBytes());
+ const auto scaled_mem = static_cast<size_t>(node_mem * mem_scale_factor);
+
+ return std::min(std::max(scaled_mem, min_limit), max_limit);
+}
+
void
MergeThrottler::update_active_merge_window_size_metric() noexcept {
_metrics->active_window_size.set(static_cast<int64_t>(_merges.size()));
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index 8f6c4d62f68..a5559c159bf 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -1,26 +1,24 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
- * @class storage::MergeThrottler
- * @ingroup storageserver
- *
- * @brief Throttler and forwarder of merge commands
+ * Throttler and forwarder of merge commands
*/
#pragma once
-#include <vespa/storage/config/config-stor-server.h>
-#include <vespa/storage/common/message_guard.h>
-#include <vespa/storage/common/storagelink.h>
-#include <vespa/storage/common/storagecomponent.h>
-#include <vespa/storageframework/generic/status/htmlstatusreporter.h>
-#include <vespa/storageframework/generic/thread/runnable.h>
-#include <vespa/storageapi/message/bucket.h>
+#include <vespa/config/helper/ifetchercallback.h>
#include <vespa/document/bucket/bucket.h>
+#include <vespa/metrics/countmetric.h>
#include <vespa/metrics/metricset.h>
+#include <vespa/metrics/metrictimer.h>
#include <vespa/metrics/summetric.h>
-#include <vespa/metrics/countmetric.h>
#include <vespa/metrics/valuemetric.h>
-#include <vespa/metrics/metrictimer.h>
-#include <vespa/config/helper/ifetchercallback.h>
+#include <vespa/storage/common/message_guard.h>
+#include <vespa/storage/common/storagecomponent.h>
+#include <vespa/storage/common/storagelink.h>
+#include <vespa/storage/config/config-stor-server.h>
+#include <vespa/storageapi/message/bucket.h>
+#include <vespa/storageframework/generic/status/htmlstatusreporter.h>
+#include <vespa/storageframework/generic/thread/runnable.h>
+#include <vespa/vespalib/util/hw_info.h>
#include <chrono>
@@ -72,6 +70,7 @@ public:
metrics::LongValueMetric queueSize;
metrics::LongValueMetric active_window_size;
metrics::LongValueMetric estimated_merge_memory_usage;
+ metrics::LongValueMetric merge_memory_limit;
metrics::LongCountMetric bounced_due_to_back_pressure;
MergeOperationMetrics chaining;
MergeOperationMetrics local;
@@ -165,6 +164,7 @@ private:
RELEASED
};
+ vespalib::HwInfo _hw_info;
ActiveMergeMap _merges;
MergePriorityQueue _queue;
size_t _maxQueueSize;
@@ -192,7 +192,9 @@ public:
* windowSizeIncrement used for allowing unit tests to start out with more
* than 1 as their window size.
*/
- MergeThrottler(const StorServerConfig& bootstrap_config, StorageComponentRegister&);
+ MergeThrottler(const StorServerConfig& bootstrap_config,
+ StorageComponentRegister& comp_reg,
+ const vespalib::HwInfo& hw_info);
~MergeThrottler() override;
/** Implements document::Runnable::run */
@@ -225,8 +227,10 @@ public:
// For unit testing only
const mbus::DynamicThrottlePolicy& getThrottlePolicy() const { return *_throttlePolicy; }
mbus::DynamicThrottlePolicy& getThrottlePolicy() { return *_throttlePolicy; }
- void set_disable_queue_limits_for_chained_merges(bool disable_limits) noexcept;
- void set_max_merge_memory_usage_bytes(uint32_t max_memory_bytes) noexcept;
+ void set_disable_queue_limits_for_chained_merges_locking(bool disable_limits) noexcept;
+ void set_max_merge_memory_usage_bytes_locking(uint32_t max_memory_bytes) noexcept;
+ [[nodiscard]] uint32_t max_merge_memory_usage_bytes_locking() const noexcept;
+ void set_hw_info_locking(const vespalib::HwInfo& hw_info);
// For unit testing only
std::mutex& getStateLock() { return _stateLock; }
@@ -408,6 +412,8 @@ private:
void rejectOperationsInThreadQueue(MessageGuard&, uint32_t minimumStateVersion);
void markActiveMergesAsAborted(uint32_t minimumStateVersion);
+ [[nodiscard]] size_t deduced_memory_limit(const StorServerConfig& cfg) const noexcept;
+
void update_active_merge_window_size_metric() noexcept;
void update_active_merge_memory_usage_metric() noexcept;
diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
index 0cce2c27e95..7da75225b6c 100644
--- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp
+++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
@@ -30,8 +30,9 @@ ServiceLayerNode::ServiceLayerBootstrapConfigs::ServiceLayerBootstrapConfigs(Ser
ServiceLayerNode::ServiceLayerBootstrapConfigs&
ServiceLayerNode::ServiceLayerBootstrapConfigs::operator=(ServiceLayerBootstrapConfigs&&) noexcept = default;
-ServiceLayerNode::ServiceLayerNode(const config::ConfigUri & configUri,
+ServiceLayerNode::ServiceLayerNode(const config::ConfigUri& configUri,
ServiceLayerNodeContext& context,
+ const vespalib::HwInfo& hw_info,
ServiceLayerBootstrapConfigs bootstrap_configs,
ApplicationGenerationFetcher& generationFetcher,
spi::PersistenceProvider& persistenceProvider,
@@ -41,6 +42,7 @@ ServiceLayerNode::ServiceLayerNode(const config::ConfigUri & configUri,
_context(context),
_persistenceProvider(persistenceProvider),
_externalVisitors(externalVisitors),
+ _hw_info(hw_info),
_persistence_bootstrap_config(std::move(bootstrap_configs.persistence_cfg)),
_visitor_bootstrap_config(std::move(bootstrap_configs.visitor_cfg)),
_filestor_bootstrap_config(std::move(bootstrap_configs.filestor_cfg)),
@@ -172,7 +174,7 @@ ServiceLayerNode::createChain(IStorageChainBuilder &builder)
auto bouncer = std::make_unique<Bouncer>(compReg, bouncer_config());
_bouncer = bouncer.get();
builder.add(std::move(bouncer));
- auto merge_throttler_up = std::make_unique<MergeThrottler>(server_config(), compReg);
+ auto merge_throttler_up = std::make_unique<MergeThrottler>(server_config(), compReg, _hw_info);
_merge_throttler = merge_throttler_up.get();
builder.add(std::move(merge_throttler_up));
auto bucket_ownership_handler = std::make_unique<ChangedBucketOwnershipHandler>(*_persistence_bootstrap_config, compReg);
diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.h b/storage/src/vespa/storage/storageserver/servicelayernode.h
index ae39bb0805e..bea09a1c9ce 100644
--- a/storage/src/vespa/storage/storageserver/servicelayernode.h
+++ b/storage/src/vespa/storage/storageserver/servicelayernode.h
@@ -12,6 +12,7 @@
#include <vespa/storage/common/visitorfactory.h>
#include <vespa/storage/visiting/config-stor-visitor.h>
#include <vespa/storage/visiting/visitormessagesessionfactory.h>
+#include <vespa/vespalib/util/hw_info.h>
namespace storage {
@@ -39,6 +40,7 @@ private:
ServiceLayerNodeContext& _context;
spi::PersistenceProvider& _persistenceProvider;
VisitorFactory::Map _externalVisitors;
+ vespalib::HwInfo _hw_info;
std::unique_ptr<PersistenceConfig> _persistence_bootstrap_config;
std::unique_ptr<StorVisitorConfig> _visitor_bootstrap_config;
std::unique_ptr<StorFilestorConfig> _filestor_bootstrap_config;
@@ -66,8 +68,9 @@ public:
ServiceLayerBootstrapConfigs& operator=(ServiceLayerBootstrapConfigs&&) noexcept;
};
- ServiceLayerNode(const config::ConfigUri & configUri,
+ ServiceLayerNode(const config::ConfigUri& configUri,
ServiceLayerNodeContext& context,
+ const vespalib::HwInfo& hw_info,
ServiceLayerBootstrapConfigs bootstrap_configs,
ApplicationGenerationFetcher& generationFetcher,
spi::PersistenceProvider& persistenceProvider,
diff --git a/storageserver/src/vespa/storageserver/app/dummyservicelayerprocess.cpp b/storageserver/src/vespa/storageserver/app/dummyservicelayerprocess.cpp
index 8940c2a320e..245afb1c774 100644
--- a/storageserver/src/vespa/storageserver/app/dummyservicelayerprocess.cpp
+++ b/storageserver/src/vespa/storageserver/app/dummyservicelayerprocess.cpp
@@ -7,7 +7,7 @@ namespace storage {
// DummyServiceLayerProcess implementation
DummyServiceLayerProcess::DummyServiceLayerProcess(const config::ConfigUri & configUri)
- : ServiceLayerProcess(configUri)
+ : ServiceLayerProcess(configUri, vespalib::HwInfo())
{
}
diff --git a/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp b/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp
index bb284bfc108..ebf320352eb 100644
--- a/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp
+++ b/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp
@@ -31,7 +31,7 @@ bucket_db_options_from_config(const config::ConfigUri& config_uri) {
}
-ServiceLayerProcess::ServiceLayerProcess(const config::ConfigUri& configUri)
+ServiceLayerProcess::ServiceLayerProcess(const config::ConfigUri& configUri, const vespalib::HwInfo& hw_info)
: Process(configUri),
_externalVisitors(),
_persistence_cfg_handle(),
@@ -39,6 +39,7 @@ ServiceLayerProcess::ServiceLayerProcess(const config::ConfigUri& configUri)
_filestor_cfg_handle(),
_node(),
_storage_chain_builder(),
+ _hw_info(hw_info),
_context(std::make_unique<framework::defaultimplementation::RealClock>(),
bucket_db_options_from_config(configUri))
{
@@ -106,7 +107,8 @@ ServiceLayerProcess::createNode()
sbc.visitor_cfg = _visitor_cfg_handle->getConfig();
sbc.filestor_cfg = _filestor_cfg_handle->getConfig();
- _node = std::make_unique<ServiceLayerNode>(_configUri, _context, std::move(sbc), *this, getProvider(), _externalVisitors);
+ _node = std::make_unique<ServiceLayerNode>(_configUri, _context, _hw_info, std::move(sbc),
+ *this, getProvider(), _externalVisitors);
if (_storage_chain_builder) {
_node->set_storage_chain_builder(std::move(_storage_chain_builder));
}
diff --git a/storageserver/src/vespa/storageserver/app/servicelayerprocess.h b/storageserver/src/vespa/storageserver/app/servicelayerprocess.h
index dcc56f373c4..add5a38ca9d 100644
--- a/storageserver/src/vespa/storageserver/app/servicelayerprocess.h
+++ b/storageserver/src/vespa/storageserver/app/servicelayerprocess.h
@@ -7,6 +7,7 @@
#include <vespa/storage/common/visitorfactory.h>
#include <vespa/storage/storageserver/servicelayernodecontext.h>
#include <vespa/storage/visiting/config-stor-visitor.h>
+#include <vespa/vespalib/util/hw_info.h>
namespace config { class ConfigUri; }
@@ -29,14 +30,15 @@ private:
std::unique_ptr<config::ConfigHandle<StorVisitorConfig>> _visitor_cfg_handle;
std::unique_ptr<config::ConfigHandle<StorFilestorConfig>> _filestor_cfg_handle;
- std::unique_ptr<ServiceLayerNode> _node;
+ std::unique_ptr<ServiceLayerNode> _node;
std::unique_ptr<IStorageChainBuilder> _storage_chain_builder;
protected:
+ vespalib::HwInfo _hw_info;
ServiceLayerNodeContext _context;
public:
- explicit ServiceLayerProcess(const config::ConfigUri & configUri);
+ ServiceLayerProcess(const config::ConfigUri & configUri, const vespalib::HwInfo& hw_info);
~ServiceLayerProcess() override;
void shutdown() override;