diff options
author | Tor Brede Vekterli <vekterli@vespa.ai> | 2023-10-24 12:27:49 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@vespa.ai> | 2023-10-24 12:27:49 +0000 |
commit | ca8fbc7725cc3907fa6e6c0af662422362db6cfb (patch) | |
tree | 7e94f4bae55aa6b7550e1999236b5df5a09ec294 | |
parent | 25aa6778a04c8ff4de0509d0a98133c9e92a605a (diff) |
Propagate `VisitorManager` config from outside
9 files changed, 69 insertions, 38 deletions
diff --git a/storage/src/tests/visiting/visitormanagertest.cpp b/storage/src/tests/visiting/visitormanagertest.cpp index 2b7039f36ea..d4047bc9a72 100644 --- a/storage/src/tests/visiting/visitormanagertest.cpp +++ b/storage/src/tests/visiting/visitormanagertest.cpp @@ -1,5 +1,6 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/config/helper/configgetter.hpp> #include <vespa/document/fieldvalue/intfieldvalue.h> #include <vespa/document/fieldvalue/stringfieldvalue.h> #include <vespa/storageapi/message/datagram.h> @@ -88,7 +89,9 @@ VisitorManagerTest::initializeTest(bool defer_manager_thread_start) _node->setupDummyPersistence(); _node->getStateUpdater().setClusterState(std::make_shared<lib::ClusterState>("storage:1 distributor:1")); _top = std::make_unique<DummyStorageLink>(); - auto vm = std::make_unique<VisitorManager>(config::ConfigUri(config.getConfigId()), + using vespa::config::content::core::StorVisitorConfig; + auto bootstrap_cfg = config_from<StorVisitorConfig>(config::ConfigUri(config.getConfigId())); + auto vm = std::make_unique<VisitorManager>(*bootstrap_cfg, _node->getComponentRegister(), *_messageSessionFactory, VisitorFactory::Map(), diff --git a/storage/src/tests/visiting/visitortest.cpp b/storage/src/tests/visiting/visitortest.cpp index 49f1bc778fc..f83b6c99d64 100644 --- a/storage/src/tests/visiting/visitortest.cpp +++ b/storage/src/tests/visiting/visitortest.cpp @@ -1,6 +1,7 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/config/common/exceptions.h> +#include <vespa/config/helper/configgetter.hpp> #include <vespa/document/fieldvalue/intfieldvalue.h> #include <vespa/document/fieldvalue/stringfieldvalue.h> #include <vespa/document/datatype/documenttype.h> @@ -168,9 +169,10 @@ VisitorTest::initializeTest(const TestParams& params) } _node = std::make_unique<TestServiceLayerApp>(config.getConfigId()); _top = std::make_unique<DummyStorageLink>(); + using vespa::config::content::core::StorVisitorConfig; + auto bootstrap_cfg = config_from<StorVisitorConfig>(config::ConfigUri(config.getConfigId())); _top->push_back(std::unique_ptr<StorageLink>(_manager - = new VisitorManager(config::ConfigUri(config.getConfigId()), - _node->getComponentRegister(), *_messageSessionFactory))); + = new VisitorManager(*bootstrap_cfg, _node->getComponentRegister(), *_messageSessionFactory))); _bottom = new DummyStorageLink(); _top->push_back(std::unique_ptr<StorageLink>(_bottom)); _manager->setTimeBetweenTicks(10); diff --git a/storage/src/vespa/storage/common/visitorfactory.h b/storage/src/vespa/storage/common/visitorfactory.h index 393744c4f59..1f18ace3095 100644 --- a/storage/src/vespa/storage/common/visitorfactory.h +++ b/storage/src/vespa/storage/common/visitorfactory.h @@ -12,6 +12,7 @@ namespace storage { +class StorageComponent; class Visitor; class VisitorEnvironment { diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp index 9736d6b05ed..a142d322c53 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp +++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp @@ -42,11 +42,13 @@ ServiceLayerNode::ServiceLayerNode(const config::ConfigUri & configUri, _persistenceProvider(persistenceProvider), _externalVisitors(externalVisitors), _persistence_bootstrap_config(std::move(bootstrap_configs.persistence_cfg)), + _visitor_bootstrap_config(std::move(bootstrap_configs.visitor_cfg)), _bouncer(nullptr), _bucket_manager(nullptr), _changed_bucket_ownership_handler(nullptr), _fileStorManager(nullptr), _merge_throttler(nullptr), + _visitor_manager(nullptr), _init_has_been_called(false) { } @@ -177,8 +179,10 @@ ServiceLayerNode::createChain(IStorageChainBuilder &builder) auto bucket_manager = std::make_unique<BucketManager>(server_config(), _context.getComponentRegister()); _bucket_manager = bucket_manager.get(); builder.add(std::move(bucket_manager)); - builder.add(std::make_unique<VisitorManager>(_configUri, _context.getComponentRegister(), - static_cast<VisitorMessageSessionFactory &>(*this), _externalVisitors)); + auto visitor_manager = std::make_unique<VisitorManager>(*_visitor_bootstrap_config, _context.getComponentRegister(), + static_cast<VisitorMessageSessionFactory &>(*this), _externalVisitors); + _visitor_manager = visitor_manager.get(); + builder.add(std::move(visitor_manager)); builder.add(std::make_unique<ModifiedBucketChecker>(_context.getComponentRegister(), _persistenceProvider, _configUri)); auto state_manager = releaseStateManager(); auto filstor_manager = std::make_unique<FileStorManager>(_configUri, _persistenceProvider, _context.getComponentRegister(), @@ -211,6 +215,14 @@ ServiceLayerNode::on_configure(const PersistenceConfig& config) _changed_bucket_ownership_handler->on_configure(config); } +void +ServiceLayerNode::on_configure(const StorVisitorConfig& config) +{ + assert(_visitor_manager); + _visitor_manager->on_configure(config); +} + + ResumeGuard ServiceLayerNode::pause() { diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.h b/storage/src/vespa/storage/storageserver/servicelayernode.h index f13c54a16f1..10699c5a40f 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernode.h +++ b/storage/src/vespa/storage/storageserver/servicelayernode.h @@ -7,9 +7,10 @@ #include "storagenode.h" #include "vespa/vespalib/util/jsonstream.h" #include <vespa/config-persistence.h> -#include <vespa/storage/visiting/visitormessagesessionfactory.h> -#include <vespa/storage/common/visitorfactory.h> #include <vespa/storage/common/nodestateupdater.h> +#include <vespa/storage/common/visitorfactory.h> +#include <vespa/storage/visiting/config-stor-visitor.h> +#include <vespa/storage/visiting/visitormessagesessionfactory.h> namespace storage { @@ -20,6 +21,7 @@ class BucketManager; class ChangedBucketOwnershipHandler; class FileStorManager; class MergeThrottler; +class VisitorManager; class ServiceLayerNode : public StorageNode, @@ -29,16 +31,19 @@ class ServiceLayerNode { public: using PersistenceConfig = vespa::config::content::PersistenceConfig; + using StorVisitorConfig = vespa::config::content::core::StorVisitorConfig; private: ServiceLayerNodeContext& _context; spi::PersistenceProvider& _persistenceProvider; VisitorFactory::Map _externalVisitors; std::unique_ptr<PersistenceConfig> _persistence_bootstrap_config; + std::unique_ptr<StorVisitorConfig> _visitor_bootstrap_config; Bouncer* _bouncer; BucketManager* _bucket_manager; ChangedBucketOwnershipHandler* _changed_bucket_ownership_handler; FileStorManager* _fileStorManager; MergeThrottler* _merge_throttler; + VisitorManager* _visitor_manager; bool _init_has_been_called; public: @@ -47,6 +52,7 @@ public: struct ServiceLayerBootstrapConfigs { BootstrapConfigs storage_bootstrap_configs; std::unique_ptr<PersistenceConfig> persistence_cfg; + std::unique_ptr<StorVisitorConfig> visitor_cfg; ServiceLayerBootstrapConfigs(); ~ServiceLayerBootstrapConfigs(); @@ -68,6 +74,7 @@ public: void on_configure(const StorServerConfig& config); void on_configure(const PersistenceConfig& config); + void on_configure(const StorVisitorConfig& config); const lib::NodeType& getNodeType() const override { return lib::NodeType::STORAGE; } diff --git a/storage/src/vespa/storage/visiting/visitormanager.cpp b/storage/src/vespa/storage/visiting/visitormanager.cpp index 1c26923b15f..dc1635bc4b1 100644 --- a/storage/src/vespa/storage/visiting/visitormanager.cpp +++ b/storage/src/vespa/storage/visiting/visitormanager.cpp @@ -21,7 +21,7 @@ LOG_SETUP(".visitor.manager"); namespace storage { -VisitorManager::VisitorManager(const config::ConfigUri & configUri, +VisitorManager::VisitorManager(const StorVisitorConfig& bootstrap_config, StorageComponentRegister& componentRegister, VisitorMessageSessionFactory& messageSF, VisitorFactory::Map externalFactories, @@ -35,7 +35,6 @@ VisitorManager::VisitorManager(const config::ConfigUri & configUri, _visitorLock(), _visitorCond(), _visitorCounter(0), - _configFetcher(std::make_unique<config::ConfigFetcher>(configUri.getContext())), _metrics(std::make_shared<VisitorMetrics>()), _maxFixedConcurrentVisitors(1), _maxVariableConcurrentVisitors(0), @@ -51,8 +50,7 @@ VisitorManager::VisitorManager(const config::ConfigUri & configUri, _enforceQueueUse(false), _visitorFactories(std::move(externalFactories)) { - _configFetcher->subscribe<vespa::config::content::core::StorVisitorConfig>(configUri.getConfigId(), this); - _configFetcher->start(); + on_configure(bootstrap_config); _component.registerMetric(*_metrics); if (!defer_manager_thread_start) { create_and_start_manager_thread(); @@ -94,8 +92,6 @@ VisitorManager::updateMetrics(const MetricLockGuard &) void VisitorManager::onClose() { - // Avoid getting config during shutdown - _configFetcher->close(); { std::lock_guard sync(_visitorLock); for (auto& enqueued : _visitorQueue) { @@ -118,25 +114,25 @@ VisitorManager::print(std::ostream& out, bool verbose, const std::string& indent } void -VisitorManager::configure(std::unique_ptr<vespa::config::content::core::StorVisitorConfig> config) +VisitorManager::on_configure(const vespa::config::content::core::StorVisitorConfig& config) { std::lock_guard sync(_visitorLock); - if (config->defaultdocblocksize % 512 != 0) { + if (config.defaultdocblocksize % 512 != 0) { throw config::InvalidConfigException( - "The default docblock size needs to be a multiplum of the " + "The default docblock size needs to be a multiple of the " "disk block size. (512b)"); } // Do some sanity checking of input. Cannot haphazardly mix and match // old and new max concurrency config values - if (config->maxconcurrentvisitors == 0 - && config->maxconcurrentvisitorsFixed == 0) + if (config.maxconcurrentvisitors == 0 + && config.maxconcurrentvisitorsFixed == 0) { throw config::InvalidConfigException( "Maximum concurrent visitor count cannot be 0."); } - else if (config->maxconcurrentvisitorsFixed == 0 - && config->maxconcurrentvisitorsVariable != 0) + else if (config.maxconcurrentvisitorsFixed == 0 + && config.maxconcurrentvisitorsVariable != 0) { throw config::InvalidConfigException( "Cannot specify 'variable' parameter for max concurrent " @@ -147,21 +143,21 @@ VisitorManager::configure(std::unique_ptr<vespa::config::content::core::StorVisi uint32_t maxConcurrentVisitorsVariable; // Concurrency parameter fixed takes precedence over legacy maxconcurrent - if (config->maxconcurrentvisitorsFixed > 0) { - maxConcurrentVisitorsFixed = config->maxconcurrentvisitorsFixed; - maxConcurrentVisitorsVariable = config->maxconcurrentvisitorsVariable; + if (config.maxconcurrentvisitorsFixed > 0) { + maxConcurrentVisitorsFixed = config.maxconcurrentvisitorsFixed; + maxConcurrentVisitorsVariable = config.maxconcurrentvisitorsVariable; } else { - maxConcurrentVisitorsFixed = config->maxconcurrentvisitors; + maxConcurrentVisitorsFixed = config.maxconcurrentvisitors; maxConcurrentVisitorsVariable = 0; } bool liveUpdate = !_visitorThread.empty(); if (liveUpdate) { - if (_visitorThread.size() != static_cast<uint32_t>(config->visitorthreads)) { + if (_visitorThread.size() != static_cast<uint32_t>(config.visitorthreads)) { LOG(warning, "Ignoring config change requesting %u visitor " "threads, still running %u. Restart storage to apply " "change.", - config->visitorthreads, + config.visitorthreads, (uint32_t) _visitorThread.size()); } @@ -174,18 +170,18 @@ VisitorManager::configure(std::unique_ptr<vespa::config::content::core::StorVisi maxConcurrentVisitorsFixed, maxConcurrentVisitorsVariable); } - if (_maxVisitorQueueSize != static_cast<uint32_t>(config->maxvisitorqueuesize)) { + if (_maxVisitorQueueSize != static_cast<uint32_t>(config.maxvisitorqueuesize)) { LOG(info, "Altered max visitor queue size setting from %u to %u.", - _maxVisitorQueueSize, config->maxvisitorqueuesize); + _maxVisitorQueueSize, config.maxvisitorqueuesize); } } else { - if (config->visitorthreads == 0) { + if (config.visitorthreads == 0) { throw config::InvalidConfigException( "No visitor threads configured. If you don't want visitors " "to run, don't use visitormanager.", VESPA_STRLOC); } - _metrics->initThreads(config->visitorthreads); - for (int32_t i=0; i<config->visitorthreads; ++i) { + _metrics->initThreads(config.visitorthreads); + for (int32_t i=0; i<config.visitorthreads; ++i) { _visitorThread.emplace_back( // Naked new due to a lot of private inheritance in VisitorThread and VisitorManager std::shared_ptr<VisitorThread>(new VisitorThread(i, _componentRegister, _messageSessionFactory, @@ -195,9 +191,9 @@ VisitorManager::configure(std::unique_ptr<vespa::config::content::core::StorVisi } _maxFixedConcurrentVisitors = maxConcurrentVisitorsFixed; _maxVariableConcurrentVisitors = maxConcurrentVisitorsVariable; - _maxVisitorQueueSize = config->maxvisitorqueuesize; + _maxVisitorQueueSize = config.maxvisitorqueuesize; - auto cmd = std::make_shared<PropagateVisitorConfig>(*config); + auto cmd = std::make_shared<PropagateVisitorConfig>(config); for (auto& thread : _visitorThread) { thread.first->processMessage(0, cmd); } diff --git a/storage/src/vespa/storage/visiting/visitormanager.h b/storage/src/vespa/storage/visiting/visitormanager.h index 9fe906d4465..fefa2c218ab 100644 --- a/storage/src/vespa/storage/visiting/visitormanager.h +++ b/storage/src/vespa/storage/visiting/visitormanager.h @@ -44,10 +44,11 @@ class VisitorManager : public framework::Runnable, public StorageLink, public framework::HtmlStatusReporter, private VisitorMessageHandler, - private config::IFetcherCallback<vespa::config::content::core::StorVisitorConfig>, private framework::MetricUpdateHook { private: + using StorVisitorConfig = vespa::config::content::core::StorVisitorConfig; + StorageComponentRegister& _componentRegister; VisitorMessageSessionFactory& _messageSessionFactory; std::vector<std::pair<std::shared_ptr<VisitorThread>, @@ -64,7 +65,6 @@ private: mutable std::mutex _visitorLock; std::condition_variable _visitorCond; uint64_t _visitorCounter; - std::unique_ptr<config::ConfigFetcher> _configFetcher; std::shared_ptr<VisitorMetrics> _metrics; uint32_t _maxFixedConcurrentVisitors; uint32_t _maxVariableConcurrentVisitors; @@ -82,7 +82,7 @@ private: bool _enforceQueueUse; VisitorFactory::Map _visitorFactories; public: - VisitorManager(const config::ConfigUri & configUri, + VisitorManager(const StorVisitorConfig& bootstrap_config, StorageComponentRegister&, VisitorMessageSessionFactory&, VisitorFactory::Map external = VisitorFactory::Map(), @@ -94,6 +94,8 @@ public: uint32_t getActiveVisitorCount() const; void setTimeBetweenTicks(uint32_t time); + void on_configure(const vespa::config::content::core::StorVisitorConfig&); + void setMaxConcurrentVisitors(uint32_t count) { // Used in unit testing _maxFixedConcurrentVisitors = count; _maxVariableConcurrentVisitors = 0; @@ -122,7 +124,6 @@ public: private: using MonitorGuard = std::unique_lock<std::mutex>; - void configure(std::unique_ptr<vespa::config::content::core::StorVisitorConfig>) override; void run(framework::ThreadHandle&) override; /** diff --git a/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp b/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp index 0c23bb1a409..f43ec574806 100644 --- a/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp +++ b/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp @@ -35,6 +35,7 @@ ServiceLayerProcess::ServiceLayerProcess(const config::ConfigUri& configUri) : Process(configUri), _externalVisitors(), _persistence_cfg_handle(), + _visitor_cfg_handle(), _node(), _storage_chain_builder(), _context(std::make_unique<framework::defaultimplementation::RealClock>(), @@ -55,6 +56,7 @@ void ServiceLayerProcess::setupConfig(vespalib::duration subscribe_timeout) { _persistence_cfg_handle = _configSubscriber.subscribe<PersistenceConfig>(_configUri.getConfigId(), subscribe_timeout); + _visitor_cfg_handle = _configSubscriber.subscribe<StorVisitorConfig>(_configUri.getConfigId(), subscribe_timeout); // We reuse the StorServerConfig subscription from the parent Process Process::setupConfig(subscribe_timeout); } @@ -69,6 +71,9 @@ ServiceLayerProcess::updateConfig() if (_persistence_cfg_handle->isChanged()) { _node->on_configure(*_persistence_cfg_handle->getConfig()); } + if (_visitor_cfg_handle->isChanged()) { + _node->on_configure(*_visitor_cfg_handle->getConfig()); + } } bool @@ -93,6 +98,7 @@ ServiceLayerProcess::createNode() ServiceLayerNode::ServiceLayerBootstrapConfigs sbc; sbc.storage_bootstrap_configs = std::move(bc); sbc.persistence_cfg = _persistence_cfg_handle->getConfig(); + sbc.visitor_cfg = _visitor_cfg_handle->getConfig(); _node = std::make_unique<ServiceLayerNode>(_configUri, _context, std::move(sbc), *this, getProvider(), _externalVisitors); if (_storage_chain_builder) { diff --git a/storageserver/src/vespa/storageserver/app/servicelayerprocess.h b/storageserver/src/vespa/storageserver/app/servicelayerprocess.h index 2579b3df199..d91c66be266 100644 --- a/storageserver/src/vespa/storageserver/app/servicelayerprocess.h +++ b/storageserver/src/vespa/storageserver/app/servicelayerprocess.h @@ -3,8 +3,9 @@ #include "process.h" #include <vespa/config-persistence.h> -#include <vespa/storage/storageserver/servicelayernodecontext.h> #include <vespa/storage/common/visitorfactory.h> +#include <vespa/storage/storageserver/servicelayernodecontext.h> +#include <vespa/storage/visiting/config-stor-visitor.h> namespace config { class ConfigUri; } @@ -20,8 +21,10 @@ protected: VisitorFactory::Map _externalVisitors; private: using PersistenceConfig = vespa::config::content::PersistenceConfig; + using StorVisitorConfig = vespa::config::content::core::StorVisitorConfig; std::unique_ptr<config::ConfigHandle<PersistenceConfig>> _persistence_cfg_handle; + std::unique_ptr<config::ConfigHandle<StorVisitorConfig>> _visitor_cfg_handle; std::unique_ptr<ServiceLayerNode> _node; std::unique_ptr<IStorageChainBuilder> _storage_chain_builder; |