summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
Diffstat (limited to 'storage')
-rw-r--r--storage/src/vespa/storage/storageserver/servicelayernode.cpp45
-rw-r--r--storage/src/vespa/storage/storageserver/servicelayernode.h9
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.cpp232
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.h37
4 files changed, 155 insertions, 168 deletions
diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
index 79d57a8f7e4..e2861ef42cd 100644
--- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp
+++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
@@ -17,13 +17,15 @@
#include <vespa/storage/bucketmover/bucketmover.h>
#include <vespa/storage/persistence/filestorage/filestormanager.h>
#include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h>
-#include <vespa/storage/persistence/provider_error_wrapper.h>
#include <vespa/persistence/spi/exceptions.h>
#include <vespa/messagebus/rpcmessagebus.h>
#include <vespa/log/log.h>
LOG_SETUP(".node.servicelayer");
+
+using StorServerConfigBuilder = vespa::config::content::core::StorServerConfigBuilder;
+
namespace storage {
ServiceLayerNode::ServiceLayerNode(
@@ -53,8 +55,7 @@ void ServiceLayerNode::init()
throw spi::HandledException("Failed provider init: " + initResult.toString(), VESPA_STRLOC);
}
- spi::PartitionStateListResult result(
- _persistenceProvider.getPartitionStates());
+ spi::PartitionStateListResult result(_persistenceProvider.getPartitionStates());
if (result.hasError()) {
LOG(error, "Failed to get partition list from persistence provider: %s", result.toString().c_str());
throw spi::HandledException("Failed to get partition list: " + result.toString(), VESPA_STRLOC);
@@ -62,8 +63,7 @@ void ServiceLayerNode::init()
_partitions = result.getList();
if (_partitions.size() == 0) {
LOG(error, "No partitions in persistence provider. See documentation "
- "for your persistence provider as to how to set up "
- "partitions in it.");
+ "for your persistence provider as to how to set up partitions in it.");
throw spi::HandledException("No partitions in provider", VESPA_STRLOC);
}
try{
@@ -93,7 +93,7 @@ ServiceLayerNode::subscribeToConfigs()
{
StorageNode::subscribeToConfigs();
_configFetcher.reset(new config::ConfigFetcher(_configUri.getContext()));
- _configFetcher->subscribe<vespa::config::storage::StorDevicesConfig>(_configUri.getConfigId(), this);
+ _configFetcher->subscribe<StorDevicesConfig>(_configUri.getConfigId(), this);
vespalib::LockGuard configLockGuard(_configLock);
_deviceConfig = std::move(_newDevicesConfig);
@@ -123,8 +123,7 @@ ServiceLayerNode::initializeNodeSpecific()
// Give node state to mount point initialization, such that we can
// get disk count and state of unavailable disks set in reported
// node state.
- NodeStateUpdater::Lock::SP lock(
- _component->getStateUpdater().grabStateChangeLock());
+ NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock());
lib::NodeState ns(*_component->getStateUpdater().getReportedNodeState());
ns.setDiskCount(_partitions.size());
@@ -166,12 +165,12 @@ ServiceLayerNode::initializeNodeSpecific()
if (DIFFER(a)) { LOG(warning, "Live config failure: %s.", b); }
void
-ServiceLayerNode::handleLiveConfigUpdate()
+ServiceLayerNode::handleLiveConfigUpdate(const InitialGuard & initGuard)
{
if (_newServerConfig) {
bool updated = false;
vespa::config::content::core::StorServerConfigBuilder oldC(*_serverConfig);
- vespa::config::content::core::StorServerConfig& newC(*_newServerConfig);
+ StorServerConfig& newC(*_newServerConfig);
DIFFERWARN(diskCount, "Cannot alter partition count of node live");
{
updated = false;
@@ -207,12 +206,11 @@ ServiceLayerNode::handleLiveConfigUpdate()
}
}
}
- StorageNode::handleLiveConfigUpdate();
+ StorageNode::handleLiveConfigUpdate(initGuard);
}
void
-ServiceLayerNode::configure(
- std::unique_ptr<vespa::config::storage::StorDevicesConfig> config)
+ServiceLayerNode::configure(std::unique_ptr<StorDevicesConfig> config)
{
// When we get config, we try to grab the config lock to ensure noone
// else is doing configuration work, and then we write the new config
@@ -223,20 +221,19 @@ ServiceLayerNode::configure(
_newDevicesConfig = std::move(config);
}
if (_distributionConfig) {
- handleLiveConfigUpdate();
+ InitialGuard concurrent_config_guard(_initial_config_mutex);
+ handleLiveConfigUpdate(concurrent_config_guard);
}
}
VisitorMessageSession::UP
ServiceLayerNode::createSession(Visitor& visitor, VisitorThread& thread)
{
- MessageBusVisitorMessageSession::UP mbusSession(
- new MessageBusVisitorMessageSession(visitor, thread));
+ auto mbusSession = std::make_unique<MessageBusVisitorMessageSession>(visitor, thread);
mbus::SourceSessionParams srcParams;
srcParams.setThrottlePolicy(mbus::IThrottlePolicy::SP());
srcParams.setReplyHandler(*mbusSession);
- mbusSession->setSourceSession(
- _communicationManager->getMessageBus().getMessageBus().createSourceSession(srcParams));
+ mbusSession->setSourceSession(_communicationManager->getMessageBus().getMessageBus().createSourceSession(srcParams));
return VisitorMessageSession::UP(std::move(mbusSession));
}
@@ -270,17 +267,13 @@ ServiceLayerNode::createChain()
chain->push_back(StorageLink::UP(new bucketmover::BucketMover(_configUri, compReg)));
chain->push_back(StorageLink::UP(new StorageBucketDBInitializer(
_configUri, _partitions, getDoneInitializeHandler(), compReg)));
- chain->push_back(StorageLink::UP(new BucketManager(
- _configUri, _context.getComponentRegister())));
+ chain->push_back(StorageLink::UP(new BucketManager(_configUri, _context.getComponentRegister())));
chain->push_back(StorageLink::UP(new VisitorManager(
- _configUri, _context.getComponentRegister(),
- *this, _externalVisitors)));
+ _configUri, _context.getComponentRegister(), *this, _externalVisitors)));
chain->push_back(StorageLink::UP(new ModifiedBucketChecker(
- _context.getComponentRegister(), _persistenceProvider,
- _configUri)));
+ _context.getComponentRegister(), _persistenceProvider, _configUri)));
chain->push_back(StorageLink::UP(_fileStorManager = new FileStorManager(
- _configUri, _partitions, _persistenceProvider,
- _context.getComponentRegister())));
+ _configUri, _partitions, _persistenceProvider, _context.getComponentRegister())));
chain->push_back(StorageLink::UP(releaseStateManager().release()));
// Lifetimes of all referenced components shall outlive the last call going
diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.h b/storage/src/vespa/storage/storageserver/servicelayernode.h
index 848cc1d1475..12446152b90 100644
--- a/storage/src/vespa/storage/storageserver/servicelayernode.h
+++ b/storage/src/vespa/storage/storageserver/servicelayernode.h
@@ -28,6 +28,7 @@ class ServiceLayerNode
private config::IFetcherCallback<vespa::config::storage::StorDevicesConfig>
{
+ using StorDevicesConfig = vespa::config::storage::StorDevicesConfig;
ServiceLayerNodeContext& _context;
spi::PersistenceProvider& _persistenceProvider;
spi::PartitionStateList _partitions;
@@ -36,8 +37,8 @@ class ServiceLayerNode
// FIXME: Should probably use the fetcher in StorageNode
std::unique_ptr<config::ConfigFetcher> _configFetcher;
- std::unique_ptr<vespa::config::storage::StorDevicesConfig> _deviceConfig;
- std::unique_ptr<vespa::config::storage::StorDevicesConfig> _newDevicesConfig;
+ std::unique_ptr<StorDevicesConfig> _deviceConfig;
+ std::unique_ptr<StorDevicesConfig> _newDevicesConfig;
FileStorManager* _fileStorManager;
bool _init_has_been_called;
bool _noUsablePartitionMode;
@@ -63,8 +64,8 @@ public:
private:
void subscribeToConfigs() override;
void initializeNodeSpecific() override;
- void handleLiveConfigUpdate() override;
- void configure(std::unique_ptr<vespa::config::storage::StorDevicesConfig> config) override;
+ void handleLiveConfigUpdate(const InitialGuard & initGuard) override;
+ void configure(std::unique_ptr<StorDevicesConfig> config) override;
VisitorMessageSession::UP createSession(Visitor&, VisitorThread&) override;
documentapi::Priority::Value toDocumentPriority(uint8_t storagePriority) const override;
std::unique_ptr<StorageLink> createChain() override;
diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp
index 85f54431cfb..855efaed6aa 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.cpp
+++ b/storage/src/vespa/storage/storageserver/storagenode.cpp
@@ -19,6 +19,10 @@
#include <vespa/log/log.h>
LOG_SETUP(".node.server");
+using vespa::config::content::StorDistributionConfigBuilder;
+using vespa::config::content::core::StorServerConfigBuilder;
+using std::make_shared;
+
namespace storage {
namespace {
@@ -86,10 +90,11 @@ void
StorageNode::subscribeToConfigs()
{
_configFetcher.reset(new config::ConfigFetcher(_configUri.getContext()));
- _configFetcher->subscribe<vespa::config::content::StorDistributionConfig>(_configUri.getConfigId(), this);
- _configFetcher->subscribe<vespa::config::content::UpgradingConfig>(_configUri.getConfigId(), this);
- _configFetcher->subscribe<vespa::config::content::core::StorServerConfig>(_configUri.getConfigId(), this);
- _configFetcher->subscribe<vespa::config::content::core::StorPrioritymappingConfig>(_configUri.getConfigId(), this);
+ _configFetcher->subscribe<StorDistributionConfig>(_configUri.getConfigId(), this);
+ _configFetcher->subscribe<UpgradingConfig>(_configUri.getConfigId(), this);
+ _configFetcher->subscribe<StorServerConfig>(_configUri.getConfigId(), this);
+ _configFetcher->subscribe<StorPrioritymappingConfig>(_configUri.getConfigId(), this);
+
_configFetcher->start();
vespalib::LockGuard configLockGuard(_configLock);
@@ -99,10 +104,13 @@ StorageNode::subscribeToConfigs()
_priorityConfig = std::move(_newPriorityConfig);
}
-
void
StorageNode::initialize()
{
+ // Avoid racing with concurrent reconfigurations before we've set up the entire
+ // node component stack.
+ std::lock_guard<std::mutex> concurrent_config_guard(_initial_config_mutex);
+
_context.getComponentRegister().registerShutdownListener(*this);
// Fetch configs needed first. These functions will just grab the config
@@ -116,21 +124,14 @@ StorageNode::initialize()
// available
_rootFolder = _serverConfig->rootFolder;
- _context.getComponentRegister().setNodeInfo(
- _serverConfig->clusterName, getNodeType(),
- _serverConfig->nodeIndex);
- _context.getComponentRegister().setLoadTypes(
- documentapi::LoadTypeSet::SP(
- new documentapi::LoadTypeSet(_configUri)));
- _context.getComponentRegister().setBucketIdFactory(
- document::BucketIdFactory());
- _context.getComponentRegister().setDistribution(
- lib::Distribution::SP(new lib::Distribution(*_distributionConfig)));
+ _context.getComponentRegister().setNodeInfo(_serverConfig->clusterName, getNodeType(), _serverConfig->nodeIndex);
+ _context.getComponentRegister().setLoadTypes(make_shared<documentapi::LoadTypeSet>(_configUri));
+ _context.getComponentRegister().setBucketIdFactory(document::BucketIdFactory());
+ _context.getComponentRegister().setDistribution(make_shared<lib::Distribution>(*_distributionConfig));
_context.getComponentRegister().setPriorityConfig(*_priorityConfig);
_metrics.reset(new StorageMetricSet);
- _component.reset(new StorageComponent(
- _context.getComponentRegister(), "storagenode"));
+ _component.reset(new StorageComponent(_context.getComponentRegister(), "storagenode"));
_component->registerMetric(*_metrics);
if (!_context.getComponentRegister().hasMetricManager()) {
_metricManager.reset(new metrics::MetricManager);
@@ -168,17 +169,13 @@ StorageNode::initialize()
_generationFetcher));
// Start deadlock detector
- _deadLockDetector.reset(new DeadLockDetector(
- _context.getComponentRegister()));
- _deadLockDetector->enableWarning(
- _serverConfig->enableDeadLockDetectorWarnings);
+ _deadLockDetector.reset(new DeadLockDetector(_context.getComponentRegister()));
+ _deadLockDetector->enableWarning(_serverConfig->enableDeadLockDetectorWarnings);
_deadLockDetector->enableShutdown(_serverConfig->enableDeadLockDetector);
_deadLockDetector->setProcessSlack(framework::MilliSecTime(
- static_cast<uint32_t>(
- _serverConfig->deadLockDetectorTimeoutSlack * 1000)));
+ static_cast<uint32_t>(_serverConfig->deadLockDetectorTimeoutSlack * 1000)));
_deadLockDetector->setWaitSlack(framework::MilliSecTime(
- static_cast<uint32_t>(
- _serverConfig->deadLockDetectorTimeoutSlack * 1000)));
+ static_cast<uint32_t>(_serverConfig->deadLockDetectorTimeoutSlack * 1000)));
_chain.reset(createChain().release());
@@ -188,7 +185,7 @@ StorageNode::initialize()
// reinitializing metric manager often.
_context.getComponentRegister().getMetricManager().init(_configUri, _context.getThreadPool());
- if (_chain.get() != 0) {
+ if (_chain) {
LOG(debug, "Storage chain configured. Calling open()");
_chain->open();
}
@@ -208,9 +205,8 @@ void
StorageNode::initializeStatusWebServer()
{
if (_singleThreadedDebugMode) return;
- _statusWebServer.reset(new StatusWebServer(
- _context.getComponentRegister(), _context.getComponentRegister(),
- _configUri));
+ _statusWebServer.reset(new StatusWebServer(_context.getComponentRegister(),
+ _context.getComponentRegister(), _configUri));
}
#define DIFFER(a) (!(oldC.a == newC.a))
@@ -223,13 +219,13 @@ StorageNode::setNewDocumentRepo(const document::DocumentTypeRepo::SP& repo)
{
vespalib::LockGuard configLockGuard(_configLock);
_context.getComponentRegister().setDocumentTypeRepo(repo);
- if (_communicationManager != 0) {
+ if (_communicationManager != nullptr) {
_communicationManager->updateMessagebusProtocol(repo);
}
}
void
-StorageNode::updateUpgradeFlag(const vespa::config::content::UpgradingConfig& config)
+StorageNode::updateUpgradeFlag(const UpgradingConfig& config)
{
framework::UpgradeFlags flag(framework::NO_UPGRADE_SPECIAL_HANDLING_ACTIVE);
if (config.upgradingMajorTo) {
@@ -245,69 +241,62 @@ StorageNode::updateUpgradeFlag(const vespa::config::content::UpgradingConfig& co
}
void
-StorageNode::handleLiveConfigUpdate()
+StorageNode::handleLiveConfigUpdate(const InitialGuard & initGuard)
{
- // Make sure we don't conflict with initialize or shutdown threads.
+ // Make sure we don't conflict with initialize or shutdown threads.
+ (void) initGuard;
vespalib::LockGuard configLockGuard(_configLock);
- // If storage haven't initialized, ignore. Initialize code will handle
- // this config.
- if (_chain.get() == 0) return;
+
+ assert(_chain);
// If we get here, initialize is done running. We have to handle changes
// we want to handle.
- if (_newServerConfig.get() != 0) {
+ if (_newServerConfig) {
bool updated = false;
- vespa::config::content::core::StorServerConfigBuilder oldC(*_serverConfig);
- vespa::config::content::core::StorServerConfig& newC(*_newServerConfig);
+ StorServerConfigBuilder oldC(*_serverConfig);
+ StorServerConfig& newC(*_newServerConfig);
DIFFERWARN(rootFolder, "Cannot alter root folder of node live");
DIFFERWARN(clusterName, "Cannot alter cluster name of node live");
DIFFERWARN(nodeIndex, "Cannot alter node index of node live");
DIFFERWARN(isDistributor, "Cannot alter role of node live");
{
if (DIFFER(memorytouse)) {
- LOG(info, "Live config update: Memory to use changed "
- "from %" PRId64 " to %" PRId64 ".",
+ LOG(info, "Live config update: Memory to use changed from %" PRId64 " to %" PRId64 ".",
oldC.memorytouse, newC.memorytouse);
ASSIGN(memorytouse);
- _context.getMemoryManager().setMaximumMemoryUsage(
- newC.memorytouse);
+ _context.getMemoryManager().setMaximumMemoryUsage(newC.memorytouse);
}
}
- _serverConfig.reset(new vespa::config::content::core::StorServerConfig(oldC));
- _newServerConfig.reset(0);
+ _serverConfig.reset(new StorServerConfig(oldC));
+ _newServerConfig.reset();
(void)updated;
}
- if (_newDistributionConfig.get() != 0) {
- vespa::config::content::StorDistributionConfigBuilder oldC(*_distributionConfig);
- vespa::config::content::StorDistributionConfig& newC(*_newDistributionConfig);
+ if (_newDistributionConfig) {
+ StorDistributionConfigBuilder oldC(*_distributionConfig);
+ StorDistributionConfig& newC(*_newDistributionConfig);
bool updated = false;
if (DIFFER(redundancy)) {
- LOG(info, "Live config update: Altering redundancy from %u to %u.",
- oldC.redundancy, newC.redundancy);
+ LOG(info, "Live config update: Altering redundancy from %u to %u.", oldC.redundancy, newC.redundancy);
ASSIGN(redundancy);
}
if (DIFFER(initialRedundancy)) {
- LOG(info, "Live config update: Altering initial redundancy "
- "from %u to %u.",
+ LOG(info, "Live config update: Altering initial redundancy from %u to %u.",
oldC.initialRedundancy, newC.initialRedundancy);
ASSIGN(initialRedundancy);
}
if (DIFFER(ensurePrimaryPersisted)) {
- LOG(info, "Live config update: Now%s requiring primary copy to "
- "succeed for n of m operation to succeed.",
+ LOG(info, "Live config update: Now%s requiring primary copy to succeed for n of m operation to succeed.",
newC.ensurePrimaryPersisted ? "" : " not");
ASSIGN(ensurePrimaryPersisted);
}
if (DIFFER(activePerLeafGroup)) {
- LOG(info, "Live config update: Active per leaf group setting "
- "altered from %s to %s",
+ LOG(info, "Live config update: Active per leaf group setting altered from %s to %s",
oldC.activePerLeafGroup ? "true" : "false",
newC.activePerLeafGroup ? "true" : "false");
ASSIGN(activePerLeafGroup);
}
if (DIFFER(readyCopies)) {
- LOG(info, "Live config update: Altering number of searchable "
- "copies from %u to %u",
+ LOG(info, "Live config update: Altering number of searchable copies from %u to %u",
oldC.readyCopies, newC.readyCopies);
ASSIGN(readyCopies);
}
@@ -316,35 +305,28 @@ StorageNode::handleLiveConfigUpdate()
ASSIGN(group);
}
if (DIFFER(diskDistribution)) {
- LOG(info, "Live config update: Disk distribution altered from "
- "%s to %s.",
- vespa::config::content::StorDistributionConfig::getDiskDistributionName(
- oldC.diskDistribution).c_str(),
- vespa::config::content::StorDistributionConfig::getDiskDistributionName(
- newC.diskDistribution).c_str());
+ LOG(info, "Live config update: Disk distribution altered from %s to %s.",
+ StorDistributionConfig::getDiskDistributionName(oldC.diskDistribution).c_str(),
+ StorDistributionConfig::getDiskDistributionName(newC.diskDistribution).c_str());
ASSIGN(diskDistribution);
}
- _distributionConfig.reset(new vespa::config::content::StorDistributionConfig(oldC));
- _newDistributionConfig.reset(0);
+ _distributionConfig.reset(new StorDistributionConfig(oldC));
+ _newDistributionConfig.reset();
if (updated) {
- _context.getComponentRegister().setDistribution(
- lib::Distribution::SP(new lib::Distribution(oldC)));
- for (StorageLink* link = _chain.get(); link != 0;
- link = link->getNextLink())
- {
+ _context.getComponentRegister().setDistribution(make_shared<lib::Distribution>(oldC));
+ for (StorageLink* link = _chain.get(); link != 0; link = link->getNextLink()) {
link->storageDistributionChanged();
}
}
}
- if (_newClusterConfig.get() != 0) {
+ if (_newClusterConfig) {
updateUpgradeFlag(*_newClusterConfig);
if (*_clusterConfig != *_newClusterConfig) {
- LOG(warning, "Live config failure: "
- "Cannot alter cluster config of node live.");
+ LOG(warning, "Live config failure: Cannot alter cluster config of node live.");
}
- _newClusterConfig.reset(0);
+ _newClusterConfig.reset();
}
- if (_newPriorityConfig.get() != 0) {
+ if (_newPriorityConfig) {
_priorityConfig = std::move(_newPriorityConfig);
_context.getComponentRegister().setPriorityConfig(*_priorityConfig);
}
@@ -354,31 +336,27 @@ void
StorageNode::notifyDoneInitializing()
{
bool isDistributor = (getNodeType() == lib::NodeType::DISTRIBUTOR);
- LOG(info, "%s node ready. Done initializing. Giving out of sequence "
- "metric event. Config id is %s",
+ LOG(info, "%s node ready. Done initializing. Giving out of sequence metric event. Config id is %s",
isDistributor ? "Distributor" : "Storage", _configUri.getConfigId().c_str());
_context.getComponentRegister().getMetricManager().forceEventLogging();
if (!_singleThreadedDebugMode) {
EV_STARTED(isDistributor ? "distributor" : "storagenode");
}
- NodeStateUpdater::Lock::SP lock(
- _component->getStateUpdater().grabStateChangeLock());
+ NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock());
lib::NodeState ns(*_component->getStateUpdater().getReportedNodeState());
ns.setState(lib::State::UP);
_component->getStateUpdater().setReportedNodeState(ns);
_chain->doneInit();
}
-StorageNode::~StorageNode()
-{
-}
+StorageNode::~StorageNode() = default;
void
StorageNode::removeConfigSubscriptions()
{
LOG(debug, "Removing config subscribers");
- _configFetcher.reset(0);
+ _configFetcher.reset();
}
void
@@ -387,8 +365,7 @@ StorageNode::shutdown()
// Try to shut down in opposite order of initialize. Bear in mind that
// we might be shutting down after init exception causing only parts
// of the server to have initialize
- LOG(debug, "Shutting down storage node of type %s",
- getNodeType().toString().c_str());
+ LOG(debug, "Shutting down storage node of type %s", getNodeType().toString().c_str());
if (!_attemptedStopped) {
LOG(warning, "Storage killed before requestShutdown() was called. No "
"reason has been given for why we're stopping.");
@@ -396,7 +373,7 @@ StorageNode::shutdown()
// Remove the subscription to avoid more callbacks from config
removeConfigSubscriptions();
- if (_chain.get()) {
+ if (_chain) {
LOG(debug, "Closing storage chain");
_chain->close();
LOG(debug, "Flushing storage chain");
@@ -409,56 +386,54 @@ StorageNode::shutdown()
}
if (!_singleThreadedDebugMode) {
- EV_STOPPING(getNodeType() == lib::NodeType::DISTRIBUTOR
- ? "distributor" : "storagenode", "Stopped");
+ EV_STOPPING(getNodeType() == lib::NodeType::DISTRIBUTOR ? "distributor" : "storagenode", "Stopped");
}
if (_context.getComponentRegister().hasMetricManager()) {
- LOG(debug, "Stopping metric manager. "
- "(Deleting chain may remove metrics)");
+ LOG(debug, "Stopping metric manager. (Deleting chain may remove metrics)");
_context.getComponentRegister().getMetricManager().stop();
}
// Delete the status web server before the actual status providers, to
// ensure that web server does not query providers during shutdown
- _statusWebServer.reset(0);
+ _statusWebServer.reset();
// For this to be safe, noone can touch the state updater after we start
// deleting the storage chain
LOG(debug, "Removing state updater pointer as we're about to delete it.");
- if (_chain.get()) {
+ if (_chain) {
LOG(debug, "Deleting storage chain");
- _chain.reset(0);
+ _chain.reset();
}
- if (_statusMetrics.get()) {
+ if (_statusMetrics) {
LOG(debug, "Deleting status metrics consumer");
- _statusMetrics.reset(0);
+ _statusMetrics.reset();
}
- if (_stateReporter.get()) {
+ if (_stateReporter) {
LOG(debug, "Deleting state reporter");
- _stateReporter.reset(0);
+ _stateReporter.reset();
}
- if (_memoryStatusViewer.get()) {
+ if (_memoryStatusViewer) {
LOG(debug, "Deleting memory status viewer");
- _memoryStatusViewer.reset(0);
+ _memoryStatusViewer.reset();
}
- if (_stateManager.get()) {
+ if (_stateManager) {
LOG(debug, "Deleting state manager");
- _stateManager.reset(0);
+ _stateManager.reset();
}
- if (_deadLockDetector.get()) {
+ if (_deadLockDetector) {
LOG(debug, "Deleting dead lock detector");
- _deadLockDetector.reset(0);
+ _deadLockDetector.reset();
}
- if (_metricManager.get()) {
+ if (_metricManager) {
LOG(debug, "Deleting metric manager");
- _metricManager.reset(0);
+ _metricManager.reset();
}
- if (_metrics.get()) {
+ if (_metrics) {
LOG(debug, "Deleting metric set");
_metrics.reset();
}
- if (_component.get()) {
+ if (_component) {
LOG(debug, "Deleting component");
_component.reset();
}
@@ -466,7 +441,7 @@ StorageNode::shutdown()
LOG(debug, "Done shutting down node");
}
-void StorageNode::configure(std::unique_ptr<vespa::config::content::core::StorServerConfig> config)
+void StorageNode::configure(std::unique_ptr<StorServerConfig> config)
{
// When we get config, we try to grab the config lock to ensure noone
// else is doing configuration work, and then we write the new config
@@ -476,11 +451,14 @@ void StorageNode::configure(std::unique_ptr<vespa::config::content::core::StorSe
vespalib::LockGuard configLockGuard(_configLock);
_newServerConfig.reset(config.release());
}
- if (_serverConfig.get() != 0) handleLiveConfigUpdate();
+ if (_serverConfig) {
+ InitialGuard concurrent_config_guard(_initial_config_mutex);
+ handleLiveConfigUpdate(concurrent_config_guard);
+ }
}
void
-StorageNode::configure(std::unique_ptr<vespa::config::content::UpgradingConfig> config)
+StorageNode::configure(std::unique_ptr<UpgradingConfig> config)
{
// When we get config, we try to grab the config lock to ensure noone
// else is doing configuration work, and then we write the new config
@@ -490,11 +468,14 @@ StorageNode::configure(std::unique_ptr<vespa::config::content::UpgradingConfig>
vespalib::LockGuard configLockGuard(_configLock);
_newClusterConfig.reset(config.release());
}
- if (_clusterConfig.get() != 0) handleLiveConfigUpdate();
+ if (_clusterConfig) {
+ InitialGuard concurrent_config_guard(_initial_config_mutex);
+ handleLiveConfigUpdate(concurrent_config_guard);
+ }
}
void
-StorageNode::configure(std::unique_ptr<vespa::config::content::StorDistributionConfig> config)
+StorageNode::configure(std::unique_ptr<StorDistributionConfig> config)
{
// When we get config, we try to grab the config lock to ensure noone
// else is doing configuration work, and then we write the new config
@@ -504,17 +485,23 @@ StorageNode::configure(std::unique_ptr<vespa::config::content::StorDistributionC
vespalib::LockGuard configLockGuard(_configLock);
_newDistributionConfig.reset(config.release());
}
- if (_distributionConfig.get() != 0) handleLiveConfigUpdate();
+ if (_distributionConfig) {
+ InitialGuard concurrent_config_guard(_initial_config_mutex);
+ handleLiveConfigUpdate(concurrent_config_guard);
+ }
}
void
-StorageNode::configure(std::unique_ptr<vespa::config::content::core::StorPrioritymappingConfig> config)
+StorageNode::configure(std::unique_ptr<StorPrioritymappingConfig> config)
{
{
vespalib::LockGuard configLockGuard(_configLock);
_newPriorityConfig.reset(config.release());
}
- if (_priorityConfig.get() != 0) handleLiveConfigUpdate();
+ if (_priorityConfig) {
+ InitialGuard concurrent_config_guard(_initial_config_mutex);
+ handleLiveConfigUpdate(concurrent_config_guard);
+ }
}
void StorageNode::configure(std::unique_ptr<document::DocumenttypesConfig> config,
@@ -527,7 +514,10 @@ void StorageNode::configure(std::unique_ptr<document::DocumenttypesConfig> confi
vespalib::LockGuard configLockGuard(_configLock);
_newDoctypesConfig.reset(config.release());
}
- if (_doctypesConfig.get() != 0) handleLiveConfigUpdate();
+ if (_doctypesConfig) {
+ InitialGuard concurrent_config_guard(_initial_config_mutex);
+ handleLiveConfigUpdate(concurrent_config_guard);
+ }
}
bool
@@ -548,10 +538,8 @@ StorageNode::waitUntilInitialized(uint32_t timeout) {
clock.getTimeInMillis() + framework::MilliSecTime(1000 * timeout));
while (true) {
{
- NodeStateUpdater::Lock::SP lock(
- _component->getStateUpdater().grabStateChangeLock());
- lib::NodeState nodeState(
- *_component->getStateUpdater().getReportedNodeState());
+ NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock());
+ lib::NodeState nodeState(*_component->getStateUpdater().getReportedNodeState());
if (nodeState.getState() == lib::State::UP) break;
}
FastOS_Thread::Sleep(10);
@@ -580,7 +568,6 @@ StorageNode::requestShutdown(vespalib::stringref reason)
_attemptedStopped = true;
}
-
void
StorageNode::notifyPartitionDown(int partId, vespalib::stringref reason)
{
@@ -603,7 +590,6 @@ StorageNode::notifyPartitionDown(int partId, vespalib::stringref reason)
_component->getStateUpdater().setReportedNodeState(nodeState);
}
-
std::unique_ptr<StateManager>
StorageNode::releaseStateManager() {
return std::move(_stateManager);
diff --git a/storage/src/vespa/storage/storageserver/storagenode.h b/storage/src/vespa/storage/storageserver/storagenode.h
index 753f3c85330..9b727ef3e0c 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.h
+++ b/storage/src/vespa/storage/storageserver/storagenode.h
@@ -26,6 +26,7 @@
#include <vespa/document/config/config-documenttypes.h>
#include <vespa/config-upgrading.h>
#include <vespa/config-stor-distribution.h>
+#include <mutex>
namespace document { class DocumentTypeRepo; }
@@ -95,7 +96,11 @@ public:
// For testing
StorageLink* getChain() { return _chain.get(); }
virtual void initializeStatusWebServer();
-
+protected:
+ using StorServerConfig = vespa::config::content::core::StorServerConfig;
+ using UpgradingConfig = vespa::config::content::UpgradingConfig;
+ using StorDistributionConfig = vespa::config::content::StorDistributionConfig;
+ using StorPrioritymappingConfig = vespa::config::content::core::StorPrioritymappingConfig;
private:
bool _singleThreadedDebugMode;
// Subscriptions to config
@@ -128,29 +133,31 @@ private:
std::unique_ptr<StorageLink> _chain;
/** Implementation of config callbacks. */
- void configure(std::unique_ptr<vespa::config::content::core::StorServerConfig> config) override;
- void configure(std::unique_ptr<vespa::config::content::UpgradingConfig> config) override;
- void configure(std::unique_ptr<vespa::config::content::StorDistributionConfig> config) override;
- void configure(std::unique_ptr<vespa::config::content::core::StorPrioritymappingConfig>) override;
+ void configure(std::unique_ptr<StorServerConfig> config) override;
+ void configure(std::unique_ptr<UpgradingConfig> config) override;
+ void configure(std::unique_ptr<StorDistributionConfig> config) override;
+ void configure(std::unique_ptr<StorPrioritymappingConfig>) override;
virtual void configure(std::unique_ptr<document::DocumenttypesConfig> config,
bool hasChanged, int64_t generation);
- void updateUpgradeFlag(const vespa::config::content::UpgradingConfig&);
+ void updateUpgradeFlag(const UpgradingConfig&);
protected:
// Lock taken while doing configuration of the server.
vespalib::Lock _configLock;
+ std::mutex _initial_config_mutex;
+ using InitialGuard = std::lock_guard<std::mutex>;
// Current running config. Kept, such that we can see what has been
// changed in live config updates.
- std::unique_ptr<vespa::config::content::core::StorServerConfig> _serverConfig;
- std::unique_ptr<vespa::config::content::UpgradingConfig> _clusterConfig;
- std::unique_ptr<vespa::config::content::StorDistributionConfig> _distributionConfig;
- std::unique_ptr<vespa::config::content::core::StorPrioritymappingConfig> _priorityConfig;
+ std::unique_ptr<StorServerConfig> _serverConfig;
+ std::unique_ptr<UpgradingConfig> _clusterConfig;
+ std::unique_ptr<StorDistributionConfig> _distributionConfig;
+ std::unique_ptr<StorPrioritymappingConfig> _priorityConfig;
std::unique_ptr<document::DocumenttypesConfig> _doctypesConfig;
// New configs gotten that has yet to have been handled
- std::unique_ptr<vespa::config::content::core::StorServerConfig> _newServerConfig;
- std::unique_ptr<vespa::config::content::UpgradingConfig> _newClusterConfig;
- std::unique_ptr<vespa::config::content::StorDistributionConfig> _newDistributionConfig;
- std::unique_ptr<vespa::config::content::core::StorPrioritymappingConfig> _newPriorityConfig;
+ std::unique_ptr<StorServerConfig> _newServerConfig;
+ std::unique_ptr<UpgradingConfig> _newClusterConfig;
+ std::unique_ptr<StorDistributionConfig> _newDistributionConfig;
+ std::unique_ptr<StorPrioritymappingConfig> _newPriorityConfig;
std::unique_ptr<document::DocumenttypesConfig> _newDoctypesConfig;
std::unique_ptr<StorageComponent> _component;
config::ConfigUri _configUri;
@@ -169,7 +176,7 @@ protected:
virtual void subscribeToConfigs();
virtual void initializeNodeSpecific() = 0;
virtual std::unique_ptr<StorageLink> createChain() = 0;
- virtual void handleLiveConfigUpdate();
+ virtual void handleLiveConfigUpdate(const InitialGuard & initGuard);
void shutdown();
virtual void removeConfigSubscriptions();
};