diff options
Diffstat (limited to 'storage')
4 files changed, 155 insertions, 168 deletions
diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp index 79d57a8f7e4..e2861ef42cd 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp +++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp @@ -17,13 +17,15 @@ #include <vespa/storage/bucketmover/bucketmover.h> #include <vespa/storage/persistence/filestorage/filestormanager.h> #include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h> -#include <vespa/storage/persistence/provider_error_wrapper.h> #include <vespa/persistence/spi/exceptions.h> #include <vespa/messagebus/rpcmessagebus.h> #include <vespa/log/log.h> LOG_SETUP(".node.servicelayer"); + +using StorServerConfigBuilder = vespa::config::content::core::StorServerConfigBuilder; + namespace storage { ServiceLayerNode::ServiceLayerNode( @@ -53,8 +55,7 @@ void ServiceLayerNode::init() throw spi::HandledException("Failed provider init: " + initResult.toString(), VESPA_STRLOC); } - spi::PartitionStateListResult result( - _persistenceProvider.getPartitionStates()); + spi::PartitionStateListResult result(_persistenceProvider.getPartitionStates()); if (result.hasError()) { LOG(error, "Failed to get partition list from persistence provider: %s", result.toString().c_str()); throw spi::HandledException("Failed to get partition list: " + result.toString(), VESPA_STRLOC); @@ -62,8 +63,7 @@ void ServiceLayerNode::init() _partitions = result.getList(); if (_partitions.size() == 0) { LOG(error, "No partitions in persistence provider. See documentation " - "for your persistence provider as to how to set up " - "partitions in it."); + "for your persistence provider as to how to set up partitions in it."); throw spi::HandledException("No partitions in provider", VESPA_STRLOC); } try{ @@ -93,7 +93,7 @@ ServiceLayerNode::subscribeToConfigs() { StorageNode::subscribeToConfigs(); _configFetcher.reset(new config::ConfigFetcher(_configUri.getContext())); - _configFetcher->subscribe<vespa::config::storage::StorDevicesConfig>(_configUri.getConfigId(), this); + _configFetcher->subscribe<StorDevicesConfig>(_configUri.getConfigId(), this); vespalib::LockGuard configLockGuard(_configLock); _deviceConfig = std::move(_newDevicesConfig); @@ -123,8 +123,7 @@ ServiceLayerNode::initializeNodeSpecific() // Give node state to mount point initialization, such that we can // get disk count and state of unavailable disks set in reported // node state. - NodeStateUpdater::Lock::SP lock( - _component->getStateUpdater().grabStateChangeLock()); + NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock()); lib::NodeState ns(*_component->getStateUpdater().getReportedNodeState()); ns.setDiskCount(_partitions.size()); @@ -166,12 +165,12 @@ ServiceLayerNode::initializeNodeSpecific() if (DIFFER(a)) { LOG(warning, "Live config failure: %s.", b); } void -ServiceLayerNode::handleLiveConfigUpdate() +ServiceLayerNode::handleLiveConfigUpdate(const InitialGuard & initGuard) { if (_newServerConfig) { bool updated = false; vespa::config::content::core::StorServerConfigBuilder oldC(*_serverConfig); - vespa::config::content::core::StorServerConfig& newC(*_newServerConfig); + StorServerConfig& newC(*_newServerConfig); DIFFERWARN(diskCount, "Cannot alter partition count of node live"); { updated = false; @@ -207,12 +206,11 @@ ServiceLayerNode::handleLiveConfigUpdate() } } } - StorageNode::handleLiveConfigUpdate(); + StorageNode::handleLiveConfigUpdate(initGuard); } void -ServiceLayerNode::configure( - std::unique_ptr<vespa::config::storage::StorDevicesConfig> config) +ServiceLayerNode::configure(std::unique_ptr<StorDevicesConfig> config) { // When we get config, we try to grab the config lock to ensure noone // else is doing configuration work, and then we write the new config @@ -223,20 +221,19 @@ ServiceLayerNode::configure( _newDevicesConfig = std::move(config); } if (_distributionConfig) { - handleLiveConfigUpdate(); + InitialGuard concurrent_config_guard(_initial_config_mutex); + handleLiveConfigUpdate(concurrent_config_guard); } } VisitorMessageSession::UP ServiceLayerNode::createSession(Visitor& visitor, VisitorThread& thread) { - MessageBusVisitorMessageSession::UP mbusSession( - new MessageBusVisitorMessageSession(visitor, thread)); + auto mbusSession = std::make_unique<MessageBusVisitorMessageSession>(visitor, thread); mbus::SourceSessionParams srcParams; srcParams.setThrottlePolicy(mbus::IThrottlePolicy::SP()); srcParams.setReplyHandler(*mbusSession); - mbusSession->setSourceSession( - _communicationManager->getMessageBus().getMessageBus().createSourceSession(srcParams)); + mbusSession->setSourceSession(_communicationManager->getMessageBus().getMessageBus().createSourceSession(srcParams)); return VisitorMessageSession::UP(std::move(mbusSession)); } @@ -270,17 +267,13 @@ ServiceLayerNode::createChain() chain->push_back(StorageLink::UP(new bucketmover::BucketMover(_configUri, compReg))); chain->push_back(StorageLink::UP(new StorageBucketDBInitializer( _configUri, _partitions, getDoneInitializeHandler(), compReg))); - chain->push_back(StorageLink::UP(new BucketManager( - _configUri, _context.getComponentRegister()))); + chain->push_back(StorageLink::UP(new BucketManager(_configUri, _context.getComponentRegister()))); chain->push_back(StorageLink::UP(new VisitorManager( - _configUri, _context.getComponentRegister(), - *this, _externalVisitors))); + _configUri, _context.getComponentRegister(), *this, _externalVisitors))); chain->push_back(StorageLink::UP(new ModifiedBucketChecker( - _context.getComponentRegister(), _persistenceProvider, - _configUri))); + _context.getComponentRegister(), _persistenceProvider, _configUri))); chain->push_back(StorageLink::UP(_fileStorManager = new FileStorManager( - _configUri, _partitions, _persistenceProvider, - _context.getComponentRegister()))); + _configUri, _partitions, _persistenceProvider, _context.getComponentRegister()))); chain->push_back(StorageLink::UP(releaseStateManager().release())); // Lifetimes of all referenced components shall outlive the last call going diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.h b/storage/src/vespa/storage/storageserver/servicelayernode.h index 848cc1d1475..12446152b90 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernode.h +++ b/storage/src/vespa/storage/storageserver/servicelayernode.h @@ -28,6 +28,7 @@ class ServiceLayerNode private config::IFetcherCallback<vespa::config::storage::StorDevicesConfig> { + using StorDevicesConfig = vespa::config::storage::StorDevicesConfig; ServiceLayerNodeContext& _context; spi::PersistenceProvider& _persistenceProvider; spi::PartitionStateList _partitions; @@ -36,8 +37,8 @@ class ServiceLayerNode // FIXME: Should probably use the fetcher in StorageNode std::unique_ptr<config::ConfigFetcher> _configFetcher; - std::unique_ptr<vespa::config::storage::StorDevicesConfig> _deviceConfig; - std::unique_ptr<vespa::config::storage::StorDevicesConfig> _newDevicesConfig; + std::unique_ptr<StorDevicesConfig> _deviceConfig; + std::unique_ptr<StorDevicesConfig> _newDevicesConfig; FileStorManager* _fileStorManager; bool _init_has_been_called; bool _noUsablePartitionMode; @@ -63,8 +64,8 @@ public: private: void subscribeToConfigs() override; void initializeNodeSpecific() override; - void handleLiveConfigUpdate() override; - void configure(std::unique_ptr<vespa::config::storage::StorDevicesConfig> config) override; + void handleLiveConfigUpdate(const InitialGuard & initGuard) override; + void configure(std::unique_ptr<StorDevicesConfig> config) override; VisitorMessageSession::UP createSession(Visitor&, VisitorThread&) override; documentapi::Priority::Value toDocumentPriority(uint8_t storagePriority) const override; std::unique_ptr<StorageLink> createChain() override; diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp index 85f54431cfb..855efaed6aa 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.cpp +++ b/storage/src/vespa/storage/storageserver/storagenode.cpp @@ -19,6 +19,10 @@ #include <vespa/log/log.h> LOG_SETUP(".node.server"); +using vespa::config::content::StorDistributionConfigBuilder; +using vespa::config::content::core::StorServerConfigBuilder; +using std::make_shared; + namespace storage { namespace { @@ -86,10 +90,11 @@ void StorageNode::subscribeToConfigs() { _configFetcher.reset(new config::ConfigFetcher(_configUri.getContext())); - _configFetcher->subscribe<vespa::config::content::StorDistributionConfig>(_configUri.getConfigId(), this); - _configFetcher->subscribe<vespa::config::content::UpgradingConfig>(_configUri.getConfigId(), this); - _configFetcher->subscribe<vespa::config::content::core::StorServerConfig>(_configUri.getConfigId(), this); - _configFetcher->subscribe<vespa::config::content::core::StorPrioritymappingConfig>(_configUri.getConfigId(), this); + _configFetcher->subscribe<StorDistributionConfig>(_configUri.getConfigId(), this); + _configFetcher->subscribe<UpgradingConfig>(_configUri.getConfigId(), this); + _configFetcher->subscribe<StorServerConfig>(_configUri.getConfigId(), this); + _configFetcher->subscribe<StorPrioritymappingConfig>(_configUri.getConfigId(), this); + _configFetcher->start(); vespalib::LockGuard configLockGuard(_configLock); @@ -99,10 +104,13 @@ StorageNode::subscribeToConfigs() _priorityConfig = std::move(_newPriorityConfig); } - void StorageNode::initialize() { + // Avoid racing with concurrent reconfigurations before we've set up the entire + // node component stack. + std::lock_guard<std::mutex> concurrent_config_guard(_initial_config_mutex); + _context.getComponentRegister().registerShutdownListener(*this); // Fetch configs needed first. These functions will just grab the config @@ -116,21 +124,14 @@ StorageNode::initialize() // available _rootFolder = _serverConfig->rootFolder; - _context.getComponentRegister().setNodeInfo( - _serverConfig->clusterName, getNodeType(), - _serverConfig->nodeIndex); - _context.getComponentRegister().setLoadTypes( - documentapi::LoadTypeSet::SP( - new documentapi::LoadTypeSet(_configUri))); - _context.getComponentRegister().setBucketIdFactory( - document::BucketIdFactory()); - _context.getComponentRegister().setDistribution( - lib::Distribution::SP(new lib::Distribution(*_distributionConfig))); + _context.getComponentRegister().setNodeInfo(_serverConfig->clusterName, getNodeType(), _serverConfig->nodeIndex); + _context.getComponentRegister().setLoadTypes(make_shared<documentapi::LoadTypeSet>(_configUri)); + _context.getComponentRegister().setBucketIdFactory(document::BucketIdFactory()); + _context.getComponentRegister().setDistribution(make_shared<lib::Distribution>(*_distributionConfig)); _context.getComponentRegister().setPriorityConfig(*_priorityConfig); _metrics.reset(new StorageMetricSet); - _component.reset(new StorageComponent( - _context.getComponentRegister(), "storagenode")); + _component.reset(new StorageComponent(_context.getComponentRegister(), "storagenode")); _component->registerMetric(*_metrics); if (!_context.getComponentRegister().hasMetricManager()) { _metricManager.reset(new metrics::MetricManager); @@ -168,17 +169,13 @@ StorageNode::initialize() _generationFetcher)); // Start deadlock detector - _deadLockDetector.reset(new DeadLockDetector( - _context.getComponentRegister())); - _deadLockDetector->enableWarning( - _serverConfig->enableDeadLockDetectorWarnings); + _deadLockDetector.reset(new DeadLockDetector(_context.getComponentRegister())); + _deadLockDetector->enableWarning(_serverConfig->enableDeadLockDetectorWarnings); _deadLockDetector->enableShutdown(_serverConfig->enableDeadLockDetector); _deadLockDetector->setProcessSlack(framework::MilliSecTime( - static_cast<uint32_t>( - _serverConfig->deadLockDetectorTimeoutSlack * 1000))); + static_cast<uint32_t>(_serverConfig->deadLockDetectorTimeoutSlack * 1000))); _deadLockDetector->setWaitSlack(framework::MilliSecTime( - static_cast<uint32_t>( - _serverConfig->deadLockDetectorTimeoutSlack * 1000))); + static_cast<uint32_t>(_serverConfig->deadLockDetectorTimeoutSlack * 1000))); _chain.reset(createChain().release()); @@ -188,7 +185,7 @@ StorageNode::initialize() // reinitializing metric manager often. _context.getComponentRegister().getMetricManager().init(_configUri, _context.getThreadPool()); - if (_chain.get() != 0) { + if (_chain) { LOG(debug, "Storage chain configured. Calling open()"); _chain->open(); } @@ -208,9 +205,8 @@ void StorageNode::initializeStatusWebServer() { if (_singleThreadedDebugMode) return; - _statusWebServer.reset(new StatusWebServer( - _context.getComponentRegister(), _context.getComponentRegister(), - _configUri)); + _statusWebServer.reset(new StatusWebServer(_context.getComponentRegister(), + _context.getComponentRegister(), _configUri)); } #define DIFFER(a) (!(oldC.a == newC.a)) @@ -223,13 +219,13 @@ StorageNode::setNewDocumentRepo(const document::DocumentTypeRepo::SP& repo) { vespalib::LockGuard configLockGuard(_configLock); _context.getComponentRegister().setDocumentTypeRepo(repo); - if (_communicationManager != 0) { + if (_communicationManager != nullptr) { _communicationManager->updateMessagebusProtocol(repo); } } void -StorageNode::updateUpgradeFlag(const vespa::config::content::UpgradingConfig& config) +StorageNode::updateUpgradeFlag(const UpgradingConfig& config) { framework::UpgradeFlags flag(framework::NO_UPGRADE_SPECIAL_HANDLING_ACTIVE); if (config.upgradingMajorTo) { @@ -245,69 +241,62 @@ StorageNode::updateUpgradeFlag(const vespa::config::content::UpgradingConfig& co } void -StorageNode::handleLiveConfigUpdate() +StorageNode::handleLiveConfigUpdate(const InitialGuard & initGuard) { - // Make sure we don't conflict with initialize or shutdown threads. + // Make sure we don't conflict with initialize or shutdown threads. + (void) initGuard; vespalib::LockGuard configLockGuard(_configLock); - // If storage haven't initialized, ignore. Initialize code will handle - // this config. - if (_chain.get() == 0) return; + + assert(_chain); // If we get here, initialize is done running. We have to handle changes // we want to handle. - if (_newServerConfig.get() != 0) { + if (_newServerConfig) { bool updated = false; - vespa::config::content::core::StorServerConfigBuilder oldC(*_serverConfig); - vespa::config::content::core::StorServerConfig& newC(*_newServerConfig); + StorServerConfigBuilder oldC(*_serverConfig); + StorServerConfig& newC(*_newServerConfig); DIFFERWARN(rootFolder, "Cannot alter root folder of node live"); DIFFERWARN(clusterName, "Cannot alter cluster name of node live"); DIFFERWARN(nodeIndex, "Cannot alter node index of node live"); DIFFERWARN(isDistributor, "Cannot alter role of node live"); { if (DIFFER(memorytouse)) { - LOG(info, "Live config update: Memory to use changed " - "from %" PRId64 " to %" PRId64 ".", + LOG(info, "Live config update: Memory to use changed from %" PRId64 " to %" PRId64 ".", oldC.memorytouse, newC.memorytouse); ASSIGN(memorytouse); - _context.getMemoryManager().setMaximumMemoryUsage( - newC.memorytouse); + _context.getMemoryManager().setMaximumMemoryUsage(newC.memorytouse); } } - _serverConfig.reset(new vespa::config::content::core::StorServerConfig(oldC)); - _newServerConfig.reset(0); + _serverConfig.reset(new StorServerConfig(oldC)); + _newServerConfig.reset(); (void)updated; } - if (_newDistributionConfig.get() != 0) { - vespa::config::content::StorDistributionConfigBuilder oldC(*_distributionConfig); - vespa::config::content::StorDistributionConfig& newC(*_newDistributionConfig); + if (_newDistributionConfig) { + StorDistributionConfigBuilder oldC(*_distributionConfig); + StorDistributionConfig& newC(*_newDistributionConfig); bool updated = false; if (DIFFER(redundancy)) { - LOG(info, "Live config update: Altering redundancy from %u to %u.", - oldC.redundancy, newC.redundancy); + LOG(info, "Live config update: Altering redundancy from %u to %u.", oldC.redundancy, newC.redundancy); ASSIGN(redundancy); } if (DIFFER(initialRedundancy)) { - LOG(info, "Live config update: Altering initial redundancy " - "from %u to %u.", + LOG(info, "Live config update: Altering initial redundancy from %u to %u.", oldC.initialRedundancy, newC.initialRedundancy); ASSIGN(initialRedundancy); } if (DIFFER(ensurePrimaryPersisted)) { - LOG(info, "Live config update: Now%s requiring primary copy to " - "succeed for n of m operation to succeed.", + LOG(info, "Live config update: Now%s requiring primary copy to succeed for n of m operation to succeed.", newC.ensurePrimaryPersisted ? "" : " not"); ASSIGN(ensurePrimaryPersisted); } if (DIFFER(activePerLeafGroup)) { - LOG(info, "Live config update: Active per leaf group setting " - "altered from %s to %s", + LOG(info, "Live config update: Active per leaf group setting altered from %s to %s", oldC.activePerLeafGroup ? "true" : "false", newC.activePerLeafGroup ? "true" : "false"); ASSIGN(activePerLeafGroup); } if (DIFFER(readyCopies)) { - LOG(info, "Live config update: Altering number of searchable " - "copies from %u to %u", + LOG(info, "Live config update: Altering number of searchable copies from %u to %u", oldC.readyCopies, newC.readyCopies); ASSIGN(readyCopies); } @@ -316,35 +305,28 @@ StorageNode::handleLiveConfigUpdate() ASSIGN(group); } if (DIFFER(diskDistribution)) { - LOG(info, "Live config update: Disk distribution altered from " - "%s to %s.", - vespa::config::content::StorDistributionConfig::getDiskDistributionName( - oldC.diskDistribution).c_str(), - vespa::config::content::StorDistributionConfig::getDiskDistributionName( - newC.diskDistribution).c_str()); + LOG(info, "Live config update: Disk distribution altered from %s to %s.", + StorDistributionConfig::getDiskDistributionName(oldC.diskDistribution).c_str(), + StorDistributionConfig::getDiskDistributionName(newC.diskDistribution).c_str()); ASSIGN(diskDistribution); } - _distributionConfig.reset(new vespa::config::content::StorDistributionConfig(oldC)); - _newDistributionConfig.reset(0); + _distributionConfig.reset(new StorDistributionConfig(oldC)); + _newDistributionConfig.reset(); if (updated) { - _context.getComponentRegister().setDistribution( - lib::Distribution::SP(new lib::Distribution(oldC))); - for (StorageLink* link = _chain.get(); link != 0; - link = link->getNextLink()) - { + _context.getComponentRegister().setDistribution(make_shared<lib::Distribution>(oldC)); + for (StorageLink* link = _chain.get(); link != 0; link = link->getNextLink()) { link->storageDistributionChanged(); } } } - if (_newClusterConfig.get() != 0) { + if (_newClusterConfig) { updateUpgradeFlag(*_newClusterConfig); if (*_clusterConfig != *_newClusterConfig) { - LOG(warning, "Live config failure: " - "Cannot alter cluster config of node live."); + LOG(warning, "Live config failure: Cannot alter cluster config of node live."); } - _newClusterConfig.reset(0); + _newClusterConfig.reset(); } - if (_newPriorityConfig.get() != 0) { + if (_newPriorityConfig) { _priorityConfig = std::move(_newPriorityConfig); _context.getComponentRegister().setPriorityConfig(*_priorityConfig); } @@ -354,31 +336,27 @@ void StorageNode::notifyDoneInitializing() { bool isDistributor = (getNodeType() == lib::NodeType::DISTRIBUTOR); - LOG(info, "%s node ready. Done initializing. Giving out of sequence " - "metric event. Config id is %s", + LOG(info, "%s node ready. Done initializing. Giving out of sequence metric event. Config id is %s", isDistributor ? "Distributor" : "Storage", _configUri.getConfigId().c_str()); _context.getComponentRegister().getMetricManager().forceEventLogging(); if (!_singleThreadedDebugMode) { EV_STARTED(isDistributor ? "distributor" : "storagenode"); } - NodeStateUpdater::Lock::SP lock( - _component->getStateUpdater().grabStateChangeLock()); + NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock()); lib::NodeState ns(*_component->getStateUpdater().getReportedNodeState()); ns.setState(lib::State::UP); _component->getStateUpdater().setReportedNodeState(ns); _chain->doneInit(); } -StorageNode::~StorageNode() -{ -} +StorageNode::~StorageNode() = default; void StorageNode::removeConfigSubscriptions() { LOG(debug, "Removing config subscribers"); - _configFetcher.reset(0); + _configFetcher.reset(); } void @@ -387,8 +365,7 @@ StorageNode::shutdown() // Try to shut down in opposite order of initialize. Bear in mind that // we might be shutting down after init exception causing only parts // of the server to have initialize - LOG(debug, "Shutting down storage node of type %s", - getNodeType().toString().c_str()); + LOG(debug, "Shutting down storage node of type %s", getNodeType().toString().c_str()); if (!_attemptedStopped) { LOG(warning, "Storage killed before requestShutdown() was called. No " "reason has been given for why we're stopping."); @@ -396,7 +373,7 @@ StorageNode::shutdown() // Remove the subscription to avoid more callbacks from config removeConfigSubscriptions(); - if (_chain.get()) { + if (_chain) { LOG(debug, "Closing storage chain"); _chain->close(); LOG(debug, "Flushing storage chain"); @@ -409,56 +386,54 @@ StorageNode::shutdown() } if (!_singleThreadedDebugMode) { - EV_STOPPING(getNodeType() == lib::NodeType::DISTRIBUTOR - ? "distributor" : "storagenode", "Stopped"); + EV_STOPPING(getNodeType() == lib::NodeType::DISTRIBUTOR ? "distributor" : "storagenode", "Stopped"); } if (_context.getComponentRegister().hasMetricManager()) { - LOG(debug, "Stopping metric manager. " - "(Deleting chain may remove metrics)"); + LOG(debug, "Stopping metric manager. (Deleting chain may remove metrics)"); _context.getComponentRegister().getMetricManager().stop(); } // Delete the status web server before the actual status providers, to // ensure that web server does not query providers during shutdown - _statusWebServer.reset(0); + _statusWebServer.reset(); // For this to be safe, noone can touch the state updater after we start // deleting the storage chain LOG(debug, "Removing state updater pointer as we're about to delete it."); - if (_chain.get()) { + if (_chain) { LOG(debug, "Deleting storage chain"); - _chain.reset(0); + _chain.reset(); } - if (_statusMetrics.get()) { + if (_statusMetrics) { LOG(debug, "Deleting status metrics consumer"); - _statusMetrics.reset(0); + _statusMetrics.reset(); } - if (_stateReporter.get()) { + if (_stateReporter) { LOG(debug, "Deleting state reporter"); - _stateReporter.reset(0); + _stateReporter.reset(); } - if (_memoryStatusViewer.get()) { + if (_memoryStatusViewer) { LOG(debug, "Deleting memory status viewer"); - _memoryStatusViewer.reset(0); + _memoryStatusViewer.reset(); } - if (_stateManager.get()) { + if (_stateManager) { LOG(debug, "Deleting state manager"); - _stateManager.reset(0); + _stateManager.reset(); } - if (_deadLockDetector.get()) { + if (_deadLockDetector) { LOG(debug, "Deleting dead lock detector"); - _deadLockDetector.reset(0); + _deadLockDetector.reset(); } - if (_metricManager.get()) { + if (_metricManager) { LOG(debug, "Deleting metric manager"); - _metricManager.reset(0); + _metricManager.reset(); } - if (_metrics.get()) { + if (_metrics) { LOG(debug, "Deleting metric set"); _metrics.reset(); } - if (_component.get()) { + if (_component) { LOG(debug, "Deleting component"); _component.reset(); } @@ -466,7 +441,7 @@ StorageNode::shutdown() LOG(debug, "Done shutting down node"); } -void StorageNode::configure(std::unique_ptr<vespa::config::content::core::StorServerConfig> config) +void StorageNode::configure(std::unique_ptr<StorServerConfig> config) { // When we get config, we try to grab the config lock to ensure noone // else is doing configuration work, and then we write the new config @@ -476,11 +451,14 @@ void StorageNode::configure(std::unique_ptr<vespa::config::content::core::StorSe vespalib::LockGuard configLockGuard(_configLock); _newServerConfig.reset(config.release()); } - if (_serverConfig.get() != 0) handleLiveConfigUpdate(); + if (_serverConfig) { + InitialGuard concurrent_config_guard(_initial_config_mutex); + handleLiveConfigUpdate(concurrent_config_guard); + } } void -StorageNode::configure(std::unique_ptr<vespa::config::content::UpgradingConfig> config) +StorageNode::configure(std::unique_ptr<UpgradingConfig> config) { // When we get config, we try to grab the config lock to ensure noone // else is doing configuration work, and then we write the new config @@ -490,11 +468,14 @@ StorageNode::configure(std::unique_ptr<vespa::config::content::UpgradingConfig> vespalib::LockGuard configLockGuard(_configLock); _newClusterConfig.reset(config.release()); } - if (_clusterConfig.get() != 0) handleLiveConfigUpdate(); + if (_clusterConfig) { + InitialGuard concurrent_config_guard(_initial_config_mutex); + handleLiveConfigUpdate(concurrent_config_guard); + } } void -StorageNode::configure(std::unique_ptr<vespa::config::content::StorDistributionConfig> config) +StorageNode::configure(std::unique_ptr<StorDistributionConfig> config) { // When we get config, we try to grab the config lock to ensure noone // else is doing configuration work, and then we write the new config @@ -504,17 +485,23 @@ StorageNode::configure(std::unique_ptr<vespa::config::content::StorDistributionC vespalib::LockGuard configLockGuard(_configLock); _newDistributionConfig.reset(config.release()); } - if (_distributionConfig.get() != 0) handleLiveConfigUpdate(); + if (_distributionConfig) { + InitialGuard concurrent_config_guard(_initial_config_mutex); + handleLiveConfigUpdate(concurrent_config_guard); + } } void -StorageNode::configure(std::unique_ptr<vespa::config::content::core::StorPrioritymappingConfig> config) +StorageNode::configure(std::unique_ptr<StorPrioritymappingConfig> config) { { vespalib::LockGuard configLockGuard(_configLock); _newPriorityConfig.reset(config.release()); } - if (_priorityConfig.get() != 0) handleLiveConfigUpdate(); + if (_priorityConfig) { + InitialGuard concurrent_config_guard(_initial_config_mutex); + handleLiveConfigUpdate(concurrent_config_guard); + } } void StorageNode::configure(std::unique_ptr<document::DocumenttypesConfig> config, @@ -527,7 +514,10 @@ void StorageNode::configure(std::unique_ptr<document::DocumenttypesConfig> confi vespalib::LockGuard configLockGuard(_configLock); _newDoctypesConfig.reset(config.release()); } - if (_doctypesConfig.get() != 0) handleLiveConfigUpdate(); + if (_doctypesConfig) { + InitialGuard concurrent_config_guard(_initial_config_mutex); + handleLiveConfigUpdate(concurrent_config_guard); + } } bool @@ -548,10 +538,8 @@ StorageNode::waitUntilInitialized(uint32_t timeout) { clock.getTimeInMillis() + framework::MilliSecTime(1000 * timeout)); while (true) { { - NodeStateUpdater::Lock::SP lock( - _component->getStateUpdater().grabStateChangeLock()); - lib::NodeState nodeState( - *_component->getStateUpdater().getReportedNodeState()); + NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock()); + lib::NodeState nodeState(*_component->getStateUpdater().getReportedNodeState()); if (nodeState.getState() == lib::State::UP) break; } FastOS_Thread::Sleep(10); @@ -580,7 +568,6 @@ StorageNode::requestShutdown(vespalib::stringref reason) _attemptedStopped = true; } - void StorageNode::notifyPartitionDown(int partId, vespalib::stringref reason) { @@ -603,7 +590,6 @@ StorageNode::notifyPartitionDown(int partId, vespalib::stringref reason) _component->getStateUpdater().setReportedNodeState(nodeState); } - std::unique_ptr<StateManager> StorageNode::releaseStateManager() { return std::move(_stateManager); diff --git a/storage/src/vespa/storage/storageserver/storagenode.h b/storage/src/vespa/storage/storageserver/storagenode.h index 753f3c85330..9b727ef3e0c 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.h +++ b/storage/src/vespa/storage/storageserver/storagenode.h @@ -26,6 +26,7 @@ #include <vespa/document/config/config-documenttypes.h> #include <vespa/config-upgrading.h> #include <vespa/config-stor-distribution.h> +#include <mutex> namespace document { class DocumentTypeRepo; } @@ -95,7 +96,11 @@ public: // For testing StorageLink* getChain() { return _chain.get(); } virtual void initializeStatusWebServer(); - +protected: + using StorServerConfig = vespa::config::content::core::StorServerConfig; + using UpgradingConfig = vespa::config::content::UpgradingConfig; + using StorDistributionConfig = vespa::config::content::StorDistributionConfig; + using StorPrioritymappingConfig = vespa::config::content::core::StorPrioritymappingConfig; private: bool _singleThreadedDebugMode; // Subscriptions to config @@ -128,29 +133,31 @@ private: std::unique_ptr<StorageLink> _chain; /** Implementation of config callbacks. */ - void configure(std::unique_ptr<vespa::config::content::core::StorServerConfig> config) override; - void configure(std::unique_ptr<vespa::config::content::UpgradingConfig> config) override; - void configure(std::unique_ptr<vespa::config::content::StorDistributionConfig> config) override; - void configure(std::unique_ptr<vespa::config::content::core::StorPrioritymappingConfig>) override; + void configure(std::unique_ptr<StorServerConfig> config) override; + void configure(std::unique_ptr<UpgradingConfig> config) override; + void configure(std::unique_ptr<StorDistributionConfig> config) override; + void configure(std::unique_ptr<StorPrioritymappingConfig>) override; virtual void configure(std::unique_ptr<document::DocumenttypesConfig> config, bool hasChanged, int64_t generation); - void updateUpgradeFlag(const vespa::config::content::UpgradingConfig&); + void updateUpgradeFlag(const UpgradingConfig&); protected: // Lock taken while doing configuration of the server. vespalib::Lock _configLock; + std::mutex _initial_config_mutex; + using InitialGuard = std::lock_guard<std::mutex>; // Current running config. Kept, such that we can see what has been // changed in live config updates. - std::unique_ptr<vespa::config::content::core::StorServerConfig> _serverConfig; - std::unique_ptr<vespa::config::content::UpgradingConfig> _clusterConfig; - std::unique_ptr<vespa::config::content::StorDistributionConfig> _distributionConfig; - std::unique_ptr<vespa::config::content::core::StorPrioritymappingConfig> _priorityConfig; + std::unique_ptr<StorServerConfig> _serverConfig; + std::unique_ptr<UpgradingConfig> _clusterConfig; + std::unique_ptr<StorDistributionConfig> _distributionConfig; + std::unique_ptr<StorPrioritymappingConfig> _priorityConfig; std::unique_ptr<document::DocumenttypesConfig> _doctypesConfig; // New configs gotten that has yet to have been handled - std::unique_ptr<vespa::config::content::core::StorServerConfig> _newServerConfig; - std::unique_ptr<vespa::config::content::UpgradingConfig> _newClusterConfig; - std::unique_ptr<vespa::config::content::StorDistributionConfig> _newDistributionConfig; - std::unique_ptr<vespa::config::content::core::StorPrioritymappingConfig> _newPriorityConfig; + std::unique_ptr<StorServerConfig> _newServerConfig; + std::unique_ptr<UpgradingConfig> _newClusterConfig; + std::unique_ptr<StorDistributionConfig> _newDistributionConfig; + std::unique_ptr<StorPrioritymappingConfig> _newPriorityConfig; std::unique_ptr<document::DocumenttypesConfig> _newDoctypesConfig; std::unique_ptr<StorageComponent> _component; config::ConfigUri _configUri; @@ -169,7 +176,7 @@ protected: virtual void subscribeToConfigs(); virtual void initializeNodeSpecific() = 0; virtual std::unique_ptr<StorageLink> createChain() = 0; - virtual void handleLiveConfigUpdate(); + virtual void handleLiveConfigUpdate(const InitialGuard & initGuard); void shutdown(); virtual void removeConfigSubscriptions(); }; |