diff options
Diffstat (limited to 'storage')
7 files changed, 196 insertions, 58 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index 48e0ab186f2..6f80ffe0727 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -31,9 +31,16 @@ namespace storage { namespace { using StorServerConfig = vespa::config::content::core::StorServerConfig; +using StorServerConfigBuilder = vespa::config::content::core::StorServerConfigBuilder; vespalib::string _storage("storage"); +std::unique_ptr<StorServerConfig> default_server_config() { + vdstestlib::DirConfig dir_config(getStandardConfig(true)); + auto cfg_uri = ::config::ConfigUri(dir_config.getConfigId()); + return config_from<StorServerConfig>(cfg_uri); +} + struct MergeBuilder { document::BucketId _bucket; api::Timestamp _maxTimestamp; @@ -157,6 +164,11 @@ struct MergeThrottlerTest : Test { void SetUp() override; void TearDown() override; + MergeThrottler& throttler(size_t idx) noexcept { + assert(idx < _throttlers.size()); + return *_throttlers[idx]; + } + api::MergeBucketCommand::SP sendMerge(const MergeBuilder&); void send_and_expect_reply( @@ -186,17 +198,14 @@ MergeThrottlerTest::~MergeThrottlerTest() = default; void MergeThrottlerTest::SetUp() { - vdstestlib::DirConfig dir_config(getStandardConfig(true)); - auto cfg_uri = ::config::ConfigUri(dir_config.getConfigId()); - auto config = config_from<StorServerConfig>(cfg_uri); - + auto config = default_server_config(); for (int i = 0; i < _storageNodeCount; ++i) { auto server = std::make_unique<TestServiceLayerApp>(NodeIndex(i)); server->setClusterState(lib::ClusterState("distributor:100 storage:100 version:1")); std::unique_ptr<DummyStorageLink> top; top = std::make_unique<DummyStorageLink>(); - auto* throttler = new MergeThrottler(*config, server->getComponentRegister()); + auto* throttler = new MergeThrottler(*config, server->getComponentRegister(), vespalib::HwInfo()); // MergeThrottler will be sandwiched in between two dummy links top->push_back(std::unique_ptr<StorageLink>(throttler)); auto* bottom = new DummyStorageLink; @@ -1228,7 +1237,7 @@ void MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd) { // Note: uses node with index 1 to not be the first node in chain - _throttlers[1]->set_disable_queue_limits_for_chained_merges(disable_queue_limits); + _throttlers[1]->set_disable_queue_limits_for_chained_merges_locking(disable_queue_limits); size_t max_pending = throttler_max_merges_pending(1); size_t max_enqueued = _throttlers[1]->getMaxQueueSize(); for (size_t i = 0; i < max_pending + max_enqueued; ++i) { @@ -1514,15 +1523,15 @@ TEST_F(MergeThrottlerTest, backpressure_evicts_all_queued_merges) { TEST_F(MergeThrottlerTest, exceeding_memory_soft_limit_rejects_merges_even_with_available_active_window_slots) { ASSERT_GT(throttler_max_merges_pending(0), 1); // Sanity check for the test itself - _throttlers[0]->set_max_merge_memory_usage_bytes(10_Mi); + throttler(0).set_max_merge_memory_usage_bytes_locking(10_Mi); - ASSERT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0); + ASSERT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 0); std::shared_ptr<api::StorageMessage> fwd_cmd; ASSERT_NO_FATAL_FAILURE(fwd_cmd = send_and_expect_forwarding( MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).memory_usage(5_Mi).create())); - EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); + EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); // Accepting this merge would exceed memory limits. It is sent as part of a forwarded unordered // merge and can therefore NOT be enqueued; it must be bounced immediately. @@ -1532,7 +1541,7 @@ TEST_F(MergeThrottlerTest, exceeding_memory_soft_limit_rejects_merges_even_with_ .memory_usage(8_Mi).create(), MessageType::MERGEBUCKET_REPLY, ReturnCode::BUSY)); - EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); // Unchanged + EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); // Unchanged // Fail the forwarded merge. This shall immediately free up the memory usage, allowing a new merge in. auto fwd_reply = dynamic_cast<api::MergeBucketCommand&>(*fwd_cmd).makeReply(); @@ -1542,38 +1551,38 @@ TEST_F(MergeThrottlerTest, exceeding_memory_soft_limit_rejects_merges_even_with_ std::shared_ptr<api::StorageReply>(std::move(fwd_reply)), MessageType::MERGEBUCKET_REPLY, ReturnCode::ABORTED)); // Unwind reply for failed merge - ASSERT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0); + ASSERT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 0); - // New merge is accepted - _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 2)).nodes(0, 1, 2).unordered(true).memory_usage(9_Mi).create()); - _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); // Forwarded to node 1 + // New merge is accepted and forwarded + ASSERT_NO_FATAL_FAILURE(send_and_expect_forwarding( + MergeBuilder(document::BucketId(16, 2)).nodes(0, 1, 2).unordered(true).memory_usage(9_Mi).create())); - EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 9_Mi); + EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 9_Mi); } TEST_F(MergeThrottlerTest, exceeding_memory_soft_limit_can_enqueue_unordered_merge_sent_directly_from_distributor) { - _throttlers[0]->set_max_merge_memory_usage_bytes(10_Mi); + throttler(0).set_max_merge_memory_usage_bytes_locking(10_Mi); ASSERT_NO_FATAL_FAILURE(send_and_expect_forwarding( MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).memory_usage(5_Mi).create())); - EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); + EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); // Accepting this merge would exceed memory limits. It is sent directly from a distributor and // can therefore be enqueued. _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 1)).nodes(0, 1, 2).unordered(true).memory_usage(8_Mi).create()); - waitUntilMergeQueueIs(*_throttlers[0], 1, _messageWaitTime); // Should end up in queue + waitUntilMergeQueueIs(throttler(0), 1, _messageWaitTime); // Should end up in queue - EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); // Unchanged + EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 5_Mi); // Unchanged } TEST_F(MergeThrottlerTest, at_least_one_merge_is_accepted_even_if_exceeding_memory_soft_limit) { - _throttlers[0]->set_max_merge_memory_usage_bytes(5_Mi); + throttler(0).set_max_merge_memory_usage_bytes_locking(5_Mi); _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 0)).nodes(0, 1, 2).unordered(true).memory_usage(100_Mi).create()); _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); // Forwarded, _not_ bounced - EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 100_Mi); + EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 100_Mi); } TEST_F(MergeThrottlerTest, queued_merges_are_not_counted_towards_memory_usage) { @@ -1582,19 +1591,87 @@ TEST_F(MergeThrottlerTest, queued_merges_are_not_counted_towards_memory_usage) { // send for below in the test code. ASSERT_LT(throttler_max_merges_pending(0), 1000); - _throttlers[0]->set_max_merge_memory_usage_bytes(50_Mi); + throttler(0).set_max_merge_memory_usage_bytes_locking(50_Mi); // Fill up active window on node 0. Note: these merges do not have any associated memory cost. fill_throttler_queue_with_n_commands(0, 0); - EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi); + EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi); _topLinks[0]->sendDown(MergeBuilder(document::BucketId(16, 1000)).nodes(0, 1, 2).unordered(true).memory_usage(10_Mi).create()); - waitUntilMergeQueueIs(*_throttlers[0], 1, _messageWaitTime); // Should end up in queue + waitUntilMergeQueueIs(throttler(0), 1, _messageWaitTime); // Should end up in queue + + EXPECT_EQ(throttler(0).getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi); +} + +namespace { - EXPECT_EQ(_throttlers[0]->getMetrics().estimated_merge_memory_usage.getLast(), 0_Mi); +vespalib::HwInfo make_mem_info(uint64_t mem_size) { + return {{0, false, false}, {mem_size}, {1}}; } -// TODO define and test auto-deduced max memory usage +} + +TEST_F(MergeThrottlerTest, memory_limit_can_be_auto_deduced_from_hw_info) { + StorServerConfigBuilder cfg(*default_server_config()); + auto& cfg_limit = cfg.mergeThrottlingMemoryLimit; + auto& mt = throttler(0); + + // Enable auto-deduction of limits + cfg_limit.maxUsageBytes = 0; + + cfg_limit.autoLowerBoundBytes = 100'000; + cfg_limit.autoUpperBoundBytes = 750'000; + cfg_limit.autoPhysMemScaleFactor = 0.5; + + mt.set_hw_info_locking(make_mem_info(1'000'000)); + mt.on_configure(cfg); + EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 500'000); + EXPECT_EQ(throttler(0).getMetrics().merge_memory_limit.getLast(), 500'000); + + cfg_limit.autoPhysMemScaleFactor = 0.75; + mt.on_configure(cfg); + EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 750'000); + EXPECT_EQ(throttler(0).getMetrics().merge_memory_limit.getLast(), 750'000); + + cfg_limit.autoPhysMemScaleFactor = 0.25; + mt.on_configure(cfg); + EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 250'000); + + // Min-capped + cfg_limit.autoPhysMemScaleFactor = 0.05; + mt.on_configure(cfg); + EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 100'000); + + // Max-capped + cfg_limit.autoPhysMemScaleFactor = 0.90; + mt.on_configure(cfg); + EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 750'000); +} + +TEST_F(MergeThrottlerTest, memory_limit_can_be_set_explicitly) { + StorServerConfigBuilder cfg(*default_server_config()); + auto& cfg_limit = cfg.mergeThrottlingMemoryLimit; + auto& mt = throttler(0); + + cfg_limit.maxUsageBytes = 1'234'567; + mt.set_hw_info_locking(make_mem_info(1'000'000)); + mt.on_configure(cfg); + EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 1'234'567); + EXPECT_EQ(throttler(0).getMetrics().merge_memory_limit.getLast(), 1'234'567); +} + +TEST_F(MergeThrottlerTest, memory_limit_can_be_set_to_unlimited) { + StorServerConfigBuilder cfg(*default_server_config()); + auto& cfg_limit = cfg.mergeThrottlingMemoryLimit; + auto& mt = throttler(0); + + cfg_limit.maxUsageBytes = -1; + mt.set_hw_info_locking(make_mem_info(1'000'000)); + mt.on_configure(cfg); + // Zero implies infinity + EXPECT_EQ(mt.max_merge_memory_usage_bytes_locking(), 0); + EXPECT_EQ(throttler(0).getMetrics().merge_memory_limit.getLast(), 0); +} // TODO test message queue aborting (use rendezvous functionality--make guard) diff --git a/storage/src/tests/storageserver/service_layer_error_listener_test.cpp b/storage/src/tests/storageserver/service_layer_error_listener_test.cpp index edb13eea5af..63d8eec6dc3 100644 --- a/storage/src/tests/storageserver/service_layer_error_listener_test.cpp +++ b/storage/src/tests/storageserver/service_layer_error_listener_test.cpp @@ -40,7 +40,8 @@ struct Fixture { vdstestlib::DirConfig config{getStandardConfig(true)}; TestServiceLayerApp app; ServiceLayerComponent component{app.getComponentRegister(), "dummy"}; - MergeThrottler merge_throttler{*config_from<StorServerConfig>(config::ConfigUri(config.getConfigId())), app.getComponentRegister()}; + MergeThrottler merge_throttler{*config_from<StorServerConfig>(config::ConfigUri(config.getConfigId())), + app.getComponentRegister(), vespalib::HwInfo()}; TestShutdownListener shutdown_listener; ServiceLayerErrorListener error_listener{component, merge_throttler}; diff --git a/storage/src/vespa/storage/config/stor-server.def b/storage/src/vespa/storage/config/stor-server.def index 2f34d5372ab..0d877d33277 100644 --- a/storage/src/vespa/storage/config/stor-server.def +++ b/storage/src/vespa/storage/config/stor-server.def @@ -54,9 +54,29 @@ merge_throttling_policy.window_size_increment double default=2.0 ## ## Semantics: ## > 0 explicit limit in bytes -## == 0 limit automatically derived by content node +## == 0 limit automatically deduced by content node ## < 0 unlimited (legacy behavior) -max_merge_memory_usage_bytes long default=-1 +merge_throttling_memory_limit.max_usage_bytes long default=-1 + +## If merge_throttling_memory_limit.max_usage_bytes == 0, this factor is used +## as a multiplier to automatically deduce a memory limit for merges on the +## content node. Note that the result of this multiplication is capped at both +## ends by the auto_(lower|upper)_bound_bytes config values. +## +## Default: 3% of physical memory +merge_throttling_memory_limit.auto_phys_mem_scale_factor double default=0.03 + +## The absolute minimum memory limit that can be set when automatically +## deducing the limit from physical memory on the node. +## +## Default: 128MiB +merge_throttling_memory_limit.auto_lower_bound_bytes long default=134217728 + +## The absolute maximum memory limit that can be set when automatically +## deducing the limit from physical memory on the node. +## +## Default: 2GiB +merge_throttling_memory_limit.auto_upper_bound_bytes long default=2147483648 ## If the persistence provider indicates that it has exhausted one or more ## of its internal resources during a mutating operation, new merges will diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp index 4646604a980..82bd5ff7ace 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp +++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp @@ -69,6 +69,7 @@ MergeThrottler::Metrics::Metrics(metrics::MetricSet* owner) active_window_size("active_window_size", {}, "Number of merges active within the pending window size", this), estimated_merge_memory_usage("estimated_merge_memory_usage", {}, "An estimated upper bound of the " "memory usage of the merges currently in the active window", this), + merge_memory_limit("merge_memory_limit", {}, "The active soft limit for memory used by merge operations on this node", this), bounced_due_to_back_pressure("bounced_due_to_back_pressure", {}, "Number of merges bounced due to resource exhaustion back-pressure", this), chaining("mergechains", this), local("locallyexecutedmerges", this) @@ -184,9 +185,11 @@ MergeThrottler::MergeNodeSequence::chain_contains_this_node() const noexcept MergeThrottler::MergeThrottler( const StorServerConfig& bootstrap_config, - StorageComponentRegister& compReg) + StorageComponentRegister& comp_reg, + const vespalib::HwInfo& hw_info) : StorageLink("Merge Throttler"), framework::HtmlStatusReporter("merges", "Merge Throttler"), + _hw_info(hw_info), _merges(), _queue(), _maxQueueSize(1024), @@ -195,13 +198,13 @@ MergeThrottler::MergeThrottler( _messageLock(), _stateLock(), _metrics(std::make_unique<Metrics>()), - _component(compReg, "mergethrottler"), + _component(comp_reg, "mergethrottler"), _thread(), _rendezvous(RendezvousState::NONE), _throttle_until_time(), _backpressure_duration(std::chrono::seconds(30)), _active_merge_memory_used_bytes(0), - _max_merge_memory_usage_bytes(-1), // -1 ==> unlimited + _max_merge_memory_usage_bytes(0), // 0 ==> unlimited _use_dynamic_throttling(false), _disable_queue_limits_for_chained_merges(false), _closing(false) @@ -250,12 +253,14 @@ MergeThrottler::on_configure(const StorServerConfig& new_config) _backpressure_duration = std::chrono::duration_cast<std::chrono::steady_clock::duration>( std::chrono::duration<double>(new_config.resourceExhaustionMergeBackPressureDurationSecs)); _disable_queue_limits_for_chained_merges = new_config.disableQueueLimitsForChainedMerges; - if (new_config.maxMergeMemoryUsageBytes > 0) { - _max_merge_memory_usage_bytes = static_cast<size_t>(new_config.maxMergeMemoryUsageBytes); + if (new_config.mergeThrottlingMemoryLimit.maxUsageBytes > 0) { + _max_merge_memory_usage_bytes = static_cast<size_t>(new_config.mergeThrottlingMemoryLimit.maxUsageBytes); + } else if ((new_config.mergeThrottlingMemoryLimit.maxUsageBytes == 0) && (_hw_info.memory().sizeBytes() > 0)) { + _max_merge_memory_usage_bytes = deduced_memory_limit(new_config); } else { - _max_merge_memory_usage_bytes = 0; // TODO auto-deduce based on local limits + _max_merge_memory_usage_bytes = 0; // Implies unlimited } - + _metrics->merge_memory_limit.set(static_cast<int64_t>(_max_merge_memory_usage_bytes)); } MergeThrottler::~MergeThrottler() @@ -1325,17 +1330,41 @@ MergeThrottler::markActiveMergesAsAborted(uint32_t minimumStateVersion) } void -MergeThrottler::set_disable_queue_limits_for_chained_merges(bool disable_limits) noexcept { +MergeThrottler::set_disable_queue_limits_for_chained_merges_locking(bool disable_limits) noexcept { std::lock_guard lock(_stateLock); _disable_queue_limits_for_chained_merges = disable_limits; } void -MergeThrottler::set_max_merge_memory_usage_bytes(uint32_t max_memory_bytes) noexcept { +MergeThrottler::set_max_merge_memory_usage_bytes_locking(uint32_t max_memory_bytes) noexcept { std::lock_guard lock(_stateLock); _max_merge_memory_usage_bytes = max_memory_bytes; } +uint32_t +MergeThrottler::max_merge_memory_usage_bytes_locking() const noexcept { + std::lock_guard lock(_stateLock); + return _max_merge_memory_usage_bytes; +} + +void +MergeThrottler::set_hw_info_locking(const vespalib::HwInfo& hw_info) { + std::lock_guard lock(_stateLock); + _hw_info = hw_info; +} + +size_t +MergeThrottler::deduced_memory_limit(const StorServerConfig& cfg) const noexcept { + const auto min_limit = static_cast<size_t>(std::max(cfg.mergeThrottlingMemoryLimit.autoLowerBoundBytes, 1L)); + const auto max_limit = std::max(static_cast<size_t>(std::max(cfg.mergeThrottlingMemoryLimit.autoUpperBoundBytes, 1L)), min_limit); + const auto mem_scale_factor = std::max(cfg.mergeThrottlingMemoryLimit.autoPhysMemScaleFactor, 0.0); + + const auto node_mem = static_cast<double>(_hw_info.memory().sizeBytes()); + const auto scaled_mem = static_cast<size_t>(node_mem * mem_scale_factor); + + return std::min(std::max(scaled_mem, min_limit), max_limit); +} + void MergeThrottler::update_active_merge_window_size_metric() noexcept { _metrics->active_window_size.set(static_cast<int64_t>(_merges.size())); diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h index 8f6c4d62f68..a5559c159bf 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.h +++ b/storage/src/vespa/storage/storageserver/mergethrottler.h @@ -1,26 +1,24 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /** - * @class storage::MergeThrottler - * @ingroup storageserver - * - * @brief Throttler and forwarder of merge commands + * Throttler and forwarder of merge commands */ #pragma once -#include <vespa/storage/config/config-stor-server.h> -#include <vespa/storage/common/message_guard.h> -#include <vespa/storage/common/storagelink.h> -#include <vespa/storage/common/storagecomponent.h> -#include <vespa/storageframework/generic/status/htmlstatusreporter.h> -#include <vespa/storageframework/generic/thread/runnable.h> -#include <vespa/storageapi/message/bucket.h> +#include <vespa/config/helper/ifetchercallback.h> #include <vespa/document/bucket/bucket.h> +#include <vespa/metrics/countmetric.h> #include <vespa/metrics/metricset.h> +#include <vespa/metrics/metrictimer.h> #include <vespa/metrics/summetric.h> -#include <vespa/metrics/countmetric.h> #include <vespa/metrics/valuemetric.h> -#include <vespa/metrics/metrictimer.h> -#include <vespa/config/helper/ifetchercallback.h> +#include <vespa/storage/common/message_guard.h> +#include <vespa/storage/common/storagecomponent.h> +#include <vespa/storage/common/storagelink.h> +#include <vespa/storage/config/config-stor-server.h> +#include <vespa/storageapi/message/bucket.h> +#include <vespa/storageframework/generic/status/htmlstatusreporter.h> +#include <vespa/storageframework/generic/thread/runnable.h> +#include <vespa/vespalib/util/hw_info.h> #include <chrono> @@ -72,6 +70,7 @@ public: metrics::LongValueMetric queueSize; metrics::LongValueMetric active_window_size; metrics::LongValueMetric estimated_merge_memory_usage; + metrics::LongValueMetric merge_memory_limit; metrics::LongCountMetric bounced_due_to_back_pressure; MergeOperationMetrics chaining; MergeOperationMetrics local; @@ -165,6 +164,7 @@ private: RELEASED }; + vespalib::HwInfo _hw_info; ActiveMergeMap _merges; MergePriorityQueue _queue; size_t _maxQueueSize; @@ -192,7 +192,9 @@ public: * windowSizeIncrement used for allowing unit tests to start out with more * than 1 as their window size. */ - MergeThrottler(const StorServerConfig& bootstrap_config, StorageComponentRegister&); + MergeThrottler(const StorServerConfig& bootstrap_config, + StorageComponentRegister& comp_reg, + const vespalib::HwInfo& hw_info); ~MergeThrottler() override; /** Implements document::Runnable::run */ @@ -225,8 +227,10 @@ public: // For unit testing only const mbus::DynamicThrottlePolicy& getThrottlePolicy() const { return *_throttlePolicy; } mbus::DynamicThrottlePolicy& getThrottlePolicy() { return *_throttlePolicy; } - void set_disable_queue_limits_for_chained_merges(bool disable_limits) noexcept; - void set_max_merge_memory_usage_bytes(uint32_t max_memory_bytes) noexcept; + void set_disable_queue_limits_for_chained_merges_locking(bool disable_limits) noexcept; + void set_max_merge_memory_usage_bytes_locking(uint32_t max_memory_bytes) noexcept; + [[nodiscard]] uint32_t max_merge_memory_usage_bytes_locking() const noexcept; + void set_hw_info_locking(const vespalib::HwInfo& hw_info); // For unit testing only std::mutex& getStateLock() { return _stateLock; } @@ -408,6 +412,8 @@ private: void rejectOperationsInThreadQueue(MessageGuard&, uint32_t minimumStateVersion); void markActiveMergesAsAborted(uint32_t minimumStateVersion); + [[nodiscard]] size_t deduced_memory_limit(const StorServerConfig& cfg) const noexcept; + void update_active_merge_window_size_metric() noexcept; void update_active_merge_memory_usage_metric() noexcept; diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp index 0cce2c27e95..7da75225b6c 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp +++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp @@ -30,8 +30,9 @@ ServiceLayerNode::ServiceLayerBootstrapConfigs::ServiceLayerBootstrapConfigs(Ser ServiceLayerNode::ServiceLayerBootstrapConfigs& ServiceLayerNode::ServiceLayerBootstrapConfigs::operator=(ServiceLayerBootstrapConfigs&&) noexcept = default; -ServiceLayerNode::ServiceLayerNode(const config::ConfigUri & configUri, +ServiceLayerNode::ServiceLayerNode(const config::ConfigUri& configUri, ServiceLayerNodeContext& context, + const vespalib::HwInfo& hw_info, ServiceLayerBootstrapConfigs bootstrap_configs, ApplicationGenerationFetcher& generationFetcher, spi::PersistenceProvider& persistenceProvider, @@ -41,6 +42,7 @@ ServiceLayerNode::ServiceLayerNode(const config::ConfigUri & configUri, _context(context), _persistenceProvider(persistenceProvider), _externalVisitors(externalVisitors), + _hw_info(hw_info), _persistence_bootstrap_config(std::move(bootstrap_configs.persistence_cfg)), _visitor_bootstrap_config(std::move(bootstrap_configs.visitor_cfg)), _filestor_bootstrap_config(std::move(bootstrap_configs.filestor_cfg)), @@ -172,7 +174,7 @@ ServiceLayerNode::createChain(IStorageChainBuilder &builder) auto bouncer = std::make_unique<Bouncer>(compReg, bouncer_config()); _bouncer = bouncer.get(); builder.add(std::move(bouncer)); - auto merge_throttler_up = std::make_unique<MergeThrottler>(server_config(), compReg); + auto merge_throttler_up = std::make_unique<MergeThrottler>(server_config(), compReg, _hw_info); _merge_throttler = merge_throttler_up.get(); builder.add(std::move(merge_throttler_up)); auto bucket_ownership_handler = std::make_unique<ChangedBucketOwnershipHandler>(*_persistence_bootstrap_config, compReg); diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.h b/storage/src/vespa/storage/storageserver/servicelayernode.h index ae39bb0805e..bea09a1c9ce 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernode.h +++ b/storage/src/vespa/storage/storageserver/servicelayernode.h @@ -12,6 +12,7 @@ #include <vespa/storage/common/visitorfactory.h> #include <vespa/storage/visiting/config-stor-visitor.h> #include <vespa/storage/visiting/visitormessagesessionfactory.h> +#include <vespa/vespalib/util/hw_info.h> namespace storage { @@ -39,6 +40,7 @@ private: ServiceLayerNodeContext& _context; spi::PersistenceProvider& _persistenceProvider; VisitorFactory::Map _externalVisitors; + vespalib::HwInfo _hw_info; std::unique_ptr<PersistenceConfig> _persistence_bootstrap_config; std::unique_ptr<StorVisitorConfig> _visitor_bootstrap_config; std::unique_ptr<StorFilestorConfig> _filestor_bootstrap_config; @@ -66,8 +68,9 @@ public: ServiceLayerBootstrapConfigs& operator=(ServiceLayerBootstrapConfigs&&) noexcept; }; - ServiceLayerNode(const config::ConfigUri & configUri, + ServiceLayerNode(const config::ConfigUri& configUri, ServiceLayerNodeContext& context, + const vespalib::HwInfo& hw_info, ServiceLayerBootstrapConfigs bootstrap_configs, ApplicationGenerationFetcher& generationFetcher, spi::PersistenceProvider& persistenceProvider, |