diff options
author | Tor Brede Vekterli <vekterli@vespa.ai> | 2023-10-23 14:00:59 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@vespa.ai> | 2023-10-24 09:05:31 +0000 |
commit | 8fc6725988f9cc475c502412fe59391e7f150674 (patch) | |
tree | 0c1164b5685b0d262f483b891435e2a7aff086af /storage | |
parent | 094c30a06a3e4441ee4dbeaa61de76fe1b6f54e4 (diff) |
Wire config to MergeThrottler in from the outside
Diffstat (limited to 'storage')
6 files changed, 66 insertions, 52 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index 8e87e07eeff..990a552780b 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -2,6 +2,7 @@ #include <tests/common/testhelper.h> #include <tests/common/dummystoragelink.h> #include <tests/common/teststorageapp.h> +#include <vespa/config/helper/configgetter.hpp> #include <vespa/document/test/make_document_bucket.h> #include <vespa/messagebus/dynamicthrottlepolicy.h> #include <vespa/storage/storageserver/mergethrottler.h> @@ -28,7 +29,9 @@ namespace storage { namespace { -vespalib::string _Storage("storage"); +using StorServerConfig = vespa::config::content::core::StorServerConfig; + +vespalib::string _storage("storage"); struct MergeBuilder { document::BucketId _bucket; @@ -108,7 +111,7 @@ struct MergeBuilder { auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(_bucket), n, _maxTimestamp, _clusterStateVersion, _chain); - cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, _nodes[0])); + cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, _nodes[0])); return cmd; } }; @@ -121,6 +124,10 @@ makeSystemStateCmd(const std::string& state) return std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(state)); } +std::unique_ptr<StorServerConfig> config_from(const ::config::ConfigUri& cfg_uri) { + return ::config::ConfigGetter<StorServerConfig>::getConfig(cfg_uri.getConfigId(), cfg_uri.getContext()); +} + } // anon ns struct MergeThrottlerTest : Test { @@ -167,7 +174,9 @@ MergeThrottlerTest::~MergeThrottlerTest() = default; void MergeThrottlerTest::SetUp() { - vdstestlib::DirConfig config(getStandardConfig(true)); + vdstestlib::DirConfig dir_config(getStandardConfig(true)); + auto cfg_uri = ::config::ConfigUri(dir_config.getConfigId()); + auto config = config_from(cfg_uri); for (int i = 0; i < _storageNodeCount; ++i) { auto server = std::make_unique<TestServiceLayerApp>(NodeIndex(i)); @@ -175,7 +184,7 @@ MergeThrottlerTest::SetUp() std::unique_ptr<DummyStorageLink> top; top = std::make_unique<DummyStorageLink>(); - MergeThrottler* throttler = new MergeThrottler(::config::ConfigUri(config.getConfigId()), server->getComponentRegister()); + MergeThrottler* throttler = new MergeThrottler(*config, server->getComponentRegister()); // MergeThrottler will be sandwiched in between two dummy links top->push_back(std::unique_ptr<StorageLink>(throttler)); DummyStorageLink* bottom = new DummyStorageLink; @@ -283,7 +292,7 @@ TEST_F(MergeThrottlerTest, chain) { auto cmd = std::make_shared<MergeBucketCommand>(bucket, nodes, UINT_MAX, 123); cmd->setPriority(7); cmd->setTimeout(54321ms); - cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 0)); + cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 0)); const uint16_t distributorIndex = 123; cmd->setSourceIndex(distributorIndex); // Dummy distributor index that must be forwarded @@ -423,7 +432,7 @@ TEST_F(MergeThrottlerTest, with_source_only_node) { std::vector<MergeBucketCommand::Node> nodes({{0}, {2}, {1, true}}); auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, UINT_MAX, 123); - cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 0)); + cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 0)); _topLinks[0]->sendDown(cmd); _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); @@ -468,7 +477,7 @@ TEST_F(MergeThrottlerTest, legacy_42_distributor_behavior) { auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234); // Send to node 1, which is not the lowest index - cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1)); + cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 1)); _topLinks[1]->sendDown(cmd); _bottomLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); @@ -503,7 +512,7 @@ TEST_F(MergeThrottlerTest, legacy_42_distributor_behavior_does_not_take_ownershi auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234); // Send to node 1, which is not the lowest index - cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1)); + cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 1)); _topLinks[1]->sendDown(cmd); _bottomLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); @@ -550,7 +559,7 @@ TEST_F(MergeThrottlerTest, end_of_chain_execution_does_not_take_ownership) { auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234, 1, chain); // Send to last node, which is not the lowest index - cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 3)); + cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 3)); _topLinks[2]->sendDown(cmd); _bottomLinks[2]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); @@ -595,7 +604,7 @@ TEST_F(MergeThrottlerTest, resend_handling) { std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234); - cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1)); + cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 1)); _topLinks[0]->sendDown(cmd); _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); @@ -962,7 +971,7 @@ TEST_F(MergeThrottlerTest, unseen_merge_with_node_in_chain) { makeDocumentBucket(BucketId(32, 0xdeadbeef)), nodes, 1234, 1, chain); - cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 9)); + cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 9)); _topLinks[0]->sendDown(cmd); // First, test that we get rejected when processing merge immediately @@ -1145,7 +1154,7 @@ TEST_F(MergeThrottlerTest, unknown_merge_with_self_in_chain) { std::vector<uint16_t> chain({0}); auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234, 1, chain); - cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1)); + cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 1)); _topLinks[0]->sendDown(cmd); _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime); diff --git a/storage/src/tests/storageserver/service_layer_error_listener_test.cpp b/storage/src/tests/storageserver/service_layer_error_listener_test.cpp index 84a98385962..a30594f6076 100644 --- a/storage/src/tests/storageserver/service_layer_error_listener_test.cpp +++ b/storage/src/tests/storageserver/service_layer_error_listener_test.cpp @@ -6,6 +6,7 @@ #include <vespa/vdstestlib/config/dirconfig.h> #include <tests/common/testhelper.h> #include <tests/common/teststorageapp.h> +#include <vespa/config/helper/configgetter.hpp> #include <vespa/vespalib/gtest/gtest.h> using namespace ::testing; @@ -37,10 +38,16 @@ struct Fixture { vdstestlib::DirConfig config{getStandardConfig(true)}; TestServiceLayerApp app; ServiceLayerComponent component{app.getComponentRegister(), "dummy"}; - MergeThrottler merge_throttler{config::ConfigUri(config.getConfigId()), app.getComponentRegister()}; + MergeThrottler merge_throttler{*config_from(config::ConfigUri(config.getConfigId())), app.getComponentRegister()}; TestShutdownListener shutdown_listener; ServiceLayerErrorListener error_listener{component, merge_throttler}; + using StorServerConfig = vespa::config::content::core::StorServerConfig; + + static std::unique_ptr<StorServerConfig> config_from(const ::config::ConfigUri& cfg_uri) { + return ::config::ConfigGetter<StorServerConfig>::getConfig(cfg_uri.getConfigId(), cfg_uri.getContext()); + } + Fixture(); ~Fixture(); }; diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp index 6e2f2a77d20..4cc2a7a89ab 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp +++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp @@ -179,7 +179,7 @@ MergeThrottler::MergeNodeSequence::chain_contains_this_node() const noexcept } MergeThrottler::MergeThrottler( - const config::ConfigUri & configUri, + const StorServerConfig& bootstrap_config, StorageComponentRegister& compReg) : StorageLink("Merge Throttler"), framework::HtmlStatusReporter("merges", "Merge Throttler"), @@ -190,7 +190,6 @@ MergeThrottler::MergeThrottler( _queueSequence(0), _messageLock(), _stateLock(), - _configFetcher(std::make_unique<config::ConfigFetcher>(configUri.getContext())), _metrics(std::make_unique<Metrics>()), _component(compReg, "mergethrottler"), _thread(), @@ -203,34 +202,33 @@ MergeThrottler::MergeThrottler( { _throttlePolicy->setMinWindowSize(20); _throttlePolicy->setMaxWindowSize(20); - _configFetcher->subscribe<StorServerConfig>(configUri.getConfigId(), this); - _configFetcher->start(); + on_configure(bootstrap_config); _component.registerStatusPage(*this); _component.registerMetric(*_metrics); } void -MergeThrottler::configure(std::unique_ptr<vespa::config::content::core::StorServerConfig> newConfig) +MergeThrottler::on_configure(const StorServerConfig& new_config) { std::lock_guard lock(_stateLock); - _use_dynamic_throttling = (newConfig->mergeThrottlingPolicy.type + _use_dynamic_throttling = (new_config.mergeThrottlingPolicy.type == StorServerConfig::MergeThrottlingPolicy::Type::DYNAMIC); - if (newConfig->maxMergesPerNode < 1) { + if (new_config.maxMergesPerNode < 1) { throw config::InvalidConfigException("Cannot have a max merge count of less than 1"); } - if (newConfig->maxMergeQueueSize < 0) { + if (new_config.maxMergeQueueSize < 0) { throw config::InvalidConfigException("Max merge queue size cannot be less than 0"); } - if (newConfig->resourceExhaustionMergeBackPressureDurationSecs < 0.0) { + if (new_config.resourceExhaustionMergeBackPressureDurationSecs < 0.0) { throw config::InvalidConfigException("Merge back-pressure duration cannot be less than 0"); } if (_use_dynamic_throttling) { - auto min_win_sz = std::max(newConfig->mergeThrottlingPolicy.minWindowSize, 1); - auto max_win_sz = std::max(newConfig->mergeThrottlingPolicy.maxWindowSize, 1); + auto min_win_sz = std::max(new_config.mergeThrottlingPolicy.minWindowSize, 1); + auto max_win_sz = std::max(new_config.mergeThrottlingPolicy.maxWindowSize, 1); if (min_win_sz > max_win_sz) { min_win_sz = max_win_sz; } - auto win_sz_increment = std::max(1.0, newConfig->mergeThrottlingPolicy.windowSizeIncrement); + auto win_sz_increment = std::max(1.0, new_config.mergeThrottlingPolicy.windowSizeIncrement); _throttlePolicy->setMinWindowSize(min_win_sz); _throttlePolicy->setMaxWindowSize(max_win_sz); _throttlePolicy->setWindowSizeIncrement(win_sz_increment); @@ -238,15 +236,14 @@ MergeThrottler::configure(std::unique_ptr<vespa::config::content::core::StorServ min_win_sz, max_win_sz, win_sz_increment); } else { // Use legacy config values when static throttling is enabled. - _throttlePolicy->setMinWindowSize(newConfig->maxMergesPerNode); - _throttlePolicy->setMaxWindowSize(newConfig->maxMergesPerNode); + _throttlePolicy->setMinWindowSize(new_config.maxMergesPerNode); + _throttlePolicy->setMaxWindowSize(new_config.maxMergesPerNode); } - LOG(debug, "Setting new max queue size to %d", - newConfig->maxMergeQueueSize); - _maxQueueSize = newConfig->maxMergeQueueSize; + LOG(debug, "Setting new max queue size to %d", new_config.maxMergeQueueSize); + _maxQueueSize = new_config.maxMergeQueueSize; _backpressure_duration = std::chrono::duration_cast<std::chrono::steady_clock::duration>( - std::chrono::duration<double>(newConfig->resourceExhaustionMergeBackPressureDurationSecs)); - _disable_queue_limits_for_chained_merges = newConfig->disableQueueLimitsForChainedMerges; + std::chrono::duration<double>(new_config.resourceExhaustionMergeBackPressureDurationSecs)); + _disable_queue_limits_for_chained_merges = new_config.disableQueueLimitsForChainedMerges; } MergeThrottler::~MergeThrottler() @@ -275,8 +272,6 @@ MergeThrottler::onOpen() void MergeThrottler::onClose() { - // Avoid getting config on shutdown - _configFetcher->close(); { std::lock_guard guard(_messageLock); // Note: used to prevent taking locks in different order if onFlush diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h index 840f1df1177..5362c2f6df8 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.h +++ b/storage/src/vespa/storage/storageserver/mergethrottler.h @@ -35,10 +35,11 @@ class AbortBucketOperationsCommand; class MergeThrottler : public framework::Runnable, public StorageLink, - public framework::HtmlStatusReporter, - private config::IFetcherCallback<vespa::config::content::core::StorServerConfig> + public framework::HtmlStatusReporter { public: + using StorServerConfig = vespa::config::content::core::StorServerConfig; + class MergeFailureMetrics : public metrics::MetricSet { public: metrics::SumMetric<metrics::LongCountMetric> sum; @@ -172,7 +173,6 @@ private: mutable std::mutex _messageLock; std::condition_variable _messageCond; mutable std::mutex _stateLock; - std::unique_ptr<config::ConfigFetcher> _configFetcher; // Messages pending to be processed by the worker thread std::vector<api::StorageMessage::SP> _messagesDown; std::vector<api::StorageMessage::SP> _messagesUp; @@ -190,7 +190,7 @@ public: * windowSizeIncrement used for allowing unit tests to start out with more * than 1 as their window size. */ - MergeThrottler(const config::ConfigUri & configUri, StorageComponentRegister&); + MergeThrottler(const StorServerConfig& bootstrap_config, StorageComponentRegister&); ~MergeThrottler() override; /** Implements document::Runnable::run */ @@ -204,6 +204,8 @@ public: bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& stateCmd) override; + void on_configure(const StorServerConfig& new_config); + /* * When invoked, merges to the node will be BUSY-bounced by the throttler * for a configurable period of time instead of being processed. @@ -282,11 +284,6 @@ private: [[nodiscard]] bool isChainCompleted() const noexcept; }; - /** - * Callback method for config system (IFetcherCallback) - */ - void configure(std::unique_ptr<vespa::config::content::core::StorServerConfig> newConfig) override; - // NOTE: unless explicitly specified, all the below functions require // _sync lock to be held upon call (usually implicitly via MessageGuard) diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp index 3faa8a3ddec..b09621335a7 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp +++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp @@ -37,6 +37,7 @@ ServiceLayerNode::ServiceLayerNode(const config::ConfigUri & configUri, _bouncer(nullptr), _bucket_manager(nullptr), _fileStorManager(nullptr), + _merge_throttler(nullptr), _init_has_been_called(false) { } @@ -158,8 +159,8 @@ ServiceLayerNode::createChain(IStorageChainBuilder &builder) auto bouncer = std::make_unique<Bouncer>(compReg, bouncer_config()); _bouncer = bouncer.get(); builder.add(std::move(bouncer)); - auto merge_throttler_up = std::make_unique<MergeThrottler>(_configUri, compReg); - auto merge_throttler = merge_throttler_up.get(); + auto merge_throttler_up = std::make_unique<MergeThrottler>(server_config(), compReg); + _merge_throttler = merge_throttler_up.get(); builder.add(std::move(merge_throttler_up)); builder.add(std::make_unique<ChangedBucketOwnershipHandler>(_configUri, compReg)); auto bucket_manager = std::make_unique<BucketManager>(_configUri, _context.getComponentRegister()); @@ -178,10 +179,17 @@ ServiceLayerNode::createChain(IStorageChainBuilder &builder) // Lifetimes of all referenced components shall outlive the last call going // through the SPI, as queues are flushed and worker threads joined when // the storage link chain is closed prior to destruction. - auto error_listener = std::make_shared<ServiceLayerErrorListener>(*_component, *merge_throttler); + auto error_listener = std::make_shared<ServiceLayerErrorListener>(*_component, *_merge_throttler); _fileStorManager->error_wrapper().register_error_listener(std::move(error_listener)); } +void +ServiceLayerNode::on_configure(const StorServerConfig& config) +{ + assert(_merge_throttler); + _merge_throttler->on_configure(config); +} + ResumeGuard ServiceLayerNode::pause() { diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.h b/storage/src/vespa/storage/storageserver/servicelayernode.h index dabc1c979a5..3e95b9c6d3b 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernode.h +++ b/storage/src/vespa/storage/storageserver/servicelayernode.h @@ -1,10 +1,4 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** - * \class storage::ServiceLayerNode - * \ingroup storageserver - * - * \brief Class for setting up a service layer node. - */ #pragma once @@ -23,6 +17,7 @@ namespace spi { struct PersistenceProvider; } class Bouncer; class BucketManager; class FileStorManager; +class MergeThrottler; class ServiceLayerNode : public StorageNode, @@ -37,6 +32,7 @@ class ServiceLayerNode Bouncer* _bouncer; BucketManager* _bucket_manager; FileStorManager* _fileStorManager; + MergeThrottler* _merge_throttler; bool _init_has_been_called; public: @@ -54,6 +50,8 @@ public: */ void init(); + void on_configure(const StorServerConfig& config); + const lib::NodeType& getNodeType() const override { return lib::NodeType::STORAGE; } ResumeGuard pause() override; |