summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2023-10-16 11:41:12 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2023-10-16 12:00:14 +0000
commite3d1f0f319f7ebd946e4c578e0f73f6c265f5a83 (patch)
tree7e2ed14f1b137a436e2db3b03d3b9281fe57d0c0
parente7d54515ca2a5eceea0bf17c5b96b969aea8c53a (diff)
Wire `CommunicationManager` config from its owner rather than self-subscribing
This moves the responsibility for bootstrapping and updating config for the `CommunicationManager` component to its owner. By doing this, a dedicated `ConfigFetcher` can be removed. Since this is a component used by both the distributor and storage nodes, this reduces total thread count by 2 on a host.
-rw-r--r--storage/src/tests/storageserver/communicationmanagertest.cpp88
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp62
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.h17
-rw-r--r--storage/src/vespa/storage/storageserver/distributornode.cpp4
-rw-r--r--storage/src/vespa/storage/storageserver/servicelayernode.cpp3
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.cpp30
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.h31
7 files changed, 132 insertions, 103 deletions
diff --git a/storage/src/tests/storageserver/communicationmanagertest.cpp b/storage/src/tests/storageserver/communicationmanagertest.cpp
index e86c822e83c..70b8c40722c 100644
--- a/storage/src/tests/storageserver/communicationmanagertest.cpp
+++ b/storage/src/tests/storageserver/communicationmanagertest.cpp
@@ -1,31 +1,41 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/storage/storageserver/communicationmanager.h>
-
-#include <vespa/messagebus/testlib/slobrok.h>
-#include <vespa/messagebus/rpcmessagebus.h>
-#include <vespa/storageapi/message/persistence.h>
-#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h>
-#include <vespa/storage/persistence/messages.h>
-#include <vespa/document/bucket/fixed_bucket_spaces.h>
-#include <tests/common/teststorageapp.h>
#include <tests/common/dummystoragelink.h>
#include <tests/common/testhelper.h>
-#include <vespa/document/test/make_document_bucket.h>
+#include <tests/common/teststorageapp.h>
+#include <vespa/config/helper/configgetter.hpp>
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/document/fieldset/fieldsets.h>
+#include <vespa/document/test/make_document_bucket.h>
#include <vespa/documentapi/messagebus/messages/getdocumentmessage.h>
-#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/documentapi/messagebus/messages/removedocumentmessage.h>
#include <vespa/documentapi/messagebus/messages/getdocumentreply.h>
+#include <vespa/documentapi/messagebus/messages/removedocumentmessage.h>
+#include <vespa/messagebus/rpcmessagebus.h>
+#include <vespa/messagebus/testlib/slobrok.h>
+#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h>
+#include <vespa/storage/persistence/messages.h>
+#include <vespa/storage/storageserver/communicationmanager.h>
+#include <vespa/storageapi/message/persistence.h>
+#include <vespa/vespalib/util/stringfmt.h>
#include <thread>
-#include <vespa/vespalib/gtest/gtest.h>
+#include <gtest/gtest.h>
using document::test::makeDocumentBucket;
using namespace ::testing;
namespace storage {
-vespalib::string _Storage("storage");
+vespalib::string _storage("storage");
+
+using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig;
+
+namespace {
+
+std::unique_ptr<CommunicationManagerConfig> config_from(const config::ConfigUri& cfg_uri) {
+ return config::ConfigGetter<CommunicationManagerConfig>::getConfig(cfg_uri.getConfigId(), cfg_uri.getContext());
+}
+
+}
struct CommunicationManagerTest : Test {
@@ -33,13 +43,11 @@ struct CommunicationManagerTest : Test {
void doTestConfigPropagation(bool isContentNode);
- std::shared_ptr<api::StorageCommand> createDummyCommand(
- api::StorageMessage::Priority priority)
- {
+ static std::shared_ptr<api::StorageCommand> createDummyCommand(api::StorageMessage::Priority priority) {
auto cmd = std::make_shared<api::GetCommand>(makeDocumentBucket(document::BucketId(0)),
document::DocumentId("id:ns:mytype::mydoc"),
document::AllFields::NAME);
- cmd->setAddress(api::StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1));
+ cmd->setAddress(api::StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 1));
cmd->setPriority(priority);
return cmd;
}
@@ -77,19 +85,20 @@ TEST_F(CommunicationManagerTest, simple) {
TestServiceLayerApp storNode(storConfig.getConfigId());
TestDistributorApp distNode(distConfig.getConfigId());
- CommunicationManager distributor(distNode.getComponentRegister(),
- config::ConfigUri(distConfig.getConfigId()));
- CommunicationManager storage(storNode.getComponentRegister(),
- config::ConfigUri(storConfig.getConfigId()));
- DummyStorageLink *distributorLink = new DummyStorageLink();
- DummyStorageLink *storageLink = new DummyStorageLink();
+ auto dist_cfg_uri = config::ConfigUri(distConfig.getConfigId());
+ auto stor_cfg_uri = config::ConfigUri(storConfig.getConfigId());
+
+ CommunicationManager distributor(distNode.getComponentRegister(), dist_cfg_uri, *config_from(dist_cfg_uri));
+ CommunicationManager storage(storNode.getComponentRegister(), stor_cfg_uri, *config_from(stor_cfg_uri));
+ auto* distributorLink = new DummyStorageLink();
+ auto* storageLink = new DummyStorageLink();
distributor.push_back(std::unique_ptr<StorageLink>(distributorLink));
storage.push_back(std::unique_ptr<StorageLink>(storageLink));
distributor.open();
storage.open();
- auto stor_addr = api::StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1);
- auto distr_addr = api::StorageMessageAddress::create(&_Storage, lib::NodeType::DISTRIBUTOR, 1);
+ auto stor_addr = api::StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 1);
+ auto distr_addr = api::StorageMessageAddress::create(&_storage, lib::NodeType::DISTRIBUTOR, 1);
// It is undefined when the logical nodes will be visible in each others Slobrok
// mirrors, so explicitly wait until mutual visibility is ensured. Failure to do this
// might cause the below message to be immediately bounced due to failing to map the
@@ -136,9 +145,9 @@ CommunicationManagerTest::doTestConfigPropagation(bool isContentNode)
node = std::make_unique<TestDistributorApp>(config.getConfigId());
}
- CommunicationManager commMgr(node->getComponentRegister(),
- config::ConfigUri(config.getConfigId()));
- DummyStorageLink *storageLink = new DummyStorageLink();
+ auto cfg_uri = config::ConfigUri(config.getConfigId());
+ CommunicationManager commMgr(node->getComponentRegister(), cfg_uri, *config_from(cfg_uri));
+ auto* storageLink = new DummyStorageLink();
commMgr.push_back(std::unique_ptr<StorageLink>(storageLink));
commMgr.open();
@@ -153,13 +162,12 @@ CommunicationManagerTest::doTestConfigPropagation(bool isContentNode)
}
// Test live reconfig of limits.
- using ConfigBuilder
- = vespa::config::content::core::StorCommunicationmanagerConfigBuilder;
+ using ConfigBuilder = vespa::config::content::core::StorCommunicationmanagerConfigBuilder;
auto liveCfg = std::make_unique<ConfigBuilder>();
liveCfg->mbusContentNodeMaxPendingCount = 777777;
liveCfg->mbusDistributorNodeMaxPendingCount = 999999;
- commMgr.configure(std::move(liveCfg));
+ commMgr.on_configure(*liveCfg);
if (isContentNode) {
EXPECT_EQ(777777, mbus.getMaxPendingCount());
} else {
@@ -182,9 +190,9 @@ TEST_F(CommunicationManagerTest, commands_are_dequeued_in_fifo_order) {
addSlobrokConfig(storConfig, slobrok);
TestServiceLayerApp storNode(storConfig.getConfigId());
- CommunicationManager storage(storNode.getComponentRegister(),
- config::ConfigUri(storConfig.getConfigId()));
- DummyStorageLink *storageLink = new DummyStorageLink();
+ auto cfg_uri = config::ConfigUri(storConfig.getConfigId());
+ CommunicationManager storage(storNode.getComponentRegister(), cfg_uri, *config_from(cfg_uri));
+ auto* storageLink = new DummyStorageLink();
storage.push_back(std::unique_ptr<StorageLink>(storageLink));
storage.open();
@@ -215,9 +223,9 @@ TEST_F(CommunicationManagerTest, replies_are_dequeued_in_fifo_order) {
addSlobrokConfig(storConfig, slobrok);
TestServiceLayerApp storNode(storConfig.getConfigId());
- CommunicationManager storage(storNode.getComponentRegister(),
- config::ConfigUri(storConfig.getConfigId()));
- DummyStorageLink *storageLink = new DummyStorageLink();
+ auto cfg_uri = config::ConfigUri(storConfig.getConfigId());
+ CommunicationManager storage(storNode.getComponentRegister(), cfg_uri, *config_from(cfg_uri));
+ auto* storageLink = new DummyStorageLink();
storage.push_back(std::unique_ptr<StorageLink>(storageLink));
storage.open();
@@ -256,8 +264,8 @@ struct CommunicationManagerFixture {
addSlobrokConfig(stor_config, slobrok);
node = std::make_unique<TestServiceLayerApp>(stor_config.getConfigId());
- comm_mgr = std::make_unique<CommunicationManager>(node->getComponentRegister(),
- config::ConfigUri(stor_config.getConfigId()));
+ auto cfg_uri = config::ConfigUri(stor_config.getConfigId());
+ comm_mgr = std::make_unique<CommunicationManager>(node->getComponentRegister(), cfg_uri, *config_from(cfg_uri));
bottom_link = new DummyStorageLink();
comm_mgr->push_back(std::unique_ptr<StorageLink>(bottom_link));
comm_mgr->open();
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 5bbc5b2a26d..c126ec01dc6 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -216,7 +216,9 @@ convert_to_rpc_compression_config(const vespa::config::content::core::StorCommun
}
-CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, const config::ConfigUri & configUri)
+CommunicationManager::CommunicationManager(StorageComponentRegister& compReg,
+ const config::ConfigUri& configUri,
+ const CommunicationManagerConfig& bootstrap_config)
: StorageLink("Communication manager", MsgDownOnFlush::Allowed, MsgUpOnClosed::Disallowed),
_component(compReg, "communicationmanager"),
_metrics(),
@@ -224,10 +226,11 @@ CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, co
_storage_api_rpc_service(), // (ditto)
_cc_rpc_service(), // (ditto)
_eventQueue(),
+ _bootstrap_config(std::make_unique<CommunicationManagerConfig>(bootstrap_config)),
_mbus(),
_configUri(configUri),
_closed(false),
- _docApiConverter(configUri, std::make_shared<PlaceHolderBucketResolver>()),
+ _docApiConverter(configUri, std::make_shared<PlaceHolderBucketResolver>()), // TODO wire config from outside
_thread()
{
_component.registerMetricUpdateHook(*this, 5s);
@@ -237,9 +240,13 @@ CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, co
void
CommunicationManager::onOpen()
{
- _configFetcher = std::make_unique<config::ConfigFetcher>(_configUri.getContext());
- _configFetcher->subscribe<vespa::config::content::core::StorCommunicationmanagerConfig>(_configUri.getConfigId(), this);
- _configFetcher->start();
+ // We have to hold on to the bootstrap config until we reach the open-phase, as the
+ // actual RPC/mbus endpoints are started at the first config edge.
+ // Note: this is called as part of synchronous node initialization, which explicitly
+ // prevents any concurrent reconfiguration prior to opening all storage chain components,
+ // i.e. there's no risk of on_configure() being called _prior_ to us getting here.
+ on_configure(*_bootstrap_config);
+ _bootstrap_config.reset();
_thread = _component.startThread(*this, 60s);
if (_shared_rpc_resources) {
@@ -275,9 +282,6 @@ CommunicationManager::~CommunicationManager()
void
CommunicationManager::onClose()
{
- // Avoid getting config during shutdown
- _configFetcher.reset();
-
_closed.store(true, std::memory_order_seq_cst);
if (_cc_rpc_service) {
_cc_rpc_service->close(); // Auto-abort all incoming CC RPC requests from now on
@@ -352,20 +356,20 @@ CommunicationManager::configureMessageBusLimits(const CommunicationManagerConfig
}
void
-CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> config)
+CommunicationManager::on_configure(const CommunicationManagerConfig& config)
{
// Only allow dynamic (live) reconfiguration of message bus limits.
if (_mbus) {
- configureMessageBusLimits(*config);
- if (_mbus->getRPCNetwork().getPort() != config->mbusport) {
+ configureMessageBusLimits(config);
+ if (_mbus->getRPCNetwork().getPort() != config.mbusport) {
auto m = make_string("mbus port changed from %d to %d. Will conduct a quick, but controlled restart.",
- _mbus->getRPCNetwork().getPort(), config->mbusport);
+ _mbus->getRPCNetwork().getPort(), config.mbusport);
LOG(warning, "%s", m.c_str());
_component.requestShutdown(m);
}
- if (_shared_rpc_resources->listen_port() != config->rpcport) {
+ if (_shared_rpc_resources->listen_port() != config.rpcport) {
auto m = make_string("rpc port changed from %d to %d. Will conduct a quick, but controlled restart.",
- _shared_rpc_resources->listen_port(), config->rpcport);
+ _shared_rpc_resources->listen_port(), config.rpcport);
LOG(warning, "%s", m.c_str());
_component.requestShutdown(m);
}
@@ -375,25 +379,25 @@ CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> conf
if (!_configUri.empty()) {
LOG(debug, "setting up slobrok config from id: '%s", _configUri.getConfigId().c_str());
mbus::RPCNetworkParams params(_configUri);
- params.setConnectionExpireSecs(config->mbus.rpctargetcache.ttl);
- params.setNumNetworkThreads(std::max(1, config->mbus.numNetworkThreads));
- params.setNumRpcTargets(std::max(1, config->mbus.numRpcTargets));
- params.events_before_wakeup(std::max(1, config->mbus.eventsBeforeWakeup));
- params.setTcpNoDelay(config->mbus.tcpNoDelay);
+ params.setConnectionExpireSecs(config.mbus.rpctargetcache.ttl);
+ params.setNumNetworkThreads(std::max(1, config.mbus.numNetworkThreads));
+ params.setNumRpcTargets(std::max(1, config.mbus.numRpcTargets));
+ params.events_before_wakeup(std::max(1, config.mbus.eventsBeforeWakeup));
+ params.setTcpNoDelay(config.mbus.tcpNoDelay);
params.required_capabilities(vespalib::net::tls::CapabilitySet::of({
vespalib::net::tls::Capability::content_document_api()
}));
params.setIdentity(mbus::Identity(_component.getIdentity()));
- if (config->mbusport != -1) {
- params.setListenPort(config->mbusport);
+ if (config.mbusport != -1) {
+ params.setListenPort(config.mbusport);
}
using CompressionConfig = vespalib::compression::CompressionConfig;
CompressionConfig::Type compressionType = CompressionConfig::toType(
- CommunicationManagerConfig::Mbus::Compress::getTypeName(config->mbus.compress.type).c_str());
- params.setCompressionConfig(CompressionConfig(compressionType, config->mbus.compress.level,
- 90, config->mbus.compress.limit));
+ CommunicationManagerConfig::Mbus::Compress::getTypeName(config.mbus.compress.type).c_str());
+ params.setCompressionConfig(CompressionConfig(compressionType, config.mbus.compress.level,
+ 90, config.mbus.compress.limit));
// Configure messagebus here as we for legacy reasons have
// config here.
@@ -403,16 +407,16 @@ CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> conf
params,
_configUri);
- configureMessageBusLimits(*config);
+ configureMessageBusLimits(config);
}
_message_codec_provider = std::make_unique<rpc::MessageCodecProvider>(_component.getTypeRepo()->documentTypeRepo);
- _shared_rpc_resources = std::make_unique<rpc::SharedRpcResources>(_configUri, config->rpcport,
- config->rpc.numNetworkThreads, config->rpc.eventsBeforeWakeup);
+ _shared_rpc_resources = std::make_unique<rpc::SharedRpcResources>(_configUri, config.rpcport,
+ config.rpc.numNetworkThreads, config.rpc.eventsBeforeWakeup);
_cc_rpc_service = std::make_unique<rpc::ClusterControllerApiRpcService>(*this, *_shared_rpc_resources);
rpc::StorageApiRpcService::Params rpc_params;
- rpc_params.compression_config = convert_to_rpc_compression_config(*config);
- rpc_params.num_rpc_targets_per_node = config->rpc.numTargetsPerNode;
+ rpc_params.compression_config = convert_to_rpc_compression_config(config);
+ rpc_params.num_rpc_targets_per_node = config.rpc.numTargetsPerNode;
_storage_api_rpc_service = std::make_unique<rpc::StorageApiRpcService>(
*this, *_shared_rpc_resources, *_message_codec_provider, rpc_params);
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index 3c986c59c5e..7a910336b13 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -1,11 +1,6 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
- * @class CommunicationManager
- * @ingroup storageserver
- *
- * @brief Class used for sending messages over the network.
- *
- * @version $Id$
+ * Class used for sending messages over the network.
*/
#pragma once
@@ -65,7 +60,6 @@ public:
class CommunicationManager final
: public StorageLink,
public framework::Runnable,
- private config::IFetcherCallback<vespa::config::content::core::StorCommunicationmanagerConfig>,
public mbus::IMessageHandler,
public mbus::IReplyHandler,
private framework::MetricUpdateHook,
@@ -80,8 +74,6 @@ private:
std::unique_ptr<rpc::ClusterControllerApiRpcService> _cc_rpc_service;
std::unique_ptr<rpc::MessageCodecProvider> _message_codec_provider;
Queue _eventQueue;
- // XXX: Should perhaps use a configsubscriber and poll from StorageComponent ?
- std::unique_ptr<config::ConfigFetcher> _configFetcher;
using EarlierProtocol = std::pair<vespalib::steady_time , mbus::IProtocol::SP>;
using EarlierProtocols = std::vector<EarlierProtocol>;
std::mutex _earlierGenerationsLock;
@@ -97,7 +89,6 @@ private:
using BucketspacesConfig = vespa::config::content::core::BucketspacesConfig;
void configureMessageBusLimits(const CommunicationManagerConfig& cfg);
- void configure(std::unique_ptr<CommunicationManagerConfig> config) override;
void receiveStorageReply(const std::shared_ptr<api::StorageReply>&);
void fail_with_unresolvable_bucket_space(std::unique_ptr<documentapi::DocumentMessage> msg,
const vespalib::string& error_message);
@@ -106,6 +97,7 @@ private:
static const uint64_t FORWARDED_MESSAGE = 0;
+ std::unique_ptr<CommunicationManagerConfig> _bootstrap_config;
std::unique_ptr<mbus::RPCMessageBus> _mbus;
std::unique_ptr<mbus::DestinationSession> _messageBusSession;
std::unique_ptr<mbus::SourceSession> _sourceSession;
@@ -127,9 +119,12 @@ public:
CommunicationManager(const CommunicationManager&) = delete;
CommunicationManager& operator=(const CommunicationManager&) = delete;
CommunicationManager(StorageComponentRegister& compReg,
- const config::ConfigUri & configUri);
+ const config::ConfigUri& configUri,
+ const CommunicationManagerConfig& bootstrap_config);
~CommunicationManager() override;
+ void on_configure(const CommunicationManagerConfig& config);
+
// MessageDispatcher overrides
void dispatch_sync(std::shared_ptr<api::StorageMessage> msg) override;
void dispatch_async(std::shared_ptr<api::StorageMessage> msg) override;
diff --git a/storage/src/vespa/storage/storageserver/distributornode.cpp b/storage/src/vespa/storage/storageserver/distributornode.cpp
index 7d2a69a2200..cbe1b64169b 100644
--- a/storage/src/vespa/storage/storageserver/distributornode.cpp
+++ b/storage/src/vespa/storage/storageserver/distributornode.cpp
@@ -32,7 +32,7 @@ DistributorNode::DistributorNode(
_timestamp_second_counter(0),
_intra_second_pseudo_usec_counter(0),
_num_distributor_stripes(num_distributor_stripes),
- _retrievedCommunicationManager(std::move(communicationManager))
+ _retrievedCommunicationManager(std::move(communicationManager)) // may be nullptr
{
if (storage_chain_builder) {
set_storage_chain_builder(std::move(storage_chain_builder));
@@ -88,7 +88,7 @@ DistributorNode::createChain(IStorageChainBuilder &builder)
if (_retrievedCommunicationManager) {
builder.add(std::move(_retrievedCommunicationManager));
} else {
- auto communication_manager = std::make_unique<CommunicationManager>(dcr, _configUri);
+ auto communication_manager = std::make_unique<CommunicationManager>(dcr, _configUri, *_comm_mgr_config);
_communicationManager = communication_manager.get();
builder.add(std::move(communication_manager));
}
diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
index ba4d8a96120..3e990ba8cf3 100644
--- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp
+++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
@@ -88,6 +88,7 @@ void
ServiceLayerNode::subscribeToConfigs()
{
StorageNode::subscribeToConfigs();
+ // TODO consolidate this with existing config fetcher in StorageNode parent...
_configFetcher.reset(new config::ConfigFetcher(_configUri.getContext()));
}
@@ -162,7 +163,7 @@ ServiceLayerNode::createChain(IStorageChainBuilder &builder)
{
ServiceLayerComponentRegister& compReg(_context.getComponentRegister());
- auto communication_manager = std::make_unique<CommunicationManager>(compReg, _configUri);
+ auto communication_manager = std::make_unique<CommunicationManager>(compReg, _configUri, *_comm_mgr_config);
_communicationManager = communication_manager.get();
builder.add(std::move(communication_manager));
builder.add(std::make_unique<Bouncer>(compReg, _configUri));
diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp
index 452a94496af..4e07c1f6b02 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.cpp
+++ b/storage/src/vespa/storage/storageserver/storagenode.cpp
@@ -112,18 +112,21 @@ void
StorageNode::subscribeToConfigs()
{
_configFetcher = std::make_unique<config::ConfigFetcher>(_configUri.getContext());
+ _configFetcher->subscribe<BucketspacesConfig>(_configUri.getConfigId(), this);
+ _configFetcher->subscribe<CommunicationManagerConfig>(_configUri.getConfigId(), this);
_configFetcher->subscribe<StorDistributionConfig>(_configUri.getConfigId(), this);
- _configFetcher->subscribe<UpgradingConfig>(_configUri.getConfigId(), this);
_configFetcher->subscribe<StorServerConfig>(_configUri.getConfigId(), this);
- _configFetcher->subscribe<BucketspacesConfig>(_configUri.getConfigId(), this);
+ _configFetcher->subscribe<UpgradingConfig>(_configUri.getConfigId(), this);
_configFetcher->start();
+ // All the below config instances were synchronously populated as part of start()ing the config fetcher
std::lock_guard configLockGuard(_configLock);
- _serverConfig = std::move(_newServerConfig);
- _clusterConfig = std::move(_newClusterConfig);
- _distributionConfig = std::move(_newDistributionConfig);
_bucketSpacesConfig = std::move(_newBucketSpacesConfig);
+ _clusterConfig = std::move(_newClusterConfig);
+ _comm_mgr_config = std::move(_new_comm_mgr_config);
+ _distributionConfig = std::move(_newDistributionConfig);
+ _serverConfig = std::move(_newServerConfig);
}
void
@@ -324,6 +327,10 @@ StorageNode::handleLiveConfigUpdate(const InitialGuard & initGuard)
_context.getComponentRegister().setBucketSpacesConfig(*_bucketSpacesConfig);
_communicationManager->updateBucketSpacesConfig(*_bucketSpacesConfig);
}
+ if (_new_comm_mgr_config) {
+ _comm_mgr_config = std::move(_new_comm_mgr_config);
+ _communicationManager->on_configure(*_comm_mgr_config);
+ }
}
void
@@ -504,6 +511,19 @@ StorageNode::configure(std::unique_ptr<BucketspacesConfig> config) {
}
}
+void
+StorageNode::configure(std::unique_ptr<CommunicationManagerConfig > config) {
+ log_config_received(*config);
+ {
+ std::lock_guard config_lock_guard(_configLock);
+ _new_comm_mgr_config = std::move(config);
+ }
+ if (_comm_mgr_config) {
+ InitialGuard concurrent_config_guard(_initial_config_mutex);
+ handleLiveConfigUpdate(concurrent_config_guard);
+ }
+}
+
bool
StorageNode::attemptedStopped() const
{
diff --git a/storage/src/vespa/storage/storageserver/storagenode.h b/storage/src/vespa/storage/storageserver/storagenode.h
index 9538c2e1606..70e9101a597 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.h
+++ b/storage/src/vespa/storage/storageserver/storagenode.h
@@ -1,9 +1,6 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
- * @class storage::StorageNode
- * @ingroup storageserver
- *
- * @brief Main storage server class.
+ * Main storage server class.
*
* This class sets up the entire storage server.
*
@@ -12,13 +9,14 @@
#pragma once
+#include <vespa/config-bucketspaces.h>
#include <vespa/config-stor-distribution.h>
#include <vespa/config-upgrading.h>
#include <vespa/config/helper/ifetchercallback.h>
#include <vespa/config/subscription/configuri.h>
#include <vespa/document/config/config-documenttypes.h>
#include <vespa/storage/common/doneinitializehandler.h>
-#include <vespa/config-bucketspaces.h>
+#include <vespa/storage/config/config-stor-communicationmanager.h>
#include <vespa/storage/config/config-stor-server.h>
#include <vespa/storage/storageutil/resumeguard.h>
#include <vespa/storageframework/defaultimplementation/component/componentregisterimpl.h>
@@ -55,6 +53,7 @@ class StorageNode : private config::IFetcherCallback<vespa::config::content::cor
private config::IFetcherCallback<vespa::config::content::StorDistributionConfig>,
private config::IFetcherCallback<vespa::config::content::UpgradingConfig>,
private config::IFetcherCallback<vespa::config::content::core::BucketspacesConfig>,
+ private config::IFetcherCallback<vespa::config::content::core::StorCommunicationmanagerConfig>,
private framework::MetricUpdateHook,
private DoneInitializeHandler,
private framework::defaultimplementation::ShutdownListener
@@ -64,12 +63,8 @@ public:
StorageNode(const StorageNode &) = delete;
StorageNode & operator = (const StorageNode &) = delete;
- /**
- * @param excludeStorageChain With this option set, no chain will be set
- * up. This can be useful in unit testing if you need a storage server
- * instance, but you want to have full control over the components yourself.
- */
- StorageNode(const config::ConfigUri & configUri,
+
+ StorageNode(const config::ConfigUri& configUri,
StorageNodeContext& context,
ApplicationGenerationFetcher& generationFetcher,
std::unique_ptr<HostInfo> hostInfo,
@@ -97,10 +92,11 @@ public:
StorageLink* getChain() { return _chain.get(); }
virtual void initializeStatusWebServer();
protected:
- using StorServerConfig = vespa::config::content::core::StorServerConfig;
- using UpgradingConfig = vespa::config::content::UpgradingConfig;
- using StorDistributionConfig = vespa::config::content::StorDistributionConfig;
- using BucketspacesConfig = vespa::config::content::core::BucketspacesConfig;
+ using BucketspacesConfig = vespa::config::content::core::BucketspacesConfig;
+ using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig;
+ using StorDistributionConfig = vespa::config::content::StorDistributionConfig;
+ using StorServerConfig = vespa::config::content::core::StorServerConfig;
+ using UpgradingConfig = vespa::config::content::UpgradingConfig;
private:
bool _singleThreadedDebugMode;
// Subscriptions to config
@@ -137,12 +133,14 @@ private:
virtual void configure(std::unique_ptr<document::config::DocumenttypesConfig> config,
bool hasChanged, int64_t generation);
void configure(std::unique_ptr<BucketspacesConfig>) override;
+ void configure(std::unique_ptr<CommunicationManagerConfig> config) override;
protected:
// Lock taken while doing configuration of the server.
std::mutex _configLock;
std::mutex _initial_config_mutex;
using InitialGuard = std::lock_guard<std::mutex>;
+
// Current running config. Kept, such that we can see what has been
// changed in live config updates.
std::unique_ptr<StorServerConfig> _serverConfig;
@@ -150,12 +148,15 @@ protected:
std::unique_ptr<StorDistributionConfig> _distributionConfig;
std::unique_ptr<document::config::DocumenttypesConfig> _doctypesConfig;
std::unique_ptr<BucketspacesConfig> _bucketSpacesConfig;
+ std::unique_ptr<CommunicationManagerConfig> _comm_mgr_config;
+
// New configs gotten that has yet to have been handled
std::unique_ptr<StorServerConfig> _newServerConfig;
std::unique_ptr<UpgradingConfig> _newClusterConfig;
std::unique_ptr<StorDistributionConfig> _newDistributionConfig;
std::unique_ptr<document::config::DocumenttypesConfig> _newDoctypesConfig;
std::unique_ptr<BucketspacesConfig> _newBucketSpacesConfig;
+ std::unique_ptr<CommunicationManagerConfig> _new_comm_mgr_config;
std::unique_ptr<StorageComponent> _component;
std::unique_ptr<NodeIdentity> _node_identity;
config::ConfigUri _configUri;