summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@vespa.ai>2023-10-18 14:25:55 +0000
committerTor Brede Vekterli <vekterli@vespa.ai>2023-10-18 14:44:26 +0000
commitabcadea578098ebe22386f2872786fbbf2cc2b1f (patch)
tree0f168cd7dc229583e1874ac353cbad099dcb465a /storage
parentfeb393297b6033df6c9d1ff31439dd2e3b21ff45 (diff)
De-dupe `StorageNode` config propagation
Removes need to duplicate locking and explicit config propagation handling per config type. Also remove unused upgrade-config wiring.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/vespa/storage/storageserver/distributornode.cpp2
-rw-r--r--storage/src/vespa/storage/storageserver/servicelayernode.cpp12
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.cpp168
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.h52
4 files changed, 109 insertions, 125 deletions
diff --git a/storage/src/vespa/storage/storageserver/distributornode.cpp b/storage/src/vespa/storage/storageserver/distributornode.cpp
index cbe1b64169b..31785e19681 100644
--- a/storage/src/vespa/storage/storageserver/distributornode.cpp
+++ b/storage/src/vespa/storage/storageserver/distributornode.cpp
@@ -88,7 +88,7 @@ DistributorNode::createChain(IStorageChainBuilder &builder)
if (_retrievedCommunicationManager) {
builder.add(std::move(_retrievedCommunicationManager));
} else {
- auto communication_manager = std::make_unique<CommunicationManager>(dcr, _configUri, *_comm_mgr_config);
+ auto communication_manager = std::make_unique<CommunicationManager>(dcr, _configUri, communication_manager_config());
_communicationManager = communication_manager.get();
builder.add(std::move(communication_manager));
}
diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
index 3e990ba8cf3..0cd5ffeba68 100644
--- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp
+++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
@@ -107,7 +107,7 @@ ServiceLayerNode::initializeNodeSpecific()
NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock());
lib::NodeState ns(*_component->getStateUpdater().getReportedNodeState());
- ns.setCapacity(_serverConfig->nodeCapacity);
+ ns.setCapacity(server_config().nodeCapacity);
LOG(debug, "Adjusting reported node state to include capacity: %s", ns.toString().c_str());
_component->getStateUpdater().setReportedNodeState(ns);
}
@@ -118,10 +118,10 @@ ServiceLayerNode::initializeNodeSpecific()
void
ServiceLayerNode::handleLiveConfigUpdate(const InitialGuard & initGuard)
{
- if (_newServerConfig) {
+ if (_server_config.staging) {
bool updated = false;
- vespa::config::content::core::StorServerConfigBuilder oldC(*_serverConfig);
- StorServerConfig& newC(*_newServerConfig);
+ vespa::config::content::core::StorServerConfigBuilder oldC(*_server_config.active);
+ StorServerConfig& newC(*_server_config.staging);
{
updated = false;
NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock());
@@ -133,7 +133,7 @@ ServiceLayerNode::handleLiveConfigUpdate(const InitialGuard & initGuard)
ns.setCapacity(newC.nodeCapacity);
}
if (updated) {
- _serverConfig.reset(new vespa::config::content::core::StorServerConfig(oldC));
+ _server_config.active = std::make_unique<vespa::config::content::core::StorServerConfig>(oldC);
_component->getStateUpdater().setReportedNodeState(ns);
}
}
@@ -163,7 +163,7 @@ ServiceLayerNode::createChain(IStorageChainBuilder &builder)
{
ServiceLayerComponentRegister& compReg(_context.getComponentRegister());
- auto communication_manager = std::make_unique<CommunicationManager>(compReg, _configUri, *_comm_mgr_config);
+ auto communication_manager = std::make_unique<CommunicationManager>(compReg, _configUri, communication_manager_config());
_communicationManager = communication_manager.get();
builder.add(std::move(communication_manager));
builder.add(std::make_unique<Bouncer>(compReg, _configUri));
diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp
index 64d6dd423c6..f835b286165 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.cpp
+++ b/storage/src/vespa/storage/storageserver/storagenode.cpp
@@ -90,14 +90,10 @@ StorageNode::StorageNode(
_chain(),
_configLock(),
_initial_config_mutex(),
- _serverConfig(),
- _clusterConfig(),
- _distributionConfig(),
- _bucketSpacesConfig(),
- _newServerConfig(),
- _newClusterConfig(),
- _newDistributionConfig(),
- _newBucketSpacesConfig(),
+ _bucket_spaces_config(),
+ _comm_mgr_config(),
+ _distribution_config(),
+ _server_config(),
_component(),
_node_identity(),
_configUri(configUri),
@@ -114,17 +110,15 @@ StorageNode::subscribeToConfigs()
_configFetcher->subscribe<CommunicationManagerConfig>(_configUri.getConfigId(), this);
_configFetcher->subscribe<StorDistributionConfig>(_configUri.getConfigId(), this);
_configFetcher->subscribe<StorServerConfig>(_configUri.getConfigId(), this);
- _configFetcher->subscribe<UpgradingConfig>(_configUri.getConfigId(), this);
_configFetcher->start();
// All the below config instances were synchronously populated as part of start()ing the config fetcher
std::lock_guard configLockGuard(_configLock);
- _bucketSpacesConfig = std::move(_newBucketSpacesConfig);
- _clusterConfig = std::move(_newClusterConfig);
- _comm_mgr_config = std::move(_new_comm_mgr_config);
- _distributionConfig = std::move(_newDistributionConfig);
- _serverConfig = std::move(_newServerConfig);
+ _bucket_spaces_config.promote_staging_to_active();
+ _comm_mgr_config.promote_staging_to_active();
+ _distribution_config.promote_staging_to_active();
+ _server_config.promote_staging_to_active();
}
void
@@ -142,13 +136,13 @@ StorageNode::initialize(const NodeStateReporter & nodeStateReporter)
// First update some basics that doesn't depend on anything else to be
// available
- _rootFolder = _serverConfig->rootFolder;
+ _rootFolder = server_config().rootFolder;
- _context.getComponentRegister().setNodeInfo(_serverConfig->clusterName, getNodeType(), _serverConfig->nodeIndex);
+ _context.getComponentRegister().setNodeInfo(server_config().clusterName, getNodeType(), server_config().nodeIndex);
_context.getComponentRegister().setBucketIdFactory(document::BucketIdFactory());
- _context.getComponentRegister().setDistribution(make_shared<lib::Distribution>(*_distributionConfig));
- _context.getComponentRegister().setBucketSpacesConfig(*_bucketSpacesConfig);
- _node_identity = std::make_unique<NodeIdentity>(_serverConfig->clusterName, getNodeType(), _serverConfig->nodeIndex);
+ _context.getComponentRegister().setDistribution(make_shared<lib::Distribution>(distribution_config()));
+ _context.getComponentRegister().setBucketSpacesConfig(bucket_spaces_config());
+ _node_identity = std::make_unique<NodeIdentity>(server_config().clusterName, getNodeType(), server_config().nodeIndex);
_metrics = std::make_shared<StorageMetricSet>();
_component = std::make_unique<StorageComponent>(_context.getComponentRegister(), "storagenode");
@@ -185,17 +179,17 @@ StorageNode::initialize(const NodeStateReporter & nodeStateReporter)
// Start deadlock detector
_deadLockDetector = std::make_unique<DeadLockDetector>(_context.getComponentRegister());
- _deadLockDetector->enableWarning(_serverConfig->enableDeadLockDetectorWarnings);
- _deadLockDetector->enableShutdown(_serverConfig->enableDeadLockDetector);
- _deadLockDetector->setProcessSlack(vespalib::from_s(_serverConfig->deadLockDetectorTimeoutSlack));
- _deadLockDetector->setWaitSlack(vespalib::from_s(_serverConfig->deadLockDetectorTimeoutSlack));
+ _deadLockDetector->enableWarning(server_config().enableDeadLockDetectorWarnings);
+ _deadLockDetector->enableShutdown(server_config().enableDeadLockDetector);
+ _deadLockDetector->setProcessSlack(vespalib::from_s(server_config().deadLockDetectorTimeoutSlack));
+ _deadLockDetector->setWaitSlack(vespalib::from_s(server_config().deadLockDetectorTimeoutSlack));
createChain(*_chain_builder);
_chain = std::move(*_chain_builder).build();
_chain_builder.reset();
assert(_communicationManager != nullptr);
- _communicationManager->updateBucketSpacesConfig(*_bucketSpacesConfig);
+ _communicationManager->updateBucketSpacesConfig(bucket_spaces_config());
perform_post_chain_creation_init_steps();
@@ -257,23 +251,23 @@ StorageNode::handleLiveConfigUpdate(const InitialGuard & initGuard)
// If we get here, initialize is done running. We have to handle changes
// we want to handle.
- if (_newServerConfig) {
- StorServerConfigBuilder oldC(*_serverConfig);
- StorServerConfig& newC(*_newServerConfig);
+ if (_server_config.staging) {
+ StorServerConfigBuilder oldC(*_server_config.active);
+ StorServerConfig& newC(*_server_config.staging);
DIFFERWARN(rootFolder, "Cannot alter root folder of node live");
DIFFERWARN(clusterName, "Cannot alter cluster name of node live");
DIFFERWARN(nodeIndex, "Cannot alter node index of node live");
DIFFERWARN(isDistributor, "Cannot alter role of node live");
- _serverConfig = std::make_unique<StorServerConfig>(oldC);
- _newServerConfig.reset();
- _deadLockDetector->enableWarning(_serverConfig->enableDeadLockDetectorWarnings);
- _deadLockDetector->enableShutdown(_serverConfig->enableDeadLockDetector);
- _deadLockDetector->setProcessSlack(vespalib::from_s(_serverConfig->deadLockDetectorTimeoutSlack));
- _deadLockDetector->setWaitSlack(vespalib::from_s(_serverConfig->deadLockDetectorTimeoutSlack));
- }
- if (_newDistributionConfig) {
- StorDistributionConfigBuilder oldC(*_distributionConfig);
- StorDistributionConfig& newC(*_newDistributionConfig);
+ _server_config.active = std::make_unique<StorServerConfig>(oldC); // TODO isn't this a no-op...?
+ _server_config.staging.reset();
+ _deadLockDetector->enableWarning(server_config().enableDeadLockDetectorWarnings);
+ _deadLockDetector->enableShutdown(server_config().enableDeadLockDetector);
+ _deadLockDetector->setProcessSlack(vespalib::from_s(server_config().deadLockDetectorTimeoutSlack));
+ _deadLockDetector->setWaitSlack(vespalib::from_s(server_config().deadLockDetectorTimeoutSlack));
+ }
+ if (_distribution_config.staging) {
+ StorDistributionConfigBuilder oldC(*_distribution_config.active);
+ StorDistributionConfig& newC(*_distribution_config.staging);
bool updated = false;
if (DIFFER(redundancy)) {
LOG(info, "Live config update: Altering redundancy from %u to %u.", oldC.redundancy, newC.redundancy);
@@ -304,8 +298,9 @@ StorageNode::handleLiveConfigUpdate(const InitialGuard & initGuard)
LOG(info, "Live config update: Group structure altered.");
ASSIGN(group);
}
- _distributionConfig = std::make_unique<StorDistributionConfig>(oldC);
- _newDistributionConfig.reset();
+ // This looks weird, but the magical ASSIGN() macro mutates `oldC` in-place upon changes
+ _distribution_config.active = std::make_unique<StorDistributionConfig>(oldC);
+ _distribution_config.staging.reset();
if (updated) {
_context.getComponentRegister().setDistribution(make_shared<lib::Distribution>(oldC));
for (StorageLink* link = _chain.get(); link != nullptr; link = link->getNextLink()) {
@@ -313,21 +308,15 @@ StorageNode::handleLiveConfigUpdate(const InitialGuard & initGuard)
}
}
}
- if (_newClusterConfig) {
- if (*_clusterConfig != *_newClusterConfig) {
- LOG(warning, "Live config failure: Cannot alter cluster config of node live.");
- }
- _newClusterConfig.reset();
- }
- if (_newBucketSpacesConfig) {
- _bucketSpacesConfig = std::move(_newBucketSpacesConfig);
- _context.getComponentRegister().setBucketSpacesConfig(*_bucketSpacesConfig);
- _communicationManager->updateBucketSpacesConfig(*_bucketSpacesConfig);
+ if (_bucket_spaces_config.staging) {
+ _bucket_spaces_config.promote_staging_to_active();
+ _context.getComponentRegister().setBucketSpacesConfig(bucket_spaces_config());
+ _communicationManager->updateBucketSpacesConfig(bucket_spaces_config());
}
- if (_new_comm_mgr_config) {
- _comm_mgr_config = std::move(_new_comm_mgr_config);
- _communicationManager->on_configure(*_comm_mgr_config);
+ if (_comm_mgr_config.staging) {
+ _comm_mgr_config.promote_staging_to_active();
+ _communicationManager->on_configure(communication_manager_config());
}
}
@@ -438,68 +427,37 @@ StorageNode::shutdown()
void
StorageNode::configure(std::unique_ptr<StorServerConfig> config) {
- log_config_received(*config);
- // When we get config, we try to grab the config lock to ensure noone
- // else is doing configuration work, and then we write the new config
- // to a variable where we can find it later when processing config
- // updates
- {
- std::lock_guard configLockGuard(_configLock);
- _newServerConfig = std::move(config);
- }
- if (_serverConfig) {
- InitialGuard concurrent_config_guard(_initial_config_mutex);
- handleLiveConfigUpdate(concurrent_config_guard);
- }
-}
-
-void
-StorageNode::configure(std::unique_ptr<UpgradingConfig> config) {
- log_config_received(*config);
- {
- std::lock_guard configLockGuard(_configLock);
- _newClusterConfig = std::move(config);
- }
- if (_clusterConfig) {
- InitialGuard concurrent_config_guard(_initial_config_mutex);
- handleLiveConfigUpdate(concurrent_config_guard);
- }
+ stage_config_change(_server_config, std::move(config));
}
void
StorageNode::configure(std::unique_ptr<StorDistributionConfig> config) {
- log_config_received(*config);
- {
- std::lock_guard configLockGuard(_configLock);
- _newDistributionConfig = std::move(config);
- }
- if (_distributionConfig) {
- InitialGuard concurrent_config_guard(_initial_config_mutex);
- handleLiveConfigUpdate(concurrent_config_guard);
- }
+ stage_config_change(_distribution_config, std::move(config));
}
void
StorageNode::configure(std::unique_ptr<BucketspacesConfig> config) {
- log_config_received(*config);
- {
- std::lock_guard configLockGuard(_configLock);
- _newBucketSpacesConfig = std::move(config);
- }
- if (_bucketSpacesConfig) {
- InitialGuard concurrent_config_guard(_initial_config_mutex);
- handleLiveConfigUpdate(concurrent_config_guard);
- }
+ stage_config_change(_bucket_spaces_config, std::move(config));
}
void
StorageNode::configure(std::unique_ptr<CommunicationManagerConfig > config) {
- log_config_received(*config);
+ stage_config_change(_comm_mgr_config, std::move(config));
+}
+
+template <typename ConfigT>
+void
+StorageNode::stage_config_change(ConfigWrapper<ConfigT>& cfg, std::unique_ptr<ConfigT> new_cfg) {
+ log_config_received(*new_cfg);
+ // When we get config, we try to grab the config lock to ensure no one
+ // else is doing configuration work, and then we write the new config
+ // to a variable where we can find it later when processing config
+ // updates
{
std::lock_guard config_lock_guard(_configLock);
- _new_comm_mgr_config = std::move(config);
+ cfg.staging = std::move(new_cfg);
}
- if (_comm_mgr_config) {
+ if (cfg.active) {
InitialGuard concurrent_config_guard(_initial_config_mutex);
handleLiveConfigUpdate(concurrent_config_guard);
}
@@ -565,4 +523,16 @@ StorageNode::set_storage_chain_builder(std::unique_ptr<IStorageChainBuilder> bui
_chain_builder = std::move(builder);
}
+template <typename ConfigT>
+StorageNode::ConfigWrapper<ConfigT>::ConfigWrapper() = default;
+
+template <typename ConfigT>
+StorageNode::ConfigWrapper<ConfigT>::~ConfigWrapper() = default;
+
+template <typename ConfigT>
+void StorageNode::ConfigWrapper<ConfigT>::promote_staging_to_active() {
+ assert(staging);
+ active = std::move(staging);
+}
+
} // storage
diff --git a/storage/src/vespa/storage/storageserver/storagenode.h b/storage/src/vespa/storage/storageserver/storagenode.h
index 49dd7c96fc9..fb616eb2475 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.h
+++ b/storage/src/vespa/storage/storageserver/storagenode.h
@@ -11,7 +11,6 @@
#include <vespa/config-bucketspaces.h>
#include <vespa/config-stor-distribution.h>
-#include <vespa/config-upgrading.h>
#include <vespa/config/helper/ifetchercallback.h>
#include <vespa/config/subscription/configuri.h>
#include <vespa/document/config/config-documenttypes.h>
@@ -51,7 +50,6 @@ namespace lib { class NodeType; }
class StorageNode : private config::IFetcherCallback<vespa::config::content::core::StorServerConfig>,
private config::IFetcherCallback<vespa::config::content::StorDistributionConfig>,
- private config::IFetcherCallback<vespa::config::content::UpgradingConfig>,
private config::IFetcherCallback<vespa::config::content::core::BucketspacesConfig>,
private config::IFetcherCallback<vespa::config::content::core::StorCommunicationmanagerConfig>,
private framework::MetricUpdateHook,
@@ -96,10 +94,9 @@ protected:
using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig;
using StorDistributionConfig = vespa::config::content::StorDistributionConfig;
using StorServerConfig = vespa::config::content::core::StorServerConfig;
- using UpgradingConfig = vespa::config::content::UpgradingConfig;
private:
bool _singleThreadedDebugMode;
- // Subscriptions to config
+ // Subscriptions to config
std::unique_ptr<config::ConfigFetcher> _configFetcher;
std::unique_ptr<HostInfo> _hostInfo;
@@ -126,9 +123,22 @@ private:
// The storage chain can depend on anything.
std::unique_ptr<StorageLink> _chain;
+ template <typename ConfigT>
+ struct ConfigWrapper {
+ std::unique_ptr<ConfigT> staging;
+ std::unique_ptr<ConfigT> active;
+
+ ConfigWrapper();
+ ~ConfigWrapper();
+
+ void promote_staging_to_active();
+ };
+
+ template <typename ConfigT>
+ void stage_config_change(ConfigWrapper<ConfigT>& my_cfg, std::unique_ptr<ConfigT> new_cfg);
+
/** Implementation of config callbacks. */
void configure(std::unique_ptr<StorServerConfig> config) override;
- void configure(std::unique_ptr<UpgradingConfig> config) override;
void configure(std::unique_ptr<StorDistributionConfig> config) override;
void configure(std::unique_ptr<BucketspacesConfig>) override;
void configure(std::unique_ptr<CommunicationManagerConfig> config) override;
@@ -139,20 +149,24 @@ protected:
std::mutex _initial_config_mutex;
using InitialGuard = std::lock_guard<std::mutex>;
- // Current running config. Kept, such that we can see what has been
- // changed in live config updates.
- std::unique_ptr<StorServerConfig> _serverConfig;
- std::unique_ptr<UpgradingConfig> _clusterConfig;
- std::unique_ptr<StorDistributionConfig> _distributionConfig;
- std::unique_ptr<BucketspacesConfig> _bucketSpacesConfig;
- std::unique_ptr<CommunicationManagerConfig> _comm_mgr_config;
-
- // New configs gotten that has yet to have been handled
- std::unique_ptr<StorServerConfig> _newServerConfig;
- std::unique_ptr<UpgradingConfig> _newClusterConfig;
- std::unique_ptr<StorDistributionConfig> _newDistributionConfig;
- std::unique_ptr<BucketspacesConfig> _newBucketSpacesConfig;
- std::unique_ptr<CommunicationManagerConfig> _new_comm_mgr_config;
+ ConfigWrapper<BucketspacesConfig> _bucket_spaces_config;
+ ConfigWrapper<CommunicationManagerConfig> _comm_mgr_config;
+ ConfigWrapper<StorDistributionConfig> _distribution_config;
+ ConfigWrapper<StorServerConfig> _server_config;
+
+ [[nodiscard]] const BucketspacesConfig& bucket_spaces_config() const noexcept {
+ return *_bucket_spaces_config.active;
+ }
+ [[nodiscard]] const CommunicationManagerConfig& communication_manager_config() const noexcept {
+ return *_comm_mgr_config.active;
+ }
+ [[nodiscard]] const StorDistributionConfig& distribution_config() const noexcept {
+ return *_distribution_config.active;
+ }
+ [[nodiscard]] const StorServerConfig& server_config() const noexcept {
+ return *_server_config.active;
+ }
+
std::unique_ptr<StorageComponent> _component;
std::unique_ptr<NodeIdentity> _node_identity;
config::ConfigUri _configUri;