diff options
Diffstat (limited to 'storage/src/vespa/storage/storageserver/storagenode.cpp')
-rw-r--r-- | storage/src/vespa/storage/storageserver/storagenode.cpp | 226 |
1 files changed, 94 insertions, 132 deletions
diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp index 99a879e19db..f7a426a0527 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.cpp +++ b/storage/src/vespa/storage/storageserver/storagenode.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "communicationmanager.h" #include "config_logging.h" @@ -66,14 +66,19 @@ removePidFile(const vespalib::string& pidfile) } // End of anonymous namespace +StorageNode::BootstrapConfigs::BootstrapConfigs() = default; +StorageNode::BootstrapConfigs::~BootstrapConfigs() = default; +StorageNode::BootstrapConfigs::BootstrapConfigs(BootstrapConfigs&&) noexcept = default; +StorageNode::BootstrapConfigs& StorageNode::BootstrapConfigs::operator=(BootstrapConfigs&&) noexcept = default; + StorageNode::StorageNode( const config::ConfigUri & configUri, StorageNodeContext& context, + BootstrapConfigs bootstrap_configs, ApplicationGenerationFetcher& generationFetcher, std::unique_ptr<HostInfo> hostInfo, RunMode mode) : _singleThreadedDebugMode(mode == SINGLE_THREADED_TEST_MODE), - _configFetcher(), _hostInfo(std::move(hostInfo)), _context(context), _generationFetcher(generationFetcher), @@ -90,16 +95,11 @@ StorageNode::StorageNode( _chain(), _configLock(), _initial_config_mutex(), - _serverConfig(), - _clusterConfig(), - _distributionConfig(), - _doctypesConfig(), - _bucketSpacesConfig(), - _newServerConfig(), - _newClusterConfig(), - _newDistributionConfig(), - _newDoctypesConfig(), - _newBucketSpacesConfig(), + _bouncer_config(std::move(bootstrap_configs.bouncer_cfg)), + _bucket_spaces_config(std::move(bootstrap_configs.bucket_spaces_cfg)), + _comm_mgr_config(std::move(bootstrap_configs.comm_mgr_cfg)), + _distribution_config(std::move(bootstrap_configs.distribution_cfg)), + _server_config(std::move(bootstrap_configs.server_cfg)), _component(), _node_identity(), _configUri(configUri), @@ -109,45 +109,24 @@ StorageNode::StorageNode( } void -StorageNode::subscribeToConfigs() -{ - _configFetcher = std::make_unique<config::ConfigFetcher>(_configUri.getContext()); - _configFetcher->subscribe<StorDistributionConfig>(_configUri.getConfigId(), this); - _configFetcher->subscribe<UpgradingConfig>(_configUri.getConfigId(), this); - _configFetcher->subscribe<StorServerConfig>(_configUri.getConfigId(), this); - _configFetcher->subscribe<BucketspacesConfig>(_configUri.getConfigId(), this); - - _configFetcher->start(); - - std::lock_guard configLockGuard(_configLock); - _serverConfig = std::move(_newServerConfig); - _clusterConfig = std::move(_newClusterConfig); - _distributionConfig = std::move(_newDistributionConfig); - _bucketSpacesConfig = std::move(_newBucketSpacesConfig); -} - -void StorageNode::initialize(const NodeStateReporter & nodeStateReporter) { // Avoid racing with concurrent reconfigurations before we've set up the entire // node component stack. + // TODO no longer needed... probably std::lock_guard<std::mutex> concurrent_config_guard(_initial_config_mutex); _context.getComponentRegister().registerShutdownListener(*this); - // Fetch configs needed first. These functions will just grab the config - // and store them away, while having the config lock. - subscribeToConfigs(); - // First update some basics that doesn't depend on anything else to be // available - _rootFolder = _serverConfig->rootFolder; + _rootFolder = server_config().rootFolder; - _context.getComponentRegister().setNodeInfo(_serverConfig->clusterName, getNodeType(), _serverConfig->nodeIndex); + _context.getComponentRegister().setNodeInfo(server_config().clusterName, getNodeType(), server_config().nodeIndex); _context.getComponentRegister().setBucketIdFactory(document::BucketIdFactory()); - _context.getComponentRegister().setDistribution(make_shared<lib::Distribution>(*_distributionConfig)); - _context.getComponentRegister().setBucketSpacesConfig(*_bucketSpacesConfig); - _node_identity = std::make_unique<NodeIdentity>(_serverConfig->clusterName, getNodeType(), _serverConfig->nodeIndex); + _context.getComponentRegister().setDistribution(make_shared<lib::Distribution>(distribution_config())); + _context.getComponentRegister().setBucketSpacesConfig(bucket_spaces_config()); + _node_identity = std::make_unique<NodeIdentity>(server_config().clusterName, getNodeType(), server_config().nodeIndex); _metrics = std::make_shared<StorageMetricSet>(); _component = std::make_unique<StorageComponent>(_context.getComponentRegister(), "storagenode"); @@ -184,17 +163,17 @@ StorageNode::initialize(const NodeStateReporter & nodeStateReporter) // Start deadlock detector _deadLockDetector = std::make_unique<DeadLockDetector>(_context.getComponentRegister()); - _deadLockDetector->enableWarning(_serverConfig->enableDeadLockDetectorWarnings); - _deadLockDetector->enableShutdown(_serverConfig->enableDeadLockDetector); - _deadLockDetector->setProcessSlack(vespalib::from_s(_serverConfig->deadLockDetectorTimeoutSlack)); - _deadLockDetector->setWaitSlack(vespalib::from_s(_serverConfig->deadLockDetectorTimeoutSlack)); + _deadLockDetector->enableWarning(server_config().enableDeadLockDetectorWarnings); + _deadLockDetector->enableShutdown(server_config().enableDeadLockDetector); + _deadLockDetector->setProcessSlack(vespalib::from_s(server_config().deadLockDetectorTimeoutSlack)); + _deadLockDetector->setWaitSlack(vespalib::from_s(server_config().deadLockDetectorTimeoutSlack)); createChain(*_chain_builder); _chain = std::move(*_chain_builder).build(); _chain_builder.reset(); assert(_communicationManager != nullptr); - _communicationManager->updateBucketSpacesConfig(*_bucketSpacesConfig); + _communicationManager->updateBucketSpacesConfig(bucket_spaces_config()); perform_post_chain_creation_init_steps(); @@ -256,23 +235,23 @@ StorageNode::handleLiveConfigUpdate(const InitialGuard & initGuard) // If we get here, initialize is done running. We have to handle changes // we want to handle. - if (_newServerConfig) { - StorServerConfigBuilder oldC(*_serverConfig); - StorServerConfig& newC(*_newServerConfig); + if (_server_config.staging) { + StorServerConfigBuilder oldC(*_server_config.active); + StorServerConfig& newC(*_server_config.staging); DIFFERWARN(rootFolder, "Cannot alter root folder of node live"); DIFFERWARN(clusterName, "Cannot alter cluster name of node live"); DIFFERWARN(nodeIndex, "Cannot alter node index of node live"); DIFFERWARN(isDistributor, "Cannot alter role of node live"); - _serverConfig = std::make_unique<StorServerConfig>(oldC); - _newServerConfig.reset(); - _deadLockDetector->enableWarning(_serverConfig->enableDeadLockDetectorWarnings); - _deadLockDetector->enableShutdown(_serverConfig->enableDeadLockDetector); - _deadLockDetector->setProcessSlack(vespalib::from_s(_serverConfig->deadLockDetectorTimeoutSlack)); - _deadLockDetector->setWaitSlack(vespalib::from_s(_serverConfig->deadLockDetectorTimeoutSlack)); - } - if (_newDistributionConfig) { - StorDistributionConfigBuilder oldC(*_distributionConfig); - StorDistributionConfig& newC(*_newDistributionConfig); + _server_config.active = std::make_unique<StorServerConfig>(oldC); // TODO this overwrites from ServiceLayerNode + _server_config.staging.reset(); + _deadLockDetector->enableWarning(server_config().enableDeadLockDetectorWarnings); + _deadLockDetector->enableShutdown(server_config().enableDeadLockDetector); + _deadLockDetector->setProcessSlack(vespalib::from_s(server_config().deadLockDetectorTimeoutSlack)); + _deadLockDetector->setWaitSlack(vespalib::from_s(server_config().deadLockDetectorTimeoutSlack)); + } + if (_distribution_config.staging) { + StorDistributionConfigBuilder oldC(*_distribution_config.active); + StorDistributionConfig& newC(*_distribution_config.staging); bool updated = false; if (DIFFER(redundancy)) { LOG(info, "Live config update: Altering redundancy from %u to %u.", oldC.redundancy, newC.redundancy); @@ -303,8 +282,9 @@ StorageNode::handleLiveConfigUpdate(const InitialGuard & initGuard) LOG(info, "Live config update: Group structure altered."); ASSIGN(group); } - _distributionConfig = std::make_unique<StorDistributionConfig>(oldC); - _newDistributionConfig.reset(); + // This looks weird, but the magical ASSIGN() macro mutates `oldC` in-place upon changes + _distribution_config.active = std::make_unique<StorDistributionConfig>(oldC); + _distribution_config.staging.reset(); if (updated) { _context.getComponentRegister().setDistribution(make_shared<lib::Distribution>(oldC)); for (StorageLink* link = _chain.get(); link != nullptr; link = link->getNextLink()) { @@ -312,17 +292,19 @@ StorageNode::handleLiveConfigUpdate(const InitialGuard & initGuard) } } } - if (_newClusterConfig) { - if (*_clusterConfig != *_newClusterConfig) { - LOG(warning, "Live config failure: Cannot alter cluster config of node live."); - } - _newClusterConfig.reset(); - } - if (_newBucketSpacesConfig) { - _bucketSpacesConfig = std::move(_newBucketSpacesConfig); - _context.getComponentRegister().setBucketSpacesConfig(*_bucketSpacesConfig); - _communicationManager->updateBucketSpacesConfig(*_bucketSpacesConfig); + if (_bucket_spaces_config.staging) { + _bucket_spaces_config.promote_staging_to_active(); + _context.getComponentRegister().setBucketSpacesConfig(bucket_spaces_config()); + _communicationManager->updateBucketSpacesConfig(bucket_spaces_config()); + } + if (_comm_mgr_config.staging) { + _comm_mgr_config.promote_staging_to_active(); + _communicationManager->on_configure(communication_manager_config()); + } + if (_bouncer_config.staging) { + _bouncer_config.promote_staging_to_active(); + on_bouncer_config_changed(); } } @@ -347,25 +329,16 @@ StorageNode::notifyDoneInitializing() StorageNode::~StorageNode() = default; void -StorageNode::removeConfigSubscriptions() -{ - LOG(debug, "Removing config subscribers"); - _configFetcher.reset(); -} - -void StorageNode::shutdown() { // Try to shut down in opposite order of initialize. Bear in mind that // we might be shutting down after init exception causing only parts - // of the server to have initialize + // of the server to have been initialized LOG(debug, "Shutting down storage node of type %s", getNodeType().toString().c_str()); if (!attemptedStopped()) { LOG(debug, "Storage killed before requestShutdown() was called. No " "reason has been given for why we're stopping."); } - // Remove the subscription to avoid more callbacks from config - removeConfigSubscriptions(); if (_chain) { LOG(debug, "Closing storage chain"); @@ -433,72 +406,42 @@ StorageNode::shutdown() void StorageNode::configure(std::unique_ptr<StorServerConfig> config) { - log_config_received(*config); - // When we get config, we try to grab the config lock to ensure noone - // else is doing configuration work, and then we write the new config - // to a variable where we can find it later when processing config - // updates - { - std::lock_guard configLockGuard(_configLock); - _newServerConfig = std::move(config); - } - if (_serverConfig) { - InitialGuard concurrent_config_guard(_initial_config_mutex); - handleLiveConfigUpdate(concurrent_config_guard); - } + stage_config_change(_server_config, std::move(config)); } void -StorageNode::configure(std::unique_ptr<UpgradingConfig> config) { - log_config_received(*config); - { - std::lock_guard configLockGuard(_configLock); - _newClusterConfig = std::move(config); - } - if (_clusterConfig) { - InitialGuard concurrent_config_guard(_initial_config_mutex); - handleLiveConfigUpdate(concurrent_config_guard); - } +StorageNode::configure(std::unique_ptr<StorDistributionConfig> config) { + stage_config_change(_distribution_config, std::move(config)); } void -StorageNode::configure(std::unique_ptr<StorDistributionConfig> config) { - log_config_received(*config); - { - std::lock_guard configLockGuard(_configLock); - _newDistributionConfig = std::move(config); - } - if (_distributionConfig) { - InitialGuard concurrent_config_guard(_initial_config_mutex); - handleLiveConfigUpdate(concurrent_config_guard); - } +StorageNode::configure(std::unique_ptr<BucketspacesConfig> config) { + stage_config_change(_bucket_spaces_config, std::move(config)); } + void -StorageNode::configure(std::unique_ptr<document::config::DocumenttypesConfig> config, - bool hasChanged, int64_t generation) -{ - log_config_received(*config); - (void) generation; - if (!hasChanged) - return; - { - std::lock_guard configLockGuard(_configLock); - _newDoctypesConfig = std::move(config); - } - if (_doctypesConfig) { - InitialGuard concurrent_config_guard(_initial_config_mutex); - handleLiveConfigUpdate(concurrent_config_guard); - } +StorageNode::configure(std::unique_ptr<CommunicationManagerConfig> config) { + stage_config_change(_comm_mgr_config, std::move(config)); } void -StorageNode::configure(std::unique_ptr<BucketspacesConfig> config) { - log_config_received(*config); +StorageNode::configure(std::unique_ptr<StorBouncerConfig> config) { + stage_config_change(_bouncer_config, std::move(config)); +} + +template <typename ConfigT> +void +StorageNode::stage_config_change(ConfigWrapper<ConfigT>& cfg, std::unique_ptr<ConfigT> new_cfg) { + log_config_received(*new_cfg); + // When we get config, we try to grab the config lock to ensure no one + // else is doing configuration work, and then we write the new config + // to a variable where we can find it later when processing config + // updates { - std::lock_guard configLockGuard(_configLock); - _newBucketSpacesConfig = std::move(config); + std::lock_guard config_lock_guard(_configLock); + cfg.staging = std::move(new_cfg); } - if (_bucketSpacesConfig) { + if (cfg.active) { InitialGuard concurrent_config_guard(_initial_config_mutex); handleLiveConfigUpdate(concurrent_config_guard); } @@ -564,4 +507,23 @@ StorageNode::set_storage_chain_builder(std::unique_ptr<IStorageChainBuilder> bui _chain_builder = std::move(builder); } +template <typename ConfigT> +StorageNode::ConfigWrapper<ConfigT>::ConfigWrapper() noexcept = default; + +template <typename ConfigT> +StorageNode::ConfigWrapper<ConfigT>::ConfigWrapper(std::unique_ptr<ConfigT> initial_active) noexcept + : staging(), + active(std::move(initial_active)) +{ +} + +template <typename ConfigT> +StorageNode::ConfigWrapper<ConfigT>::~ConfigWrapper() = default; + +template <typename ConfigT> +void StorageNode::ConfigWrapper<ConfigT>::promote_staging_to_active() noexcept { + assert(staging); + active = std::move(staging); +} + } // storage |