diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2023-10-16 11:41:12 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahooinc.com> | 2023-10-16 12:00:14 +0000 |
commit | e3d1f0f319f7ebd946e4c578e0f73f6c265f5a83 (patch) | |
tree | 7e2ed14f1b137a436e2db3b03d3b9281fe57d0c0 /storage/src | |
parent | e7d54515ca2a5eceea0bf17c5b96b969aea8c53a (diff) |
Wire `CommunicationManager` config from its owner rather than self-subscribing
This moves the responsibility for bootstrapping and updating config
for the `CommunicationManager` component to its owner. By doing this,
a dedicated `ConfigFetcher` can be removed. Since this is a
component used by both the distributor and storage nodes, this
reduces total thread count by 2 on a host.
Diffstat (limited to 'storage/src')
7 files changed, 132 insertions, 103 deletions
diff --git a/storage/src/tests/storageserver/communicationmanagertest.cpp b/storage/src/tests/storageserver/communicationmanagertest.cpp index e86c822e83c..70b8c40722c 100644 --- a/storage/src/tests/storageserver/communicationmanagertest.cpp +++ b/storage/src/tests/storageserver/communicationmanagertest.cpp @@ -1,31 +1,41 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/storage/storageserver/communicationmanager.h> - -#include <vespa/messagebus/testlib/slobrok.h> -#include <vespa/messagebus/rpcmessagebus.h> -#include <vespa/storageapi/message/persistence.h> -#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h> -#include <vespa/storage/persistence/messages.h> -#include <vespa/document/bucket/fixed_bucket_spaces.h> -#include <tests/common/teststorageapp.h> #include <tests/common/dummystoragelink.h> #include <tests/common/testhelper.h> -#include <vespa/document/test/make_document_bucket.h> +#include <tests/common/teststorageapp.h> +#include <vespa/config/helper/configgetter.hpp> +#include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/document/fieldset/fieldsets.h> +#include <vespa/document/test/make_document_bucket.h> #include <vespa/documentapi/messagebus/messages/getdocumentmessage.h> -#include <vespa/vespalib/util/stringfmt.h> -#include <vespa/documentapi/messagebus/messages/removedocumentmessage.h> #include <vespa/documentapi/messagebus/messages/getdocumentreply.h> +#include <vespa/documentapi/messagebus/messages/removedocumentmessage.h> +#include <vespa/messagebus/rpcmessagebus.h> +#include <vespa/messagebus/testlib/slobrok.h> +#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h> +#include <vespa/storage/persistence/messages.h> +#include <vespa/storage/storageserver/communicationmanager.h> +#include <vespa/storageapi/message/persistence.h> +#include <vespa/vespalib/util/stringfmt.h> #include <thread> -#include <vespa/vespalib/gtest/gtest.h> +#include <gtest/gtest.h> using document::test::makeDocumentBucket; using namespace ::testing; namespace storage { -vespalib::string _Storage("storage"); +vespalib::string _storage("storage"); + +using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig; + +namespace { + +std::unique_ptr<CommunicationManagerConfig> config_from(const config::ConfigUri& cfg_uri) { + return config::ConfigGetter<CommunicationManagerConfig>::getConfig(cfg_uri.getConfigId(), cfg_uri.getContext()); +} + +} struct CommunicationManagerTest : Test { @@ -33,13 +43,11 @@ struct CommunicationManagerTest : Test { void doTestConfigPropagation(bool isContentNode); - std::shared_ptr<api::StorageCommand> createDummyCommand( - api::StorageMessage::Priority priority) - { + static std::shared_ptr<api::StorageCommand> createDummyCommand(api::StorageMessage::Priority priority) { auto cmd = std::make_shared<api::GetCommand>(makeDocumentBucket(document::BucketId(0)), document::DocumentId("id:ns:mytype::mydoc"), document::AllFields::NAME); - cmd->setAddress(api::StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1)); + cmd->setAddress(api::StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 1)); cmd->setPriority(priority); return cmd; } @@ -77,19 +85,20 @@ TEST_F(CommunicationManagerTest, simple) { TestServiceLayerApp storNode(storConfig.getConfigId()); TestDistributorApp distNode(distConfig.getConfigId()); - CommunicationManager distributor(distNode.getComponentRegister(), - config::ConfigUri(distConfig.getConfigId())); - CommunicationManager storage(storNode.getComponentRegister(), - config::ConfigUri(storConfig.getConfigId())); - DummyStorageLink *distributorLink = new DummyStorageLink(); - DummyStorageLink *storageLink = new DummyStorageLink(); + auto dist_cfg_uri = config::ConfigUri(distConfig.getConfigId()); + auto stor_cfg_uri = config::ConfigUri(storConfig.getConfigId()); + + CommunicationManager distributor(distNode.getComponentRegister(), dist_cfg_uri, *config_from(dist_cfg_uri)); + CommunicationManager storage(storNode.getComponentRegister(), stor_cfg_uri, *config_from(stor_cfg_uri)); + auto* distributorLink = new DummyStorageLink(); + auto* storageLink = new DummyStorageLink(); distributor.push_back(std::unique_ptr<StorageLink>(distributorLink)); storage.push_back(std::unique_ptr<StorageLink>(storageLink)); distributor.open(); storage.open(); - auto stor_addr = api::StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1); - auto distr_addr = api::StorageMessageAddress::create(&_Storage, lib::NodeType::DISTRIBUTOR, 1); + auto stor_addr = api::StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 1); + auto distr_addr = api::StorageMessageAddress::create(&_storage, lib::NodeType::DISTRIBUTOR, 1); // It is undefined when the logical nodes will be visible in each others Slobrok // mirrors, so explicitly wait until mutual visibility is ensured. Failure to do this // might cause the below message to be immediately bounced due to failing to map the @@ -136,9 +145,9 @@ CommunicationManagerTest::doTestConfigPropagation(bool isContentNode) node = std::make_unique<TestDistributorApp>(config.getConfigId()); } - CommunicationManager commMgr(node->getComponentRegister(), - config::ConfigUri(config.getConfigId())); - DummyStorageLink *storageLink = new DummyStorageLink(); + auto cfg_uri = config::ConfigUri(config.getConfigId()); + CommunicationManager commMgr(node->getComponentRegister(), cfg_uri, *config_from(cfg_uri)); + auto* storageLink = new DummyStorageLink(); commMgr.push_back(std::unique_ptr<StorageLink>(storageLink)); commMgr.open(); @@ -153,13 +162,12 @@ CommunicationManagerTest::doTestConfigPropagation(bool isContentNode) } // Test live reconfig of limits. - using ConfigBuilder - = vespa::config::content::core::StorCommunicationmanagerConfigBuilder; + using ConfigBuilder = vespa::config::content::core::StorCommunicationmanagerConfigBuilder; auto liveCfg = std::make_unique<ConfigBuilder>(); liveCfg->mbusContentNodeMaxPendingCount = 777777; liveCfg->mbusDistributorNodeMaxPendingCount = 999999; - commMgr.configure(std::move(liveCfg)); + commMgr.on_configure(*liveCfg); if (isContentNode) { EXPECT_EQ(777777, mbus.getMaxPendingCount()); } else { @@ -182,9 +190,9 @@ TEST_F(CommunicationManagerTest, commands_are_dequeued_in_fifo_order) { addSlobrokConfig(storConfig, slobrok); TestServiceLayerApp storNode(storConfig.getConfigId()); - CommunicationManager storage(storNode.getComponentRegister(), - config::ConfigUri(storConfig.getConfigId())); - DummyStorageLink *storageLink = new DummyStorageLink(); + auto cfg_uri = config::ConfigUri(storConfig.getConfigId()); + CommunicationManager storage(storNode.getComponentRegister(), cfg_uri, *config_from(cfg_uri)); + auto* storageLink = new DummyStorageLink(); storage.push_back(std::unique_ptr<StorageLink>(storageLink)); storage.open(); @@ -215,9 +223,9 @@ TEST_F(CommunicationManagerTest, replies_are_dequeued_in_fifo_order) { addSlobrokConfig(storConfig, slobrok); TestServiceLayerApp storNode(storConfig.getConfigId()); - CommunicationManager storage(storNode.getComponentRegister(), - config::ConfigUri(storConfig.getConfigId())); - DummyStorageLink *storageLink = new DummyStorageLink(); + auto cfg_uri = config::ConfigUri(storConfig.getConfigId()); + CommunicationManager storage(storNode.getComponentRegister(), cfg_uri, *config_from(cfg_uri)); + auto* storageLink = new DummyStorageLink(); storage.push_back(std::unique_ptr<StorageLink>(storageLink)); storage.open(); @@ -256,8 +264,8 @@ struct CommunicationManagerFixture { addSlobrokConfig(stor_config, slobrok); node = std::make_unique<TestServiceLayerApp>(stor_config.getConfigId()); - comm_mgr = std::make_unique<CommunicationManager>(node->getComponentRegister(), - config::ConfigUri(stor_config.getConfigId())); + auto cfg_uri = config::ConfigUri(stor_config.getConfigId()); + comm_mgr = std::make_unique<CommunicationManager>(node->getComponentRegister(), cfg_uri, *config_from(cfg_uri)); bottom_link = new DummyStorageLink(); comm_mgr->push_back(std::unique_ptr<StorageLink>(bottom_link)); comm_mgr->open(); diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 5bbc5b2a26d..c126ec01dc6 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -216,7 +216,9 @@ convert_to_rpc_compression_config(const vespa::config::content::core::StorCommun } -CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, const config::ConfigUri & configUri) +CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, + const config::ConfigUri& configUri, + const CommunicationManagerConfig& bootstrap_config) : StorageLink("Communication manager", MsgDownOnFlush::Allowed, MsgUpOnClosed::Disallowed), _component(compReg, "communicationmanager"), _metrics(), @@ -224,10 +226,11 @@ CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, co _storage_api_rpc_service(), // (ditto) _cc_rpc_service(), // (ditto) _eventQueue(), + _bootstrap_config(std::make_unique<CommunicationManagerConfig>(bootstrap_config)), _mbus(), _configUri(configUri), _closed(false), - _docApiConverter(configUri, std::make_shared<PlaceHolderBucketResolver>()), + _docApiConverter(configUri, std::make_shared<PlaceHolderBucketResolver>()), // TODO wire config from outside _thread() { _component.registerMetricUpdateHook(*this, 5s); @@ -237,9 +240,13 @@ CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, co void CommunicationManager::onOpen() { - _configFetcher = std::make_unique<config::ConfigFetcher>(_configUri.getContext()); - _configFetcher->subscribe<vespa::config::content::core::StorCommunicationmanagerConfig>(_configUri.getConfigId(), this); - _configFetcher->start(); + // We have to hold on to the bootstrap config until we reach the open-phase, as the + // actual RPC/mbus endpoints are started at the first config edge. + // Note: this is called as part of synchronous node initialization, which explicitly + // prevents any concurrent reconfiguration prior to opening all storage chain components, + // i.e. there's no risk of on_configure() being called _prior_ to us getting here. + on_configure(*_bootstrap_config); + _bootstrap_config.reset(); _thread = _component.startThread(*this, 60s); if (_shared_rpc_resources) { @@ -275,9 +282,6 @@ CommunicationManager::~CommunicationManager() void CommunicationManager::onClose() { - // Avoid getting config during shutdown - _configFetcher.reset(); - _closed.store(true, std::memory_order_seq_cst); if (_cc_rpc_service) { _cc_rpc_service->close(); // Auto-abort all incoming CC RPC requests from now on @@ -352,20 +356,20 @@ CommunicationManager::configureMessageBusLimits(const CommunicationManagerConfig } void -CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> config) +CommunicationManager::on_configure(const CommunicationManagerConfig& config) { // Only allow dynamic (live) reconfiguration of message bus limits. if (_mbus) { - configureMessageBusLimits(*config); - if (_mbus->getRPCNetwork().getPort() != config->mbusport) { + configureMessageBusLimits(config); + if (_mbus->getRPCNetwork().getPort() != config.mbusport) { auto m = make_string("mbus port changed from %d to %d. Will conduct a quick, but controlled restart.", - _mbus->getRPCNetwork().getPort(), config->mbusport); + _mbus->getRPCNetwork().getPort(), config.mbusport); LOG(warning, "%s", m.c_str()); _component.requestShutdown(m); } - if (_shared_rpc_resources->listen_port() != config->rpcport) { + if (_shared_rpc_resources->listen_port() != config.rpcport) { auto m = make_string("rpc port changed from %d to %d. Will conduct a quick, but controlled restart.", - _shared_rpc_resources->listen_port(), config->rpcport); + _shared_rpc_resources->listen_port(), config.rpcport); LOG(warning, "%s", m.c_str()); _component.requestShutdown(m); } @@ -375,25 +379,25 @@ CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> conf if (!_configUri.empty()) { LOG(debug, "setting up slobrok config from id: '%s", _configUri.getConfigId().c_str()); mbus::RPCNetworkParams params(_configUri); - params.setConnectionExpireSecs(config->mbus.rpctargetcache.ttl); - params.setNumNetworkThreads(std::max(1, config->mbus.numNetworkThreads)); - params.setNumRpcTargets(std::max(1, config->mbus.numRpcTargets)); - params.events_before_wakeup(std::max(1, config->mbus.eventsBeforeWakeup)); - params.setTcpNoDelay(config->mbus.tcpNoDelay); + params.setConnectionExpireSecs(config.mbus.rpctargetcache.ttl); + params.setNumNetworkThreads(std::max(1, config.mbus.numNetworkThreads)); + params.setNumRpcTargets(std::max(1, config.mbus.numRpcTargets)); + params.events_before_wakeup(std::max(1, config.mbus.eventsBeforeWakeup)); + params.setTcpNoDelay(config.mbus.tcpNoDelay); params.required_capabilities(vespalib::net::tls::CapabilitySet::of({ vespalib::net::tls::Capability::content_document_api() })); params.setIdentity(mbus::Identity(_component.getIdentity())); - if (config->mbusport != -1) { - params.setListenPort(config->mbusport); + if (config.mbusport != -1) { + params.setListenPort(config.mbusport); } using CompressionConfig = vespalib::compression::CompressionConfig; CompressionConfig::Type compressionType = CompressionConfig::toType( - CommunicationManagerConfig::Mbus::Compress::getTypeName(config->mbus.compress.type).c_str()); - params.setCompressionConfig(CompressionConfig(compressionType, config->mbus.compress.level, - 90, config->mbus.compress.limit)); + CommunicationManagerConfig::Mbus::Compress::getTypeName(config.mbus.compress.type).c_str()); + params.setCompressionConfig(CompressionConfig(compressionType, config.mbus.compress.level, + 90, config.mbus.compress.limit)); // Configure messagebus here as we for legacy reasons have // config here. @@ -403,16 +407,16 @@ CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> conf params, _configUri); - configureMessageBusLimits(*config); + configureMessageBusLimits(config); } _message_codec_provider = std::make_unique<rpc::MessageCodecProvider>(_component.getTypeRepo()->documentTypeRepo); - _shared_rpc_resources = std::make_unique<rpc::SharedRpcResources>(_configUri, config->rpcport, - config->rpc.numNetworkThreads, config->rpc.eventsBeforeWakeup); + _shared_rpc_resources = std::make_unique<rpc::SharedRpcResources>(_configUri, config.rpcport, + config.rpc.numNetworkThreads, config.rpc.eventsBeforeWakeup); _cc_rpc_service = std::make_unique<rpc::ClusterControllerApiRpcService>(*this, *_shared_rpc_resources); rpc::StorageApiRpcService::Params rpc_params; - rpc_params.compression_config = convert_to_rpc_compression_config(*config); - rpc_params.num_rpc_targets_per_node = config->rpc.numTargetsPerNode; + rpc_params.compression_config = convert_to_rpc_compression_config(config); + rpc_params.num_rpc_targets_per_node = config.rpc.numTargetsPerNode; _storage_api_rpc_service = std::make_unique<rpc::StorageApiRpcService>( *this, *_shared_rpc_resources, *_message_codec_provider, rpc_params); diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index 3c986c59c5e..7a910336b13 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -1,11 +1,6 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /** - * @class CommunicationManager - * @ingroup storageserver - * - * @brief Class used for sending messages over the network. - * - * @version $Id$ + * Class used for sending messages over the network. */ #pragma once @@ -65,7 +60,6 @@ public: class CommunicationManager final : public StorageLink, public framework::Runnable, - private config::IFetcherCallback<vespa::config::content::core::StorCommunicationmanagerConfig>, public mbus::IMessageHandler, public mbus::IReplyHandler, private framework::MetricUpdateHook, @@ -80,8 +74,6 @@ private: std::unique_ptr<rpc::ClusterControllerApiRpcService> _cc_rpc_service; std::unique_ptr<rpc::MessageCodecProvider> _message_codec_provider; Queue _eventQueue; - // XXX: Should perhaps use a configsubscriber and poll from StorageComponent ? - std::unique_ptr<config::ConfigFetcher> _configFetcher; using EarlierProtocol = std::pair<vespalib::steady_time , mbus::IProtocol::SP>; using EarlierProtocols = std::vector<EarlierProtocol>; std::mutex _earlierGenerationsLock; @@ -97,7 +89,6 @@ private: using BucketspacesConfig = vespa::config::content::core::BucketspacesConfig; void configureMessageBusLimits(const CommunicationManagerConfig& cfg); - void configure(std::unique_ptr<CommunicationManagerConfig> config) override; void receiveStorageReply(const std::shared_ptr<api::StorageReply>&); void fail_with_unresolvable_bucket_space(std::unique_ptr<documentapi::DocumentMessage> msg, const vespalib::string& error_message); @@ -106,6 +97,7 @@ private: static const uint64_t FORWARDED_MESSAGE = 0; + std::unique_ptr<CommunicationManagerConfig> _bootstrap_config; std::unique_ptr<mbus::RPCMessageBus> _mbus; std::unique_ptr<mbus::DestinationSession> _messageBusSession; std::unique_ptr<mbus::SourceSession> _sourceSession; @@ -127,9 +119,12 @@ public: CommunicationManager(const CommunicationManager&) = delete; CommunicationManager& operator=(const CommunicationManager&) = delete; CommunicationManager(StorageComponentRegister& compReg, - const config::ConfigUri & configUri); + const config::ConfigUri& configUri, + const CommunicationManagerConfig& bootstrap_config); ~CommunicationManager() override; + void on_configure(const CommunicationManagerConfig& config); + // MessageDispatcher overrides void dispatch_sync(std::shared_ptr<api::StorageMessage> msg) override; void dispatch_async(std::shared_ptr<api::StorageMessage> msg) override; diff --git a/storage/src/vespa/storage/storageserver/distributornode.cpp b/storage/src/vespa/storage/storageserver/distributornode.cpp index 7d2a69a2200..cbe1b64169b 100644 --- a/storage/src/vespa/storage/storageserver/distributornode.cpp +++ b/storage/src/vespa/storage/storageserver/distributornode.cpp @@ -32,7 +32,7 @@ DistributorNode::DistributorNode( _timestamp_second_counter(0), _intra_second_pseudo_usec_counter(0), _num_distributor_stripes(num_distributor_stripes), - _retrievedCommunicationManager(std::move(communicationManager)) + _retrievedCommunicationManager(std::move(communicationManager)) // may be nullptr { if (storage_chain_builder) { set_storage_chain_builder(std::move(storage_chain_builder)); @@ -88,7 +88,7 @@ DistributorNode::createChain(IStorageChainBuilder &builder) if (_retrievedCommunicationManager) { builder.add(std::move(_retrievedCommunicationManager)); } else { - auto communication_manager = std::make_unique<CommunicationManager>(dcr, _configUri); + auto communication_manager = std::make_unique<CommunicationManager>(dcr, _configUri, *_comm_mgr_config); _communicationManager = communication_manager.get(); builder.add(std::move(communication_manager)); } diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp index ba4d8a96120..3e990ba8cf3 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp +++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp @@ -88,6 +88,7 @@ void ServiceLayerNode::subscribeToConfigs() { StorageNode::subscribeToConfigs(); + // TODO consolidate this with existing config fetcher in StorageNode parent... _configFetcher.reset(new config::ConfigFetcher(_configUri.getContext())); } @@ -162,7 +163,7 @@ ServiceLayerNode::createChain(IStorageChainBuilder &builder) { ServiceLayerComponentRegister& compReg(_context.getComponentRegister()); - auto communication_manager = std::make_unique<CommunicationManager>(compReg, _configUri); + auto communication_manager = std::make_unique<CommunicationManager>(compReg, _configUri, *_comm_mgr_config); _communicationManager = communication_manager.get(); builder.add(std::move(communication_manager)); builder.add(std::make_unique<Bouncer>(compReg, _configUri)); diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp index 452a94496af..4e07c1f6b02 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.cpp +++ b/storage/src/vespa/storage/storageserver/storagenode.cpp @@ -112,18 +112,21 @@ void StorageNode::subscribeToConfigs() { _configFetcher = std::make_unique<config::ConfigFetcher>(_configUri.getContext()); + _configFetcher->subscribe<BucketspacesConfig>(_configUri.getConfigId(), this); + _configFetcher->subscribe<CommunicationManagerConfig>(_configUri.getConfigId(), this); _configFetcher->subscribe<StorDistributionConfig>(_configUri.getConfigId(), this); - _configFetcher->subscribe<UpgradingConfig>(_configUri.getConfigId(), this); _configFetcher->subscribe<StorServerConfig>(_configUri.getConfigId(), this); - _configFetcher->subscribe<BucketspacesConfig>(_configUri.getConfigId(), this); + _configFetcher->subscribe<UpgradingConfig>(_configUri.getConfigId(), this); _configFetcher->start(); + // All the below config instances were synchronously populated as part of start()ing the config fetcher std::lock_guard configLockGuard(_configLock); - _serverConfig = std::move(_newServerConfig); - _clusterConfig = std::move(_newClusterConfig); - _distributionConfig = std::move(_newDistributionConfig); _bucketSpacesConfig = std::move(_newBucketSpacesConfig); + _clusterConfig = std::move(_newClusterConfig); + _comm_mgr_config = std::move(_new_comm_mgr_config); + _distributionConfig = std::move(_newDistributionConfig); + _serverConfig = std::move(_newServerConfig); } void @@ -324,6 +327,10 @@ StorageNode::handleLiveConfigUpdate(const InitialGuard & initGuard) _context.getComponentRegister().setBucketSpacesConfig(*_bucketSpacesConfig); _communicationManager->updateBucketSpacesConfig(*_bucketSpacesConfig); } + if (_new_comm_mgr_config) { + _comm_mgr_config = std::move(_new_comm_mgr_config); + _communicationManager->on_configure(*_comm_mgr_config); + } } void @@ -504,6 +511,19 @@ StorageNode::configure(std::unique_ptr<BucketspacesConfig> config) { } } +void +StorageNode::configure(std::unique_ptr<CommunicationManagerConfig > config) { + log_config_received(*config); + { + std::lock_guard config_lock_guard(_configLock); + _new_comm_mgr_config = std::move(config); + } + if (_comm_mgr_config) { + InitialGuard concurrent_config_guard(_initial_config_mutex); + handleLiveConfigUpdate(concurrent_config_guard); + } +} + bool StorageNode::attemptedStopped() const { diff --git a/storage/src/vespa/storage/storageserver/storagenode.h b/storage/src/vespa/storage/storageserver/storagenode.h index 9538c2e1606..70e9101a597 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.h +++ b/storage/src/vespa/storage/storageserver/storagenode.h @@ -1,9 +1,6 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /** - * @class storage::StorageNode - * @ingroup storageserver - * - * @brief Main storage server class. + * Main storage server class. * * This class sets up the entire storage server. * @@ -12,13 +9,14 @@ #pragma once +#include <vespa/config-bucketspaces.h> #include <vespa/config-stor-distribution.h> #include <vespa/config-upgrading.h> #include <vespa/config/helper/ifetchercallback.h> #include <vespa/config/subscription/configuri.h> #include <vespa/document/config/config-documenttypes.h> #include <vespa/storage/common/doneinitializehandler.h> -#include <vespa/config-bucketspaces.h> +#include <vespa/storage/config/config-stor-communicationmanager.h> #include <vespa/storage/config/config-stor-server.h> #include <vespa/storage/storageutil/resumeguard.h> #include <vespa/storageframework/defaultimplementation/component/componentregisterimpl.h> @@ -55,6 +53,7 @@ class StorageNode : private config::IFetcherCallback<vespa::config::content::cor private config::IFetcherCallback<vespa::config::content::StorDistributionConfig>, private config::IFetcherCallback<vespa::config::content::UpgradingConfig>, private config::IFetcherCallback<vespa::config::content::core::BucketspacesConfig>, + private config::IFetcherCallback<vespa::config::content::core::StorCommunicationmanagerConfig>, private framework::MetricUpdateHook, private DoneInitializeHandler, private framework::defaultimplementation::ShutdownListener @@ -64,12 +63,8 @@ public: StorageNode(const StorageNode &) = delete; StorageNode & operator = (const StorageNode &) = delete; - /** - * @param excludeStorageChain With this option set, no chain will be set - * up. This can be useful in unit testing if you need a storage server - * instance, but you want to have full control over the components yourself. - */ - StorageNode(const config::ConfigUri & configUri, + + StorageNode(const config::ConfigUri& configUri, StorageNodeContext& context, ApplicationGenerationFetcher& generationFetcher, std::unique_ptr<HostInfo> hostInfo, @@ -97,10 +92,11 @@ public: StorageLink* getChain() { return _chain.get(); } virtual void initializeStatusWebServer(); protected: - using StorServerConfig = vespa::config::content::core::StorServerConfig; - using UpgradingConfig = vespa::config::content::UpgradingConfig; - using StorDistributionConfig = vespa::config::content::StorDistributionConfig; - using BucketspacesConfig = vespa::config::content::core::BucketspacesConfig; + using BucketspacesConfig = vespa::config::content::core::BucketspacesConfig; + using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig; + using StorDistributionConfig = vespa::config::content::StorDistributionConfig; + using StorServerConfig = vespa::config::content::core::StorServerConfig; + using UpgradingConfig = vespa::config::content::UpgradingConfig; private: bool _singleThreadedDebugMode; // Subscriptions to config @@ -137,12 +133,14 @@ private: virtual void configure(std::unique_ptr<document::config::DocumenttypesConfig> config, bool hasChanged, int64_t generation); void configure(std::unique_ptr<BucketspacesConfig>) override; + void configure(std::unique_ptr<CommunicationManagerConfig> config) override; protected: // Lock taken while doing configuration of the server. std::mutex _configLock; std::mutex _initial_config_mutex; using InitialGuard = std::lock_guard<std::mutex>; + // Current running config. Kept, such that we can see what has been // changed in live config updates. std::unique_ptr<StorServerConfig> _serverConfig; @@ -150,12 +148,15 @@ protected: std::unique_ptr<StorDistributionConfig> _distributionConfig; std::unique_ptr<document::config::DocumenttypesConfig> _doctypesConfig; std::unique_ptr<BucketspacesConfig> _bucketSpacesConfig; + std::unique_ptr<CommunicationManagerConfig> _comm_mgr_config; + // New configs gotten that has yet to have been handled std::unique_ptr<StorServerConfig> _newServerConfig; std::unique_ptr<UpgradingConfig> _newClusterConfig; std::unique_ptr<StorDistributionConfig> _newDistributionConfig; std::unique_ptr<document::config::DocumenttypesConfig> _newDoctypesConfig; std::unique_ptr<BucketspacesConfig> _newBucketSpacesConfig; + std::unique_ptr<CommunicationManagerConfig> _new_comm_mgr_config; std::unique_ptr<StorageComponent> _component; std::unique_ptr<NodeIdentity> _node_identity; config::ConfigUri _configUri; |