diff options
Diffstat (limited to 'storage/src/vespa/storage/storageserver')
70 files changed, 695 insertions, 628 deletions
diff --git a/storage/src/vespa/storage/storageserver/CMakeLists.txt b/storage/src/vespa/storage/storageserver/CMakeLists.txt index 1ef670f96ac..2a9c2306dfc 100644 --- a/storage/src/vespa/storage/storageserver/CMakeLists.txt +++ b/storage/src/vespa/storage/storageserver/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +# Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. vespa_add_library(storage_storageserver OBJECT SOURCES diff --git a/storage/src/vespa/storage/storageserver/applicationgenerationfetcher.h b/storage/src/vespa/storage/storageserver/applicationgenerationfetcher.h index 2a1783d13da..974250f57ea 100644 --- a/storage/src/vespa/storage/storageserver/applicationgenerationfetcher.h +++ b/storage/src/vespa/storage/storageserver/applicationgenerationfetcher.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /** * \class storage::ApplicationGenerationFetcher * \ingroup storageserver diff --git a/storage/src/vespa/storage/storageserver/bouncer.cpp b/storage/src/vespa/storage/storageserver/bouncer.cpp index cfe283edb9b..bfc38e0c8ba 100644 --- a/storage/src/vespa/storage/storageserver/bouncer.cpp +++ b/storage/src/vespa/storage/storageserver/bouncer.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "bouncer.h" #include "bouncer_metrics.h" @@ -21,34 +21,19 @@ LOG_SETUP(".bouncer"); namespace storage { -Bouncer::Bouncer(StorageComponentRegister& compReg, const config::ConfigUri & configUri) - : StorageLink("Bouncer"), - _config(new vespa::config::content::core::StorBouncerConfig()), +Bouncer::Bouncer(StorageComponentRegister& compReg, const StorBouncerConfig& bootstrap_config) + : StorageLink("Bouncer", MsgDownOnFlush::Disallowed, MsgUpOnClosed::Allowed), + _config(std::make_unique<vespa::config::content::core::StorBouncerConfig>(bootstrap_config)), _component(compReg, "bouncer"), _lock(), _baselineNodeState("s:i"), _derivedNodeStates(), _clusterState(&lib::State::UP), - _configFetcher(std::make_unique<config::ConfigFetcher>(configUri.getContext())), - _metrics(std::make_unique<BouncerMetrics>()) + _metrics(std::make_unique<BouncerMetrics>()), + _closed(false) { _component.getStateUpdater().addStateListener(*this); _component.registerMetric(*_metrics); - // Register for config. Normally not critical, so catching config - // exception allowing program to continue if missing/faulty config. - try{ - if (!configUri.empty()) { - _configFetcher->subscribe<vespa::config::content::core::StorBouncerConfig>(configUri.getConfigId(), this); - _configFetcher->start(); - } else { - LOG(info, "No config id specified. Using defaults rather than " - "config"); - } - } catch (config::InvalidConfigException& e) { - LOG(info, "Bouncer failed to load config '%s'. This " - "is not critical since it has sensible defaults: %s", - configUri.getConfigId().c_str(), e.what()); - } } Bouncer::~Bouncer() @@ -62,23 +47,25 @@ Bouncer::print(std::ostream& out, bool verbose, const std::string& indent) const { (void) verbose; (void) indent; + std::lock_guard guard(_lock); out << "Bouncer(" << _baselineNodeState << ")"; } void Bouncer::onClose() { - _configFetcher->close(); _component.getStateUpdater().removeStateListener(*this); + std::lock_guard guard(_lock); + _closed = true; } void -Bouncer::configure(std::unique_ptr<vespa::config::content::core::StorBouncerConfig> config) +Bouncer::on_configure(const vespa::config::content::core::StorBouncerConfig& config) { - log_config_received(*config); - validateConfig(*config); + validateConfig(config); + auto new_config = std::make_unique<StorBouncerConfig>(config); std::lock_guard lock(_lock); - _config = std::move(config); + _config = std::move(new_config); } const BouncerMetrics& Bouncer::metrics() const noexcept { @@ -86,8 +73,7 @@ const BouncerMetrics& Bouncer::metrics() const noexcept { } void -Bouncer::validateConfig( - const vespa::config::content::core::StorBouncerConfig& newConfig) const +Bouncer::validateConfig(const vespa::config::content::core::StorBouncerConfig& newConfig) const { if (newConfig.feedRejectionPriorityThreshold != -1) { if (newConfig.feedRejectionPriorityThreshold @@ -112,12 +98,11 @@ void Bouncer::append_node_identity(std::ostream& target_stream) const { } void -Bouncer::abortCommandForUnavailableNode(api::StorageMessage& msg, - const lib::State& state) +Bouncer::abortCommandForUnavailableNode(api::StorageMessage& msg, const lib::State& state) { // If we're not up or retired, fail due to this nodes state. std::shared_ptr<api::StorageReply> reply( - static_cast<api::StorageCommand&>(msg).makeReply().release()); + static_cast<api::StorageCommand&>(msg).makeReply()); std::ostringstream ost; ost << "We don't allow command of type " << msg.getType() << " when node is in state " << state.toString(true); @@ -128,8 +113,7 @@ Bouncer::abortCommandForUnavailableNode(api::StorageMessage& msg, } void -Bouncer::rejectCommandWithTooHighClockSkew(api::StorageMessage& msg, - int maxClockSkewInSeconds) +Bouncer::rejectCommandWithTooHighClockSkew(api::StorageMessage& msg, int maxClockSkewInSeconds) { auto& as_cmd = dynamic_cast<api::StorageCommand&>(msg); std::ostringstream ost; @@ -140,7 +124,7 @@ Bouncer::rejectCommandWithTooHighClockSkew(api::StorageMessage& msg, as_cmd.getSourceIndex(), ost.str().c_str()); _metrics->clock_skew_aborts.inc(); - std::shared_ptr<api::StorageReply> reply(as_cmd.makeReply().release()); + std::shared_ptr<api::StorageReply> reply(as_cmd.makeReply()); reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED, ost.str())); sendUp(reply); } @@ -148,8 +132,7 @@ Bouncer::rejectCommandWithTooHighClockSkew(api::StorageMessage& msg, void Bouncer::abortCommandDueToClusterDown(api::StorageMessage& msg, const lib::State& cluster_state) { - std::shared_ptr<api::StorageReply> reply( - static_cast<api::StorageCommand&>(msg).makeReply().release()); + std::shared_ptr<api::StorageReply> reply(static_cast<api::StorageCommand&>(msg).makeReply()); std::ostringstream ost; ost << "We don't allow external load while cluster is in state " << cluster_state.toString(true); @@ -172,35 +155,35 @@ uint64_t Bouncer::extractMutationTimestampIfAny(const api::StorageMessage& msg) { switch (msg.getType().getId()) { - case api::MessageType::PUT_ID: - return static_cast<const api::PutCommand&>(msg).getTimestamp(); - case api::MessageType::REMOVE_ID: - return static_cast<const api::RemoveCommand&>(msg).getTimestamp(); - case api::MessageType::UPDATE_ID: - return static_cast<const api::UpdateCommand&>(msg).getTimestamp(); - default: - return 0; + case api::MessageType::PUT_ID: + return static_cast<const api::PutCommand&>(msg).getTimestamp(); + case api::MessageType::REMOVE_ID: + return static_cast<const api::RemoveCommand&>(msg).getTimestamp(); + case api::MessageType::UPDATE_ID: + return static_cast<const api::UpdateCommand&>(msg).getTimestamp(); + default: + return 0; } } bool -Bouncer::isExternalLoad(const api::MessageType& type) const noexcept +Bouncer::isExternalLoad(const api::MessageType& type) noexcept { switch (type.getId()) { - case api::MessageType::PUT_ID: - case api::MessageType::REMOVE_ID: - case api::MessageType::UPDATE_ID: - case api::MessageType::GET_ID: - case api::MessageType::VISITOR_CREATE_ID: - case api::MessageType::STATBUCKET_ID: - return true; - default: - return false; + case api::MessageType::PUT_ID: + case api::MessageType::REMOVE_ID: + case api::MessageType::UPDATE_ID: + case api::MessageType::GET_ID: + case api::MessageType::VISITOR_CREATE_ID: + case api::MessageType::STATBUCKET_ID: + return true; + default: + return false; } } bool -Bouncer::isExternalWriteOperation(const api::MessageType& type) const noexcept { +Bouncer::isExternalWriteOperation(const api::MessageType& type) noexcept { switch (type.getId()) { case api::MessageType::PUT_ID: case api::MessageType::REMOVE_ID: @@ -216,8 +199,7 @@ Bouncer::rejectDueToInsufficientPriority( api::StorageMessage& msg, api::StorageMessage::Priority feedPriorityLowerBound) { - std::shared_ptr<api::StorageReply> reply( - static_cast<api::StorageCommand&>(msg).makeReply().release()); + std::shared_ptr<api::StorageReply> reply(static_cast<api::StorageCommand&>(msg).makeReply()); std::ostringstream ost; ost << "Operation priority (" << int(msg.getPriority()) << ") is lower than currently configured threshold (" @@ -231,8 +213,7 @@ Bouncer::rejectDueToInsufficientPriority( void Bouncer::reject_due_to_too_few_bucket_bits(api::StorageMessage& msg) { - std::shared_ptr<api::StorageReply> reply( - dynamic_cast<api::StorageCommand&>(msg).makeReply()); + std::shared_ptr<api::StorageReply> reply(dynamic_cast<api::StorageCommand&>(msg).makeReply()); reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED, vespalib::make_string("Operation bucket %s has too few bits used (%u < minimum of %u)", msg.getBucketId().toString().c_str(), @@ -241,31 +222,22 @@ Bouncer::reject_due_to_too_few_bucket_bits(api::StorageMessage& msg) { sendUp(reply); } +void +Bouncer::reject_due_to_node_shutdown(api::StorageMessage& msg) { + std::shared_ptr<api::StorageReply> reply(dynamic_cast<api::StorageCommand&>(msg).makeReply()); + reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Node is shutting down")); + sendUp(reply); +} + bool Bouncer::onDown(const std::shared_ptr<api::StorageMessage>& msg) { - const api::MessageType& type(msg->getType()); - // All replies can come in. - if (type.isReply()) { - return false; - } - - switch (type.getId()) { - case api::MessageType::SETNODESTATE_ID: - case api::MessageType::GETNODESTATE_ID: - case api::MessageType::SETSYSTEMSTATE_ID: - case api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_ID: - case api::MessageType::NOTIFYBUCKETCHANGE_ID: - // state commands are always ok - return false; - default: - break; - } const lib::State* state; int maxClockSkewInSeconds; bool isInAvailableState; bool abortLoadWhenClusterDown; - const lib::State *cluster_state; + bool closed; + const lib::State* cluster_state; int feedPriorityLowerBound; { std::lock_guard lock(_lock); @@ -275,15 +247,34 @@ Bouncer::onDown(const std::shared_ptr<api::StorageMessage>& msg) cluster_state = _clusterState; isInAvailableState = state->oneOf(_config->stopAllLoadWhenNodestateNotIn.c_str()); feedPriorityLowerBound = _config->feedRejectionPriorityThreshold; + closed = _closed; } - // Special case for messages storage nodes are expected to get during - // initializing. Request bucket info will be queued so storage can - // answer them at the moment they are done initializing - if (*state == lib::State::INITIALIZING && - type.getId() == api::MessageType::REQUESTBUCKETINFO_ID) - { + const api::MessageType& type = msg->getType(); + // If the node is shutting down, we want to prevent _any_ messages from reaching + // components further down the call chain. This means this case must be handled + // _before_ any logic that explicitly allows through certain message types. + if (closed) [[unlikely]] { + if (!type.isReply()) { + reject_due_to_node_shutdown(*msg); + } // else: swallow all replies + return true; + } + // All replies can come in. + if (type.isReply()) { return false; } + switch (type.getId()) { + case api::MessageType::SETNODESTATE_ID: + case api::MessageType::GETNODESTATE_ID: + case api::MessageType::SETSYSTEMSTATE_ID: + case api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_ID: + case api::MessageType::NOTIFYBUCKETCHANGE_ID: + // state commands are always ok + return false; + default: + break; + } + // Special case for point lookup Gets while node is in maintenance mode // to allow reads to complete during two-phase cluster state transitions if ((*state == lib::State::MAINTENANCE) && (type.getId() == api::MessageType::GET_ID) && clusterIsUp(*cluster_state)) { @@ -353,9 +344,9 @@ void Bouncer::handleNewState() noexcept { std::lock_guard lock(_lock); - const auto reportedNodeState = *_component.getStateUpdater().getReportedNodeState(); + const auto reportedNodeState = *_component.getStateUpdater().getReportedNodeState(); const auto clusterStateBundle = _component.getStateUpdater().getClusterStateBundle(); - const auto &clusterState = *clusterStateBundle->getBaselineClusterState(); + const auto& clusterState = *clusterStateBundle->getBaselineClusterState(); _clusterState = &clusterState.getClusterState(); const lib::Node node(_component.getNodeType(), _component.getIndex()); _baselineNodeState = deriveNodeState(reportedNodeState, clusterState.getNodeState(node)); diff --git a/storage/src/vespa/storage/storageserver/bouncer.h b/storage/src/vespa/storage/storageserver/bouncer.h index 95f263d3f03..26282625269 100644 --- a/storage/src/vespa/storage/storageserver/bouncer.h +++ b/storage/src/vespa/storage/storageserver/bouncer.h @@ -1,12 +1,7 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /** - * @class storage::Bouncer - * @ingroup storageserver - * - * @brief Denies messages from entering if state is not good. - * - * If we are not in up state, but process is still running, only a few - * messages should be allowed through. This link stops all messages not allowed. + * Component which rejects messages that can not be accepted by the node in + * its current state. */ #pragma once @@ -29,27 +24,28 @@ namespace storage { struct BouncerMetrics; class Bouncer : public StorageLink, - private StateListener, - private config::IFetcherCallback<vespa::config::content::core::StorBouncerConfig> + private StateListener { - std::unique_ptr<vespa::config::content::core::StorBouncerConfig> _config; - StorageComponent _component; - std::mutex _lock; - lib::NodeState _baselineNodeState; + using StorBouncerConfig = vespa::config::content::core::StorBouncerConfig; using BucketSpaceNodeStateMapping = std::unordered_map<document::BucketSpace, lib::NodeState, document::BucketSpace::hash>; - BucketSpaceNodeStateMapping _derivedNodeStates; - const lib::State* _clusterState; - std::unique_ptr<config::ConfigFetcher> _configFetcher; - std::unique_ptr<BouncerMetrics> _metrics; + + std::unique_ptr<StorBouncerConfig> _config; + StorageComponent _component; + mutable std::mutex _lock; + lib::NodeState _baselineNodeState; + BucketSpaceNodeStateMapping _derivedNodeStates; + const lib::State* _clusterState; + std::unique_ptr<BouncerMetrics> _metrics; + bool _closed; public: - Bouncer(StorageComponentRegister& compReg, const config::ConfigUri & configUri); + Bouncer(StorageComponentRegister& compReg, const StorBouncerConfig& bootstrap_config); ~Bouncer() override; void print(std::ostream& out, bool verbose, const std::string& indent) const override; - void configure(std::unique_ptr<vespa::config::content::core::StorBouncerConfig> config) override; + void on_configure(const StorBouncerConfig& config); const BouncerMetrics& metrics() const noexcept; private: @@ -60,11 +56,12 @@ private: void abortCommandDueToClusterDown(api::StorageMessage&, const lib::State&); void rejectDueToInsufficientPriority(api::StorageMessage&, api::StorageMessage::Priority); void reject_due_to_too_few_bucket_bits(api::StorageMessage&); + void reject_due_to_node_shutdown(api::StorageMessage&); static bool clusterIsUp(const lib::State& cluster_state); bool isDistributor() const; - bool isExternalLoad(const api::MessageType&) const noexcept; - bool isExternalWriteOperation(const api::MessageType&) const noexcept; - bool priorityRejectionIsEnabled(int configuredPriority) const noexcept { + static bool isExternalLoad(const api::MessageType&) noexcept; + static bool isExternalWriteOperation(const api::MessageType&) noexcept; + static bool priorityRejectionIsEnabled(int configuredPriority) noexcept { return (configuredPriority != -1); } @@ -72,7 +69,7 @@ private: * If msg is a command containing a mutating timestamp (put, remove or * update commands), return that timestamp. Otherwise, return 0. */ - uint64_t extractMutationTimestampIfAny(const api::StorageMessage& msg); + static uint64_t extractMutationTimestampIfAny(const api::StorageMessage& msg); bool onDown(const std::shared_ptr<api::StorageMessage>&) override; void handleNewState() noexcept override; const lib::NodeState &getDerivedNodeState(document::BucketSpace bucketSpace) const; diff --git a/storage/src/vespa/storage/storageserver/bouncer_metrics.cpp b/storage/src/vespa/storage/storageserver/bouncer_metrics.cpp index 5ee62bd6aee..1963cf30cd0 100644 --- a/storage/src/vespa/storage/storageserver/bouncer_metrics.cpp +++ b/storage/src/vespa/storage/storageserver/bouncer_metrics.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "bouncer_metrics.h" diff --git a/storage/src/vespa/storage/storageserver/bouncer_metrics.h b/storage/src/vespa/storage/storageserver/bouncer_metrics.h index f9647fd4a5e..92aeb4f5937 100644 --- a/storage/src/vespa/storage/storageserver/bouncer_metrics.h +++ b/storage/src/vespa/storage/storageserver/bouncer_metrics.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once @@ -15,4 +15,4 @@ struct BouncerMetrics : metrics::MetricSet { ~BouncerMetrics() override; }; -}
\ No newline at end of file +} diff --git a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp index 63dd6982fea..25829f3d391 100644 --- a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp +++ b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "changedbucketownershiphandler.h" #include <vespa/storageapi/message/state.h> @@ -22,12 +22,11 @@ LOG_SETUP(".bucketownershiphandler"); namespace storage { ChangedBucketOwnershipHandler::ChangedBucketOwnershipHandler( - const config::ConfigUri& configUri, + const PersistenceConfig& bootstrap_config, ServiceLayerComponentRegister& compReg) : StorageLink("Changed bucket ownership handler"), _component(compReg, "changedbucketownershiphandler"), _metrics(), - _configFetcher(std::make_unique<config::ConfigFetcher>(configUri.getContext())), _state_sync_executor(1), // single thread for sequential task execution _stateLock(), _currentState(), // Not set yet, so ownership will not be valid @@ -37,25 +36,23 @@ ChangedBucketOwnershipHandler::ChangedBucketOwnershipHandler( _abortMutatingIdealStateOps(false), _abortMutatingExternalLoadOps(false) { - _configFetcher->subscribe<vespa::config::content::PersistenceConfig>(configUri.getConfigId(), this); - _configFetcher->start(); + on_configure(bootstrap_config); _component.registerMetric(_metrics); } ChangedBucketOwnershipHandler::~ChangedBucketOwnershipHandler() = default; void -ChangedBucketOwnershipHandler::configure( - std::unique_ptr<vespa::config::content::PersistenceConfig> config) +ChangedBucketOwnershipHandler::on_configure(const vespa::config::content::PersistenceConfig& config) { _abortQueuedAndPendingOnStateChange.store( - config->abortOperationsWithChangedBucketOwnership, + config.abortOperationsWithChangedBucketOwnership, std::memory_order_relaxed); _abortMutatingIdealStateOps.store( - config->abortOutdatedMutatingIdealStateOps, + config.abortOutdatedMutatingIdealStateOps, std::memory_order_relaxed); _abortMutatingExternalLoadOps.store( - config->abortOutdatedMutatingExternalLoadOps, + config.abortOutdatedMutatingExternalLoadOps, std::memory_order_relaxed); } diff --git a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h index 8798d109955..801534385f7 100644 --- a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h +++ b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once #include <vespa/document/bucket/bucketid.h> @@ -56,10 +56,7 @@ namespace lib { * - RemoveCommand * - RevertCommand */ -class ChangedBucketOwnershipHandler - : public StorageLink, - private config::IFetcherCallback<vespa::config::content::PersistenceConfig> -{ +class ChangedBucketOwnershipHandler : public StorageLink { public: class Metrics : public metrics::MetricSet { public: @@ -115,12 +112,11 @@ public: private: class ClusterStateSyncAndApplyTask; - using ConfigFetcherUP = std::unique_ptr<config::ConfigFetcher>; + using PersistenceConfig = vespa::config::content::PersistenceConfig; using ClusterStateBundleCSP = std::shared_ptr<const lib::ClusterStateBundle>; ServiceLayerComponent _component; Metrics _metrics; - ConfigFetcherUP _configFetcher; vespalib::ThreadStackExecutor _state_sync_executor; mutable std::mutex _stateLock; ClusterStateBundleCSP _currentState; @@ -185,7 +181,7 @@ private: bool enabledExternalLoadAborting() const; public: - ChangedBucketOwnershipHandler(const config::ConfigUri& configUri, + ChangedBucketOwnershipHandler(const PersistenceConfig& bootstrap_config, ServiceLayerComponentRegister& compReg); ~ChangedBucketOwnershipHandler() override; @@ -194,7 +190,7 @@ public: bool onInternalReply(const std::shared_ptr<api::InternalReply>& reply) override; void onClose() override; - void configure(std::unique_ptr<vespa::config::content::PersistenceConfig>) override; + void on_configure(const PersistenceConfig&); /** * We want to ensure distribution config changes are thread safe wrt. our diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 95ed9188422..bbd4e87cb40 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "communicationmanager.h" #include "rpcrequestwrapper.h" @@ -216,18 +216,21 @@ convert_to_rpc_compression_config(const vespa::config::content::core::StorCommun } -CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, const config::ConfigUri & configUri) - : StorageLink("Communication manager"), +CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, + const config::ConfigUri& configUri, + const CommunicationManagerConfig& bootstrap_config) + : StorageLink("Communication manager", MsgDownOnFlush::Allowed, MsgUpOnClosed::Disallowed), _component(compReg, "communicationmanager"), _metrics(), _shared_rpc_resources(), // Created upon initial configuration _storage_api_rpc_service(), // (ditto) _cc_rpc_service(), // (ditto) _eventQueue(), + _bootstrap_config(std::make_unique<CommunicationManagerConfig>(bootstrap_config)), _mbus(), _configUri(configUri), _closed(false), - _docApiConverter(configUri, std::make_shared<PlaceHolderBucketResolver>()), + _docApiConverter(std::make_shared<PlaceHolderBucketResolver>()), _thread() { _component.registerMetricUpdateHook(*this, 5s); @@ -237,9 +240,13 @@ CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, co void CommunicationManager::onOpen() { - _configFetcher = std::make_unique<config::ConfigFetcher>(_configUri.getContext()); - _configFetcher->subscribe<vespa::config::content::core::StorCommunicationmanagerConfig>(_configUri.getConfigId(), this); - _configFetcher->start(); + // We have to hold on to the bootstrap config until we reach the open-phase, as the + // actual RPC/mbus endpoints are started at the first config edge. + // Note: this is called as part of synchronous node initialization, which explicitly + // prevents any concurrent reconfiguration prior to opening all storage chain components, + // i.e. there's no risk of on_configure() being called _prior_ to us getting here. + on_configure(*_bootstrap_config); + _bootstrap_config.reset(); _thread = _component.startThread(*this, 60s); if (_shared_rpc_resources) { @@ -275,28 +282,25 @@ CommunicationManager::~CommunicationManager() void CommunicationManager::onClose() { - // Avoid getting config during shutdown - _configFetcher.reset(); - - _closed = true; - - if (_mbus) { - if (_messageBusSession) { - _messageBusSession->close(); - } - } - - // TODO remove? this no longer has any particularly useful semantics + _closed.store(true, std::memory_order_seq_cst); if (_cc_rpc_service) { - _cc_rpc_service->close(); + _cc_rpc_service->close(); // Auto-abort all incoming CC RPC requests from now on } - // TODO do this after we drain queues? + // Sync all RPC threads to ensure that any subsequent RPCs must observe the closed-flags we just set if (_shared_rpc_resources) { - _shared_rpc_resources->shutdown(); + _shared_rpc_resources->sync_all_threads(); + } + + if (_mbus && _messageBusSession) { + // Closing the mbus session unregisters the destination session and syncs the worker + // thread(s), so once this call returns we should not observe further incoming requests + // through this pipeline. Previous messages may already be in flight internally; these + // will be handled by flushing-phases. + _messageBusSession->close(); } - // Stopping pumper thread should stop all incoming messages from being - // processed. + // Stopping internal message dispatch thread should stop all incoming _async_ messages + // from being processed. _Synchronously_ dispatched RPCs are still passing through. if (_thread) { _thread->interrupt(); _eventQueue.signal(); @@ -305,13 +309,12 @@ CommunicationManager::onClose() } // Emptying remaining queued messages - // FIXME but RPC/mbus is already shut down at this point...! Make sure we handle this std::shared_ptr<api::StorageMessage> msg; api::ReturnCode code(api::ReturnCode::ABORTED, "Node shutting down"); while (_eventQueue.size() > 0) { assert(_eventQueue.getNext(msg, 0ms)); if (!msg->getType().isReply()) { - std::shared_ptr<api::StorageReply> reply(static_cast<api::StorageCommand&>(*msg).makeReply()); + std::shared_ptr<api::StorageReply> reply(dynamic_cast<api::StorageCommand&>(*msg).makeReply()); reply->setResult(code); sendReply(reply); } @@ -319,6 +322,29 @@ CommunicationManager::onClose() } void +CommunicationManager::onFlush(bool downwards) +{ + if (downwards) { + // Sync RPC threads once more (with feeling!) to ensure that any closing done by other components + // during the storage chain onClose() is visible to these. + if (_shared_rpc_resources) { + _shared_rpc_resources->sync_all_threads(); + } + // By this point, no inbound RPCs (requests and responses) should be allowed any further down + // than the Bouncer component, where they will be, well, bounced. + } else { + // All components further down the storage chain should now be completely closed + // and flushed, and all message-dispatching threads should have been shut down. + // It's possible that the RPC threads are still butting heads up against the Bouncer + // component, so we conclude the shutdown ceremony by taking down the RPC subsystem. + // This transitively waits for all RPC threads to complete. + if (_shared_rpc_resources) { + _shared_rpc_resources->shutdown(); + } + } +} + +void CommunicationManager::configureMessageBusLimits(const CommunicationManagerConfig& cfg) { const bool isDist(_component.getNodeType() == lib::NodeType::DISTRIBUTOR); @@ -330,20 +356,20 @@ CommunicationManager::configureMessageBusLimits(const CommunicationManagerConfig } void -CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> config) +CommunicationManager::on_configure(const CommunicationManagerConfig& config) { // Only allow dynamic (live) reconfiguration of message bus limits. if (_mbus) { - configureMessageBusLimits(*config); - if (_mbus->getRPCNetwork().getPort() != config->mbusport) { + configureMessageBusLimits(config); + if (_mbus->getRPCNetwork().getPort() != config.mbusport) { auto m = make_string("mbus port changed from %d to %d. Will conduct a quick, but controlled restart.", - _mbus->getRPCNetwork().getPort(), config->mbusport); + _mbus->getRPCNetwork().getPort(), config.mbusport); LOG(warning, "%s", m.c_str()); _component.requestShutdown(m); } - if (_shared_rpc_resources->listen_port() != config->rpcport) { + if (_shared_rpc_resources->listen_port() != config.rpcport) { auto m = make_string("rpc port changed from %d to %d. Will conduct a quick, but controlled restart.", - _shared_rpc_resources->listen_port(), config->rpcport); + _shared_rpc_resources->listen_port(), config.rpcport); LOG(warning, "%s", m.c_str()); _component.requestShutdown(m); } @@ -353,25 +379,25 @@ CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> conf if (!_configUri.empty()) { LOG(debug, "setting up slobrok config from id: '%s", _configUri.getConfigId().c_str()); mbus::RPCNetworkParams params(_configUri); - params.setConnectionExpireSecs(config->mbus.rpctargetcache.ttl); - params.setNumNetworkThreads(std::max(1, config->mbus.numNetworkThreads)); - params.setNumRpcTargets(std::max(1, config->mbus.numRpcTargets)); - params.events_before_wakeup(std::max(1, config->mbus.eventsBeforeWakeup)); - params.setTcpNoDelay(config->mbus.tcpNoDelay); + params.setConnectionExpireSecs(config.mbus.rpctargetcache.ttl); + params.setNumNetworkThreads(std::max(1, config.mbus.numNetworkThreads)); + params.setNumRpcTargets(std::max(1, config.mbus.numRpcTargets)); + params.events_before_wakeup(std::max(1, config.mbus.eventsBeforeWakeup)); + params.setTcpNoDelay(config.mbus.tcpNoDelay); params.required_capabilities(vespalib::net::tls::CapabilitySet::of({ vespalib::net::tls::Capability::content_document_api() })); params.setIdentity(mbus::Identity(_component.getIdentity())); - if (config->mbusport != -1) { - params.setListenPort(config->mbusport); + if (config.mbusport != -1) { + params.setListenPort(config.mbusport); } using CompressionConfig = vespalib::compression::CompressionConfig; CompressionConfig::Type compressionType = CompressionConfig::toType( - CommunicationManagerConfig::Mbus::Compress::getTypeName(config->mbus.compress.type).c_str()); - params.setCompressionConfig(CompressionConfig(compressionType, config->mbus.compress.level, - 90, config->mbus.compress.limit)); + CommunicationManagerConfig::Mbus::Compress::getTypeName(config.mbus.compress.type).c_str()); + params.setCompressionConfig(CompressionConfig(compressionType, config.mbus.compress.level, + 90, config.mbus.compress.limit)); // Configure messagebus here as we for legacy reasons have // config here. @@ -381,16 +407,16 @@ CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> conf params, _configUri); - configureMessageBusLimits(*config); + configureMessageBusLimits(config); } _message_codec_provider = std::make_unique<rpc::MessageCodecProvider>(_component.getTypeRepo()->documentTypeRepo); - _shared_rpc_resources = std::make_unique<rpc::SharedRpcResources>(_configUri, config->rpcport, - config->rpc.numNetworkThreads, config->rpc.eventsBeforeWakeup); + _shared_rpc_resources = std::make_unique<rpc::SharedRpcResources>(_configUri, config.rpcport, + config.rpc.numNetworkThreads, config.rpc.eventsBeforeWakeup); _cc_rpc_service = std::make_unique<rpc::ClusterControllerApiRpcService>(*this, *_shared_rpc_resources); rpc::StorageApiRpcService::Params rpc_params; - rpc_params.compression_config = convert_to_rpc_compression_config(*config); - rpc_params.num_rpc_targets_per_node = config->rpc.numTargetsPerNode; + rpc_params.compression_config = convert_to_rpc_compression_config(config); + rpc_params.num_rpc_targets_per_node = config.rpc.numTargetsPerNode; _storage_api_rpc_service = std::make_unique<rpc::StorageApiRpcService>( *this, *_shared_rpc_resources, *_message_codec_provider, rpc_params); @@ -438,11 +464,15 @@ CommunicationManager::process(const std::shared_ptr<api::StorageMessage>& msg) } } +// Called directly by RPC threads void CommunicationManager::dispatch_sync(std::shared_ptr<api::StorageMessage> msg) { LOG(spam, "Direct dispatch of storage message %s, priority %d", msg->toString().c_str(), msg->getPriority()); + // If process is shutting down, msg will be synchronously aborted by the Bouncer component process(msg); } +// Called directly by RPC threads (for incoming CC requests) and by any other request-dispatching +// threads (i.e. calling sendUp) when address resolution fails and an internal error response is generated. void CommunicationManager::dispatch_async(std::shared_ptr<api::StorageMessage> msg) { LOG(spam, "Enqueued dispatch of storage message %s, priority %d", msg->toString().c_str(), msg->getPriority()); _eventQueue.enqueue(std::move(msg)); diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index 156ec8bc031..7a910336b13 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -1,11 +1,6 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /** - * @class CommunicationManager - * @ingroup storageserver - * - * @brief Class used for sending messages over the network. - * - * @version $Id$ + * Class used for sending messages over the network. */ #pragma once @@ -65,7 +60,6 @@ public: class CommunicationManager final : public StorageLink, public framework::Runnable, - private config::IFetcherCallback<vespa::config::content::core::StorCommunicationmanagerConfig>, public mbus::IMessageHandler, public mbus::IReplyHandler, private framework::MetricUpdateHook, @@ -80,8 +74,6 @@ private: std::unique_ptr<rpc::ClusterControllerApiRpcService> _cc_rpc_service; std::unique_ptr<rpc::MessageCodecProvider> _message_codec_provider; Queue _eventQueue; - // XXX: Should perhaps use a configsubscriber and poll from StorageComponent ? - std::unique_ptr<config::ConfigFetcher> _configFetcher; using EarlierProtocol = std::pair<vespalib::steady_time , mbus::IProtocol::SP>; using EarlierProtocols = std::vector<EarlierProtocol>; std::mutex _earlierGenerationsLock; @@ -89,6 +81,7 @@ private: void onOpen() override; void onClose() override; + void onFlush(bool downwards) override; void process(const std::shared_ptr<api::StorageMessage>& msg); @@ -96,7 +89,6 @@ private: using BucketspacesConfig = vespa::config::content::core::BucketspacesConfig; void configureMessageBusLimits(const CommunicationManagerConfig& cfg); - void configure(std::unique_ptr<CommunicationManagerConfig> config) override; void receiveStorageReply(const std::shared_ptr<api::StorageReply>&); void fail_with_unresolvable_bucket_space(std::unique_ptr<documentapi::DocumentMessage> msg, const vespalib::string& error_message); @@ -105,6 +97,7 @@ private: static const uint64_t FORWARDED_MESSAGE = 0; + std::unique_ptr<CommunicationManagerConfig> _bootstrap_config; std::unique_ptr<mbus::RPCMessageBus> _mbus; std::unique_ptr<mbus::DestinationSession> _messageBusSession; std::unique_ptr<mbus::SourceSession> _sourceSession; @@ -126,9 +119,12 @@ public: CommunicationManager(const CommunicationManager&) = delete; CommunicationManager& operator=(const CommunicationManager&) = delete; CommunicationManager(StorageComponentRegister& compReg, - const config::ConfigUri & configUri); + const config::ConfigUri& configUri, + const CommunicationManagerConfig& bootstrap_config); ~CommunicationManager() override; + void on_configure(const CommunicationManagerConfig& config); + // MessageDispatcher overrides void dispatch_sync(std::shared_ptr<api::StorageMessage> msg) override; void dispatch_async(std::shared_ptr<api::StorageMessage> msg) override; diff --git a/storage/src/vespa/storage/storageserver/communicationmanagermetrics.cpp b/storage/src/vespa/storage/storageserver/communicationmanagermetrics.cpp index 1783f15d67d..fe06afc6204 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanagermetrics.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanagermetrics.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "communicationmanagermetrics.h" diff --git a/storage/src/vespa/storage/storageserver/communicationmanagermetrics.h b/storage/src/vespa/storage/storageserver/communicationmanagermetrics.h index 990c19e3264..cb60d3f22ed 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanagermetrics.h +++ b/storage/src/vespa/storage/storageserver/communicationmanagermetrics.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /** * \class CommunicationManagerMetrics * \ingroup storageserver diff --git a/storage/src/vespa/storage/storageserver/config_logging.cpp b/storage/src/vespa/storage/storageserver/config_logging.cpp index 1b7fa3773af..81bc8e5cc3f 100644 --- a/storage/src/vespa/storage/storageserver/config_logging.cpp +++ b/storage/src/vespa/storage/storageserver/config_logging.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "config_logging.h" #include <vespa/config/configgen/configinstance.h> diff --git a/storage/src/vespa/storage/storageserver/config_logging.h b/storage/src/vespa/storage/storageserver/config_logging.h index 8c6811cc637..8fc2fbe1386 100644 --- a/storage/src/vespa/storage/storageserver/config_logging.h +++ b/storage/src/vespa/storage/storageserver/config_logging.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once diff --git a/storage/src/vespa/storage/storageserver/configurable_bucket_resolver.cpp b/storage/src/vespa/storage/storageserver/configurable_bucket_resolver.cpp index fd01210ae9e..d839bc4d874 100644 --- a/storage/src/vespa/storage/storageserver/configurable_bucket_resolver.cpp +++ b/storage/src/vespa/storage/storageserver/configurable_bucket_resolver.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/document/base/documentid.h> #include <vespa/document/bucket/fixed_bucket_spaces.h> diff --git a/storage/src/vespa/storage/storageserver/configurable_bucket_resolver.h b/storage/src/vespa/storage/storageserver/configurable_bucket_resolver.h index 5088b1f15b1..8ef952c305b 100644 --- a/storage/src/vespa/storage/storageserver/configurable_bucket_resolver.h +++ b/storage/src/vespa/storage/storageserver/configurable_bucket_resolver.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once #include <vespa/config-bucketspaces.h> diff --git a/storage/src/vespa/storage/storageserver/distributornode.cpp b/storage/src/vespa/storage/storageserver/distributornode.cpp index 431dd89b613..10ee8023621 100644 --- a/storage/src/vespa/storage/storageserver/distributornode.cpp +++ b/storage/src/vespa/storage/storageserver/distributornode.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "distributornode.h" #include "bouncer.h" @@ -18,12 +18,13 @@ namespace storage { DistributorNode::DistributorNode( const config::ConfigUri& configUri, DistributorNodeContext& context, + BootstrapConfigs bootstrap_configs, ApplicationGenerationFetcher& generationFetcher, uint32_t num_distributor_stripes, StorageLink::UP communicationManager, std::unique_ptr<IStorageChainBuilder> storage_chain_builder) - : StorageNode(configUri, context, generationFetcher, - std::make_unique<HostInfo>(), + : StorageNode(configUri, context, std::move(bootstrap_configs), + generationFetcher, std::make_unique<HostInfo>(), !communicationManager ? NORMAL : SINGLE_THREADED_TEST_MODE), _threadPool(framework::TickingThreadPool::createDefault("distributor", 100ms, 1, 5s)), _stripe_pool(std::make_unique<distributor::DistributorStripePool>()), @@ -32,7 +33,8 @@ DistributorNode::DistributorNode( _timestamp_second_counter(0), _intra_second_pseudo_usec_counter(0), _num_distributor_stripes(num_distributor_stripes), - _retrievedCommunicationManager(std::move(communicationManager)) + _retrievedCommunicationManager(std::move(communicationManager)), // may be nullptr + _bouncer(nullptr) { if (storage_chain_builder) { set_storage_chain_builder(std::move(storage_chain_builder)); @@ -82,19 +84,19 @@ void DistributorNode::createChain(IStorageChainBuilder &builder) { DistributorComponentRegister& dcr(_context.getComponentRegister()); - // TODO: All components in this chain should use a common thread instead of - // each having its own configfetcher. StorageLink::UP chain; if (_retrievedCommunicationManager) { builder.add(std::move(_retrievedCommunicationManager)); } else { - auto communication_manager = std::make_unique<CommunicationManager>(dcr, _configUri); + auto communication_manager = std::make_unique<CommunicationManager>(dcr, _configUri, communication_manager_config()); _communicationManager = communication_manager.get(); builder.add(std::move(communication_manager)); } std::unique_ptr<StateManager> stateManager(releaseStateManager()); - builder.add(std::make_unique<Bouncer>(dcr, _configUri)); + auto bouncer = std::make_unique<Bouncer>(dcr, bouncer_config()); + _bouncer = bouncer.get(); + builder.add(std::move(bouncer)); // Distributor instance registers a host info reporter with the state // manager, which is safe since the lifetime of said state manager // extends to the end of the process. @@ -140,4 +142,9 @@ DistributorNode::pause() return {}; } +void DistributorNode::on_bouncer_config_changed() { + assert(_bouncer); + _bouncer->on_configure(bouncer_config()); +} + } // storage diff --git a/storage/src/vespa/storage/storageserver/distributornode.h b/storage/src/vespa/storage/storageserver/distributornode.h index 5d61c86d48a..7870af95a0f 100644 --- a/storage/src/vespa/storage/storageserver/distributornode.h +++ b/storage/src/vespa/storage/storageserver/distributornode.h @@ -1,11 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** - * \class storage::DistributorNode - * \ingroup storageserver - * - * \brief Class for setting up a distributor node. - */ - +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once #include "distributornodecontext.h" @@ -19,6 +12,7 @@ namespace storage { namespace distributor { class DistributorStripePool; } +class Bouncer; class IStorageChainBuilder; class DistributorNode @@ -34,6 +28,7 @@ class DistributorNode uint32_t _intra_second_pseudo_usec_counter; uint32_t _num_distributor_stripes; std::unique_ptr<StorageLink> _retrievedCommunicationManager; + Bouncer* _bouncer; // If the current wall clock is more than the below number of seconds into the // past when compared to the highest recorded wall clock second time stamp, abort @@ -47,6 +42,7 @@ public: DistributorNode(const config::ConfigUri & configUri, DistributorNodeContext&, + BootstrapConfigs bootstrap_configs, ApplicationGenerationFetcher& generationFetcher, uint32_t num_distributor_stripes, std::unique_ptr<StorageLink> communicationManager, @@ -65,6 +61,7 @@ private: void initializeNodeSpecific() override; void createChain(IStorageChainBuilder &builder) override; api::Timestamp generate_unique_timestamp() override; + void on_bouncer_config_changed() override; /** * Shut down necessary distributor-specific components before shutting diff --git a/storage/src/vespa/storage/storageserver/distributornodecontext.cpp b/storage/src/vespa/storage/storageserver/distributornodecontext.cpp index f3aca7a427d..af05d3a4eec 100644 --- a/storage/src/vespa/storage/storageserver/distributornodecontext.cpp +++ b/storage/src/vespa/storage/storageserver/distributornodecontext.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "distributornodecontext.h" #include <vespa/storageframework/generic/clock/clock.h> diff --git a/storage/src/vespa/storage/storageserver/distributornodecontext.h b/storage/src/vespa/storage/storageserver/distributornodecontext.h index 5691d014d1f..3d995327c1b 100644 --- a/storage/src/vespa/storage/storageserver/distributornodecontext.h +++ b/storage/src/vespa/storage/storageserver/distributornodecontext.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /** * @class storage::DistributorNodeContext * @ingroup storageserver diff --git a/storage/src/vespa/storage/storageserver/documentapiconverter.cpp b/storage/src/vespa/storage/storageserver/documentapiconverter.cpp index ebf9c1be142..ca46e87285b 100644 --- a/storage/src/vespa/storage/storageserver/documentapiconverter.cpp +++ b/storage/src/vespa/storage/storageserver/documentapiconverter.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "documentapiconverter.h" #include "priorityconverter.h" @@ -23,9 +23,8 @@ using document::BucketSpace; namespace storage { -DocumentApiConverter::DocumentApiConverter(const config::ConfigUri &configUri, - std::shared_ptr<const BucketResolver> bucketResolver) - : _priConverter(std::make_unique<PriorityConverter>(configUri)), +DocumentApiConverter::DocumentApiConverter(std::shared_ptr<const BucketResolver> bucketResolver) + : _priConverter(std::make_unique<PriorityConverter>()), _bucketResolver(std::move(bucketResolver)) {} diff --git a/storage/src/vespa/storage/storageserver/documentapiconverter.h b/storage/src/vespa/storage/storageserver/documentapiconverter.h index 98c3bd66dbc..96b119ff44e 100644 --- a/storage/src/vespa/storage/storageserver/documentapiconverter.h +++ b/storage/src/vespa/storage/storageserver/documentapiconverter.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once #include <vespa/documentapi/messagebus/messages/documentmessage.h> @@ -22,18 +22,17 @@ class PriorityConverter; class DocumentApiConverter { public: - DocumentApiConverter(const config::ConfigUri &configUri, - std::shared_ptr<const BucketResolver> bucketResolver); + explicit DocumentApiConverter(std::shared_ptr<const BucketResolver> bucketResolver); ~DocumentApiConverter(); - std::unique_ptr<api::StorageCommand> toStorageAPI(documentapi::DocumentMessage& msg); - std::unique_ptr<api::StorageReply> toStorageAPI(documentapi::DocumentReply& reply, api::StorageCommand& originalCommand); + [[nodiscard]] std::unique_ptr<api::StorageCommand> toStorageAPI(documentapi::DocumentMessage& msg); + [[nodiscard]] std::unique_ptr<api::StorageReply> toStorageAPI(documentapi::DocumentReply& reply, api::StorageCommand& originalCommand); void transferReplyState(storage::api::StorageReply& from, mbus::Reply& to); - std::unique_ptr<mbus::Message> toDocumentAPI(api::StorageCommand& cmd); + [[nodiscard]] std::unique_ptr<mbus::Message> toDocumentAPI(api::StorageCommand& cmd); const PriorityConverter& getPriorityConverter() const { return *_priConverter; } // BucketResolver getter and setter are both thread safe. - std::shared_ptr<const BucketResolver> bucketResolver() const; + [[nodiscard]] std::shared_ptr<const BucketResolver> bucketResolver() const; void setBucketResolver(std::shared_ptr<const BucketResolver> resolver); private: mutable std::mutex _mutex; diff --git a/storage/src/vespa/storage/storageserver/fnet_metrics_wrapper.cpp b/storage/src/vespa/storage/storageserver/fnet_metrics_wrapper.cpp index 5b13e6fc3e7..2e57e4bc12d 100644 --- a/storage/src/vespa/storage/storageserver/fnet_metrics_wrapper.cpp +++ b/storage/src/vespa/storage/storageserver/fnet_metrics_wrapper.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "fnet_metrics_wrapper.h" diff --git a/storage/src/vespa/storage/storageserver/fnet_metrics_wrapper.h b/storage/src/vespa/storage/storageserver/fnet_metrics_wrapper.h index 31b93a2839a..46e875755a4 100644 --- a/storage/src/vespa/storage/storageserver/fnet_metrics_wrapper.h +++ b/storage/src/vespa/storage/storageserver/fnet_metrics_wrapper.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp index 189438650ae..4cc2a7a89ab 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp +++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "mergethrottler.h" #include <vespa/storageframework/generic/thread/thread.h> @@ -179,7 +179,7 @@ MergeThrottler::MergeNodeSequence::chain_contains_this_node() const noexcept } MergeThrottler::MergeThrottler( - const config::ConfigUri & configUri, + const StorServerConfig& bootstrap_config, StorageComponentRegister& compReg) : StorageLink("Merge Throttler"), framework::HtmlStatusReporter("merges", "Merge Throttler"), @@ -190,7 +190,6 @@ MergeThrottler::MergeThrottler( _queueSequence(0), _messageLock(), _stateLock(), - _configFetcher(std::make_unique<config::ConfigFetcher>(configUri.getContext())), _metrics(std::make_unique<Metrics>()), _component(compReg, "mergethrottler"), _thread(), @@ -203,34 +202,33 @@ MergeThrottler::MergeThrottler( { _throttlePolicy->setMinWindowSize(20); _throttlePolicy->setMaxWindowSize(20); - _configFetcher->subscribe<StorServerConfig>(configUri.getConfigId(), this); - _configFetcher->start(); + on_configure(bootstrap_config); _component.registerStatusPage(*this); _component.registerMetric(*_metrics); } void -MergeThrottler::configure(std::unique_ptr<vespa::config::content::core::StorServerConfig> newConfig) +MergeThrottler::on_configure(const StorServerConfig& new_config) { std::lock_guard lock(_stateLock); - _use_dynamic_throttling = (newConfig->mergeThrottlingPolicy.type + _use_dynamic_throttling = (new_config.mergeThrottlingPolicy.type == StorServerConfig::MergeThrottlingPolicy::Type::DYNAMIC); - if (newConfig->maxMergesPerNode < 1) { + if (new_config.maxMergesPerNode < 1) { throw config::InvalidConfigException("Cannot have a max merge count of less than 1"); } - if (newConfig->maxMergeQueueSize < 0) { + if (new_config.maxMergeQueueSize < 0) { throw config::InvalidConfigException("Max merge queue size cannot be less than 0"); } - if (newConfig->resourceExhaustionMergeBackPressureDurationSecs < 0.0) { + if (new_config.resourceExhaustionMergeBackPressureDurationSecs < 0.0) { throw config::InvalidConfigException("Merge back-pressure duration cannot be less than 0"); } if (_use_dynamic_throttling) { - auto min_win_sz = std::max(newConfig->mergeThrottlingPolicy.minWindowSize, 1); - auto max_win_sz = std::max(newConfig->mergeThrottlingPolicy.maxWindowSize, 1); + auto min_win_sz = std::max(new_config.mergeThrottlingPolicy.minWindowSize, 1); + auto max_win_sz = std::max(new_config.mergeThrottlingPolicy.maxWindowSize, 1); if (min_win_sz > max_win_sz) { min_win_sz = max_win_sz; } - auto win_sz_increment = std::max(1.0, newConfig->mergeThrottlingPolicy.windowSizeIncrement); + auto win_sz_increment = std::max(1.0, new_config.mergeThrottlingPolicy.windowSizeIncrement); _throttlePolicy->setMinWindowSize(min_win_sz); _throttlePolicy->setMaxWindowSize(max_win_sz); _throttlePolicy->setWindowSizeIncrement(win_sz_increment); @@ -238,15 +236,14 @@ MergeThrottler::configure(std::unique_ptr<vespa::config::content::core::StorServ min_win_sz, max_win_sz, win_sz_increment); } else { // Use legacy config values when static throttling is enabled. - _throttlePolicy->setMinWindowSize(newConfig->maxMergesPerNode); - _throttlePolicy->setMaxWindowSize(newConfig->maxMergesPerNode); + _throttlePolicy->setMinWindowSize(new_config.maxMergesPerNode); + _throttlePolicy->setMaxWindowSize(new_config.maxMergesPerNode); } - LOG(debug, "Setting new max queue size to %d", - newConfig->maxMergeQueueSize); - _maxQueueSize = newConfig->maxMergeQueueSize; + LOG(debug, "Setting new max queue size to %d", new_config.maxMergeQueueSize); + _maxQueueSize = new_config.maxMergeQueueSize; _backpressure_duration = std::chrono::duration_cast<std::chrono::steady_clock::duration>( - std::chrono::duration<double>(newConfig->resourceExhaustionMergeBackPressureDurationSecs)); - _disable_queue_limits_for_chained_merges = newConfig->disableQueueLimitsForChainedMerges; + std::chrono::duration<double>(new_config.resourceExhaustionMergeBackPressureDurationSecs)); + _disable_queue_limits_for_chained_merges = new_config.disableQueueLimitsForChainedMerges; } MergeThrottler::~MergeThrottler() @@ -275,8 +272,6 @@ MergeThrottler::onOpen() void MergeThrottler::onClose() { - // Avoid getting config on shutdown - _configFetcher->close(); { std::lock_guard guard(_messageLock); // Note: used to prevent taking locks in different order if onFlush diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h index dddcd42aad7..5362c2f6df8 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.h +++ b/storage/src/vespa/storage/storageserver/mergethrottler.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /** * @class storage::MergeThrottler * @ingroup storageserver @@ -35,10 +35,11 @@ class AbortBucketOperationsCommand; class MergeThrottler : public framework::Runnable, public StorageLink, - public framework::HtmlStatusReporter, - private config::IFetcherCallback<vespa::config::content::core::StorServerConfig> + public framework::HtmlStatusReporter { public: + using StorServerConfig = vespa::config::content::core::StorServerConfig; + class MergeFailureMetrics : public metrics::MetricSet { public: metrics::SumMetric<metrics::LongCountMetric> sum; @@ -172,7 +173,6 @@ private: mutable std::mutex _messageLock; std::condition_variable _messageCond; mutable std::mutex _stateLock; - std::unique_ptr<config::ConfigFetcher> _configFetcher; // Messages pending to be processed by the worker thread std::vector<api::StorageMessage::SP> _messagesDown; std::vector<api::StorageMessage::SP> _messagesUp; @@ -190,7 +190,7 @@ public: * windowSizeIncrement used for allowing unit tests to start out with more * than 1 as their window size. */ - MergeThrottler(const config::ConfigUri & configUri, StorageComponentRegister&); + MergeThrottler(const StorServerConfig& bootstrap_config, StorageComponentRegister&); ~MergeThrottler() override; /** Implements document::Runnable::run */ @@ -204,6 +204,8 @@ public: bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& stateCmd) override; + void on_configure(const StorServerConfig& new_config); + /* * When invoked, merges to the node will be BUSY-bounced by the throttler * for a configurable period of time instead of being processed. @@ -282,11 +284,6 @@ private: [[nodiscard]] bool isChainCompleted() const noexcept; }; - /** - * Callback method for config system (IFetcherCallback) - */ - void configure(std::unique_ptr<vespa::config::content::core::StorServerConfig> newConfig) override; - // NOTE: unless explicitly specified, all the below functions require // _sync lock to be held upon call (usually implicitly via MessageGuard) diff --git a/storage/src/vespa/storage/storageserver/message_dispatcher.h b/storage/src/vespa/storage/storageserver/message_dispatcher.h index b6fad66b8b9..349bf4e9956 100644 --- a/storage/src/vespa/storage/storageserver/message_dispatcher.h +++ b/storage/src/vespa/storage/storageserver/message_dispatcher.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once #include <memory> diff --git a/storage/src/vespa/storage/storageserver/priorityconverter.cpp b/storage/src/vespa/storage/storageserver/priorityconverter.cpp index 49297f216ca..fe7570ff53a 100644 --- a/storage/src/vespa/storage/storageserver/priorityconverter.cpp +++ b/storage/src/vespa/storage/storageserver/priorityconverter.cpp @@ -1,85 +1,91 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "priorityconverter.h" -#include <vespa/config/subscription/configuri.h> -#include <vespa/config/helper/configfetcher.hpp> - +#include <map> namespace storage { -PriorityConverter::PriorityConverter(const config::ConfigUri & configUri) - : _configFetcher(std::make_unique<config::ConfigFetcher>(configUri.getContext())) +PriorityConverter::PriorityConverter() + : _mapping(), + _reverse_mapping() { - _configFetcher->subscribe<vespa::config::content::core::StorPrioritymappingConfig>(configUri.getConfigId(), this); - _configFetcher->start(); + init_static_priority_mappings(); } PriorityConverter::~PriorityConverter() = default; -uint8_t -PriorityConverter::toStoragePriority(documentapi::Priority::Value documentApiPriority) const +void +PriorityConverter::init_static_priority_mappings() { - const uint32_t index(static_cast<uint32_t>(documentApiPriority)); - if (index >= PRI_ENUM_SIZE) { - return 255; - } + // Defaults from `stor-prioritymapping` config + constexpr uint8_t highest = 50; + constexpr uint8_t very_high = 60; + constexpr uint8_t high_1 = 70; + constexpr uint8_t high_2 = 80; + constexpr uint8_t high_3 = 90; + constexpr uint8_t normal_1 = 100; + constexpr uint8_t normal_2 = 110; + constexpr uint8_t normal_3 = 120; + constexpr uint8_t normal_4 = 130; + constexpr uint8_t normal_5 = 140; + constexpr uint8_t normal_6 = 150; + constexpr uint8_t low_1 = 160; + constexpr uint8_t low_2 = 170; + constexpr uint8_t low_3 = 180; + constexpr uint8_t very_low = 190; + constexpr uint8_t lowest = 200; - return _mapping[index]; -} + _mapping[documentapi::Priority::PRI_HIGHEST] = highest; + _mapping[documentapi::Priority::PRI_VERY_HIGH] = very_high; + _mapping[documentapi::Priority::PRI_HIGH_1] = high_1; + _mapping[documentapi::Priority::PRI_HIGH_2] = high_2; + _mapping[documentapi::Priority::PRI_HIGH_3] = high_3; + _mapping[documentapi::Priority::PRI_NORMAL_1] = normal_1; + _mapping[documentapi::Priority::PRI_NORMAL_2] = normal_2; + _mapping[documentapi::Priority::PRI_NORMAL_3] = normal_3; + _mapping[documentapi::Priority::PRI_NORMAL_4] = normal_4; + _mapping[documentapi::Priority::PRI_NORMAL_5] = normal_5; + _mapping[documentapi::Priority::PRI_NORMAL_6] = normal_6; + _mapping[documentapi::Priority::PRI_LOW_1] = low_1; + _mapping[documentapi::Priority::PRI_LOW_2] = low_2; + _mapping[documentapi::Priority::PRI_LOW_3] = low_3; + _mapping[documentapi::Priority::PRI_VERY_LOW] = very_low; + _mapping[documentapi::Priority::PRI_LOWEST] = lowest; -documentapi::Priority::Value -PriorityConverter::toDocumentPriority(uint8_t storagePriority) const -{ - std::lock_guard guard(_mutex); - std::map<uint8_t, documentapi::Priority::Value>::const_iterator iter = - _reverseMapping.lower_bound(storagePriority); + std::map<uint8_t, documentapi::Priority::Value> reverse_map_helper; + reverse_map_helper[highest] = documentapi::Priority::PRI_HIGHEST; + reverse_map_helper[very_high] = documentapi::Priority::PRI_VERY_HIGH; + reverse_map_helper[high_1] = documentapi::Priority::PRI_HIGH_1; + reverse_map_helper[high_2] = documentapi::Priority::PRI_HIGH_2; + reverse_map_helper[high_3] = documentapi::Priority::PRI_HIGH_3; + reverse_map_helper[normal_1] = documentapi::Priority::PRI_NORMAL_1; + reverse_map_helper[normal_2] = documentapi::Priority::PRI_NORMAL_2; + reverse_map_helper[normal_3] = documentapi::Priority::PRI_NORMAL_3; + reverse_map_helper[normal_4] = documentapi::Priority::PRI_NORMAL_4; + reverse_map_helper[normal_5] = documentapi::Priority::PRI_NORMAL_5; + reverse_map_helper[normal_6] = documentapi::Priority::PRI_NORMAL_6; + reverse_map_helper[low_1] = documentapi::Priority::PRI_LOW_1; + reverse_map_helper[low_2] = documentapi::Priority::PRI_LOW_2; + reverse_map_helper[low_3] = documentapi::Priority::PRI_LOW_3; + reverse_map_helper[very_low] = documentapi::Priority::PRI_VERY_LOW; + reverse_map_helper[lowest] = documentapi::Priority::PRI_LOWEST; - if (iter != _reverseMapping.end()) { - return iter->second; + // Precompute a 1-1 LUT to avoid having to lower-bound lookup values in a fixed map + _reverse_mapping.resize(256); + for (size_t i = 0; i < 256; ++i) { + auto iter = reverse_map_helper.lower_bound(static_cast<uint8_t>(i)); + _reverse_mapping[i] = (iter != reverse_map_helper.cend()) ? iter->second : documentapi::Priority::PRI_LOWEST; } - - return documentapi::Priority::PRI_LOWEST; } -void -PriorityConverter::configure(std::unique_ptr<vespa::config::content::core::StorPrioritymappingConfig> config) +uint8_t +PriorityConverter::toStoragePriority(documentapi::Priority::Value documentApiPriority) const noexcept { - // Data race free; _mapping is an array of std::atomic. - _mapping[documentapi::Priority::PRI_HIGHEST] = config->highest; - _mapping[documentapi::Priority::PRI_VERY_HIGH] = config->veryHigh; - _mapping[documentapi::Priority::PRI_HIGH_1] = config->high1; - _mapping[documentapi::Priority::PRI_HIGH_2] = config->high2; - _mapping[documentapi::Priority::PRI_HIGH_3] = config->high3; - _mapping[documentapi::Priority::PRI_NORMAL_1] = config->normal1; - _mapping[documentapi::Priority::PRI_NORMAL_2] = config->normal2; - _mapping[documentapi::Priority::PRI_NORMAL_3] = config->normal3; - _mapping[documentapi::Priority::PRI_NORMAL_4] = config->normal4; - _mapping[documentapi::Priority::PRI_NORMAL_5] = config->normal5; - _mapping[documentapi::Priority::PRI_NORMAL_6] = config->normal6; - _mapping[documentapi::Priority::PRI_LOW_1] = config->low1; - _mapping[documentapi::Priority::PRI_LOW_2] = config->low2; - _mapping[documentapi::Priority::PRI_LOW_3] = config->low3; - _mapping[documentapi::Priority::PRI_VERY_LOW] = config->veryLow; - _mapping[documentapi::Priority::PRI_LOWEST] = config->lowest; - - std::lock_guard guard(_mutex); - _reverseMapping.clear(); - _reverseMapping[config->highest] = documentapi::Priority::PRI_HIGHEST; - _reverseMapping[config->veryHigh] = documentapi::Priority::PRI_VERY_HIGH; - _reverseMapping[config->high1] = documentapi::Priority::PRI_HIGH_1; - _reverseMapping[config->high2] = documentapi::Priority::PRI_HIGH_2; - _reverseMapping[config->high3] = documentapi::Priority::PRI_HIGH_3; - _reverseMapping[config->normal1] = documentapi::Priority::PRI_NORMAL_1; - _reverseMapping[config->normal2] = documentapi::Priority::PRI_NORMAL_2; - _reverseMapping[config->normal3] = documentapi::Priority::PRI_NORMAL_3; - _reverseMapping[config->normal4] = documentapi::Priority::PRI_NORMAL_4; - _reverseMapping[config->normal5] = documentapi::Priority::PRI_NORMAL_5; - _reverseMapping[config->normal6] = documentapi::Priority::PRI_NORMAL_6; - _reverseMapping[config->low1] = documentapi::Priority::PRI_LOW_1; - _reverseMapping[config->low2] = documentapi::Priority::PRI_LOW_2; - _reverseMapping[config->low3] = documentapi::Priority::PRI_LOW_3; - _reverseMapping[config->veryLow] = documentapi::Priority::PRI_VERY_LOW; - _reverseMapping[config->lowest] = documentapi::Priority::PRI_LOWEST; + const auto index = static_cast<uint32_t>(documentApiPriority); + if (index >= PRI_ENUM_SIZE) { + return 255; + } + return _mapping[index]; } } // storage diff --git a/storage/src/vespa/storage/storageserver/priorityconverter.h b/storage/src/vespa/storage/storageserver/priorityconverter.h index 0abfccac3ea..48c7424433b 100644 --- a/storage/src/vespa/storage/storageserver/priorityconverter.h +++ b/storage/src/vespa/storage/storageserver/priorityconverter.h @@ -1,51 +1,35 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <vespa/storage/config/config-stor-prioritymapping.h> -#include <vespa/config/helper/ifetchercallback.h> #include <vespa/documentapi/messagebus/priority.h> -#include <atomic> #include <array> -#include <mutex> - -namespace config { - class ConfigUri; - class ConfigFetcher; -} +#include <vector> namespace storage { -class PriorityConverter - : public config::IFetcherCallback< - vespa::config::content::core::StorPrioritymappingConfig> -{ +class PriorityConverter { public: - using Config = vespa::config::content::core::StorPrioritymappingConfig; - - explicit PriorityConverter(const config::ConfigUri& configUri); - ~PriorityConverter() override; + PriorityConverter(); + ~PriorityConverter(); /** Converts the given priority into a storage api priority number. */ - uint8_t toStoragePriority(documentapi::Priority::Value) const; + [[nodiscard]] uint8_t toStoragePriority(documentapi::Priority::Value) const noexcept; /** Converts the given priority into a document api priority number. */ - documentapi::Priority::Value toDocumentPriority(uint8_t) const; - - void configure(std::unique_ptr<Config> config) override; + [[nodiscard]] documentapi::Priority::Value toDocumentPriority(uint8_t storage_priority) const noexcept { + return _reverse_mapping[storage_priority]; + } private: - static_assert(documentapi::Priority::PRI_ENUM_SIZE == 16, - "Unexpected size of priority enumeration"); - static_assert(documentapi::Priority::PRI_LOWEST == 15, - "Priority enum value out of bounds"); - static constexpr size_t PRI_ENUM_SIZE = documentapi::Priority::PRI_ENUM_SIZE; + void init_static_priority_mappings(); - std::array<std::atomic<uint8_t>, PRI_ENUM_SIZE> _mapping; - std::map<uint8_t, documentapi::Priority::Value> _reverseMapping; - mutable std::mutex _mutex; + static_assert(documentapi::Priority::PRI_ENUM_SIZE == 16, "Unexpected size of priority enumeration"); + static_assert(documentapi::Priority::PRI_LOWEST == 15, "Priority enum value out of bounds"); + static constexpr size_t PRI_ENUM_SIZE = documentapi::Priority::PRI_ENUM_SIZE; - std::unique_ptr<config::ConfigFetcher> _configFetcher; + std::array<uint8_t, PRI_ENUM_SIZE> _mapping; + std::vector<documentapi::Priority::Value> _reverse_mapping; }; } // storage diff --git a/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt b/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt index e014f570455..b749f35b8bd 100644 --- a/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt +++ b/storage/src/vespa/storage/storageserver/rpc/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +# Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. find_package(Protobuf REQUIRED) PROTOBUF_GENERATE_CPP(storage_storageserver_rpc_PROTOBUF_SRCS storage_storageserver_rpc_PROTOBUF_HDRS diff --git a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp index cfd3d1e4bc1..71ab22b6abf 100644 --- a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "caching_rpc_target_resolver.h" #include "shared_rpc_resources.h" #include <vespa/fnet/frt/target.h> diff --git a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h index f6c9eb75e12..18d7f790269 100644 --- a/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h +++ b/storage/src/vespa/storage/storageserver/rpc/caching_rpc_target_resolver.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once #include "rpc_target.h" diff --git a/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.cpp index 0dbc9468083..4d74eb1974b 100644 --- a/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "cluster_controller_api_rpc_service.h" #include "shared_rpc_resources.h" #include "slime_cluster_state_bundle_codec.h" diff --git a/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.h index ba63928be69..c8e96707b08 100644 --- a/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.h +++ b/storage/src/vespa/storage/storageserver/rpc/cluster_controller_api_rpc_service.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once #include <vespa/fnet/frt/invokable.h> diff --git a/storage/src/vespa/storage/storageserver/rpc/cluster_state_bundle_codec.h b/storage/src/vespa/storage/storageserver/rpc/cluster_state_bundle_codec.h index 530d1bd00bd..81bd0897dbb 100644 --- a/storage/src/vespa/storage/storageserver/rpc/cluster_state_bundle_codec.h +++ b/storage/src/vespa/storage/storageserver/rpc/cluster_state_bundle_codec.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once diff --git a/storage/src/vespa/storage/storageserver/rpc/encoded_cluster_state_bundle.h b/storage/src/vespa/storage/storageserver/rpc/encoded_cluster_state_bundle.h index 54419bb60cc..2443d8af2a3 100644 --- a/storage/src/vespa/storage/storageserver/rpc/encoded_cluster_state_bundle.h +++ b/storage/src/vespa/storage/storageserver/rpc/encoded_cluster_state_bundle.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once diff --git a/storage/src/vespa/storage/storageserver/rpc/message_codec_provider.cpp b/storage/src/vespa/storage/storageserver/rpc/message_codec_provider.cpp index 0d3536a9492..f82fa03c0f8 100644 --- a/storage/src/vespa/storage/storageserver/rpc/message_codec_provider.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/message_codec_provider.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "message_codec_provider.h" #include <vespa/storageapi/mbusprot/protocolserialization7.h> #include <mutex> diff --git a/storage/src/vespa/storage/storageserver/rpc/message_codec_provider.h b/storage/src/vespa/storage/storageserver/rpc/message_codec_provider.h index ddad9e4c7aa..e02444aa0ab 100644 --- a/storage/src/vespa/storage/storageserver/rpc/message_codec_provider.h +++ b/storage/src/vespa/storage/storageserver/rpc/message_codec_provider.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once #include <memory> diff --git a/storage/src/vespa/storage/storageserver/rpc/protobuf/rpc_envelope.proto b/storage/src/vespa/storage/storageserver/rpc/protobuf/rpc_envelope.proto index 20d3821f802..a6f26213c2a 100644 --- a/storage/src/vespa/storage/storageserver/rpc/protobuf/rpc_envelope.proto +++ b/storage/src/vespa/storage/storageserver/rpc/protobuf/rpc_envelope.proto @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. syntax = "proto3"; option cc_enable_arenas = true; diff --git a/storage/src/vespa/storage/storageserver/rpc/rpc_envelope_proto.h b/storage/src/vespa/storage/storageserver/rpc/rpc_envelope_proto.h index 5047eaf790d..14c93d40171 100644 --- a/storage/src/vespa/storage/storageserver/rpc/rpc_envelope_proto.h +++ b/storage/src/vespa/storage/storageserver/rpc/rpc_envelope_proto.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once diff --git a/storage/src/vespa/storage/storageserver/rpc/rpc_target.h b/storage/src/vespa/storage/storageserver/rpc/rpc_target.h index fb6aa6c9643..502d35d206a 100644 --- a/storage/src/vespa/storage/storageserver/rpc/rpc_target.h +++ b/storage/src/vespa/storage/storageserver/rpc/rpc_target.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once #include <vespa/vespalib/stllike/string.h> diff --git a/storage/src/vespa/storage/storageserver/rpc/rpc_target_factory.h b/storage/src/vespa/storage/storageserver/rpc/rpc_target_factory.h index 080bcc909b9..a6ff0d48ef6 100644 --- a/storage/src/vespa/storage/storageserver/rpc/rpc_target_factory.h +++ b/storage/src/vespa/storage/storageserver/rpc/rpc_target_factory.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once #include <vespa/vespalib/stllike/string.h> diff --git a/storage/src/vespa/storage/storageserver/rpc/rpc_target_pool.cpp b/storage/src/vespa/storage/storageserver/rpc/rpc_target_pool.cpp index 0c338a4ff5d..8566de4597a 100644 --- a/storage/src/vespa/storage/storageserver/rpc/rpc_target_pool.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/rpc_target_pool.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "rpc_target.h" #include "rpc_target_pool.h" diff --git a/storage/src/vespa/storage/storageserver/rpc/rpc_target_pool.h b/storage/src/vespa/storage/storageserver/rpc/rpc_target_pool.h index 419b67d58c2..3be9598b7c8 100644 --- a/storage/src/vespa/storage/storageserver/rpc/rpc_target_pool.h +++ b/storage/src/vespa/storage/storageserver/rpc/rpc_target_pool.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp index 5e4cb9d3026..eb933f5eb2c 100644 --- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "rpc_target.h" #include "shared_rpc_resources.h" #include <vespa/fnet/frt/supervisor.h> @@ -105,6 +105,10 @@ void SharedRpcResources::wait_until_slobrok_is_ready() { } } +void SharedRpcResources::sync_all_threads() { + _transport->sync(); +} + void SharedRpcResources::shutdown() { assert(!_shutdown); if (listen_port() > 0) { diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h index 953492089c1..d8f7eefad53 100644 --- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h +++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once #include "rpc_target_factory.h" @@ -42,6 +42,8 @@ public: // To be called after all RPC handlers have been registered. void start_server_and_register_slobrok(vespalib::stringref my_handle); + void sync_all_threads(); + void shutdown(); [[nodiscard]] int listen_port() const noexcept; // Only valid if server has been started diff --git a/storage/src/vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.cpp b/storage/src/vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.cpp index ea049493348..38d3f929549 100644 --- a/storage/src/vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "slime_cluster_state_bundle_codec.h" #include <vespa/document/bucket/fixed_bucket_spaces.h> diff --git a/storage/src/vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.h b/storage/src/vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.h index 035197d2bd9..f0e7a5c8649 100644 --- a/storage/src/vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.h +++ b/storage/src/vespa/storage/storageserver/rpc/slime_cluster_state_bundle_codec.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp index 34d8923c6e6..417d8ad75dd 100644 --- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "caching_rpc_target_resolver.h" #include "message_codec_provider.h" diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h index 3166abba956..49165b36314 100644 --- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h +++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once #include "rpc_target.h" diff --git a/storage/src/vespa/storage/storageserver/rpcrequestwrapper.cpp b/storage/src/vespa/storage/storageserver/rpcrequestwrapper.cpp index 59676c09e49..d203123f1ea 100644 --- a/storage/src/vespa/storage/storageserver/rpcrequestwrapper.cpp +++ b/storage/src/vespa/storage/storageserver/rpcrequestwrapper.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "rpcrequestwrapper.h" #include <vespa/fnet/frt/rpcrequest.h> diff --git a/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h b/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h index 20720bc195f..910c4478c22 100644 --- a/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h +++ b/storage/src/vespa/storage/storageserver/rpcrequestwrapper.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once #include <cstdint> diff --git a/storage/src/vespa/storage/storageserver/service_layer_error_listener.cpp b/storage/src/vespa/storage/storageserver/service_layer_error_listener.cpp index e41a947b6fb..3cd8c212dc1 100644 --- a/storage/src/vespa/storage/storageserver/service_layer_error_listener.cpp +++ b/storage/src/vespa/storage/storageserver/service_layer_error_listener.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "service_layer_error_listener.h" #include <vespa/storage/common/storagecomponent.h> diff --git a/storage/src/vespa/storage/storageserver/service_layer_error_listener.h b/storage/src/vespa/storage/storageserver/service_layer_error_listener.h index 25bd7c6b5e1..ae90ad8f711 100644 --- a/storage/src/vespa/storage/storageserver/service_layer_error_listener.h +++ b/storage/src/vespa/storage/storageserver/service_layer_error_listener.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp index 846d6ed09bf..0cce2c27e95 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp +++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "servicelayernode.h" #include "bouncer.h" @@ -24,16 +24,33 @@ LOG_SETUP(".node.servicelayer"); namespace storage { -ServiceLayerNode::ServiceLayerNode(const config::ConfigUri & configUri, ServiceLayerNodeContext& context, +ServiceLayerNode::ServiceLayerBootstrapConfigs::ServiceLayerBootstrapConfigs() = default; +ServiceLayerNode::ServiceLayerBootstrapConfigs::~ServiceLayerBootstrapConfigs() = default; +ServiceLayerNode::ServiceLayerBootstrapConfigs::ServiceLayerBootstrapConfigs(ServiceLayerBootstrapConfigs&&) noexcept = default; +ServiceLayerNode::ServiceLayerBootstrapConfigs& +ServiceLayerNode::ServiceLayerBootstrapConfigs::operator=(ServiceLayerBootstrapConfigs&&) noexcept = default; + +ServiceLayerNode::ServiceLayerNode(const config::ConfigUri & configUri, + ServiceLayerNodeContext& context, + ServiceLayerBootstrapConfigs bootstrap_configs, ApplicationGenerationFetcher& generationFetcher, spi::PersistenceProvider& persistenceProvider, const VisitorFactory::Map& externalVisitors) - : StorageNode(configUri, context, generationFetcher, std::make_unique<HostInfo>()), + : StorageNode(configUri, context, std::move(bootstrap_configs.storage_bootstrap_configs), + generationFetcher, std::make_unique<HostInfo>()), _context(context), _persistenceProvider(persistenceProvider), _externalVisitors(externalVisitors), + _persistence_bootstrap_config(std::move(bootstrap_configs.persistence_cfg)), + _visitor_bootstrap_config(std::move(bootstrap_configs.visitor_cfg)), + _filestor_bootstrap_config(std::move(bootstrap_configs.filestor_cfg)), + _bouncer(nullptr), _bucket_manager(nullptr), + _changed_bucket_ownership_handler(nullptr), _fileStorManager(nullptr), + _merge_throttler(nullptr), + _visitor_manager(nullptr), + _modified_bucket_checker(nullptr), _init_has_been_called(false) { } @@ -85,20 +102,6 @@ ServiceLayerNode::~ServiceLayerNode() } void -ServiceLayerNode::subscribeToConfigs() -{ - StorageNode::subscribeToConfigs(); - _configFetcher.reset(new config::ConfigFetcher(_configUri.getContext())); -} - -void -ServiceLayerNode::removeConfigSubscriptions() -{ - StorageNode::removeConfigSubscriptions(); - _configFetcher.reset(); -} - -void ServiceLayerNode::initializeNodeSpecific() { // Give node state to mount point initialization, such that we can @@ -106,7 +109,7 @@ ServiceLayerNode::initializeNodeSpecific() NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock()); lib::NodeState ns(*_component->getStateUpdater().getReportedNodeState()); - ns.setCapacity(_serverConfig->nodeCapacity); + ns.setCapacity(server_config().nodeCapacity); LOG(debug, "Adjusting reported node state to include capacity: %s", ns.toString().c_str()); _component->getStateUpdater().setReportedNodeState(ns); } @@ -117,10 +120,10 @@ ServiceLayerNode::initializeNodeSpecific() void ServiceLayerNode::handleLiveConfigUpdate(const InitialGuard & initGuard) { - if (_newServerConfig) { + if (_server_config.staging) { bool updated = false; - vespa::config::content::core::StorServerConfigBuilder oldC(*_serverConfig); - StorServerConfig& newC(*_newServerConfig); + vespa::config::content::core::StorServerConfigBuilder oldC(*_server_config.active); + StorServerConfig& newC(*_server_config.staging); { updated = false; NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock()); @@ -132,7 +135,8 @@ ServiceLayerNode::handleLiveConfigUpdate(const InitialGuard & initGuard) ns.setCapacity(newC.nodeCapacity); } if (updated) { - _serverConfig.reset(new vespa::config::content::core::StorServerConfig(oldC)); + // FIXME this always gets overwritten by StorageNode::handleLiveConfigUpdate...! Intentional? + _server_config.active = std::make_unique<vespa::config::content::core::StorServerConfig>(oldC); _component->getStateUpdater().setReportedNodeState(ns); } } @@ -162,22 +166,31 @@ ServiceLayerNode::createChain(IStorageChainBuilder &builder) { ServiceLayerComponentRegister& compReg(_context.getComponentRegister()); - auto communication_manager = std::make_unique<CommunicationManager>(compReg, _configUri); + auto communication_manager = std::make_unique<CommunicationManager>(compReg, _configUri, communication_manager_config()); _communicationManager = communication_manager.get(); builder.add(std::move(communication_manager)); - builder.add(std::make_unique<Bouncer>(compReg, _configUri)); - auto merge_throttler_up = std::make_unique<MergeThrottler>(_configUri, compReg); - auto merge_throttler = merge_throttler_up.get(); + auto bouncer = std::make_unique<Bouncer>(compReg, bouncer_config()); + _bouncer = bouncer.get(); + builder.add(std::move(bouncer)); + auto merge_throttler_up = std::make_unique<MergeThrottler>(server_config(), compReg); + _merge_throttler = merge_throttler_up.get(); builder.add(std::move(merge_throttler_up)); - builder.add(std::make_unique<ChangedBucketOwnershipHandler>(_configUri, compReg)); - auto bucket_manager = std::make_unique<BucketManager>(_configUri, _context.getComponentRegister()); + auto bucket_ownership_handler = std::make_unique<ChangedBucketOwnershipHandler>(*_persistence_bootstrap_config, compReg); + _changed_bucket_ownership_handler = bucket_ownership_handler.get(); + builder.add(std::move(bucket_ownership_handler)); + auto bucket_manager = std::make_unique<BucketManager>(server_config(), _context.getComponentRegister()); _bucket_manager = bucket_manager.get(); builder.add(std::move(bucket_manager)); - builder.add(std::make_unique<VisitorManager>(_configUri, _context.getComponentRegister(), - static_cast<VisitorMessageSessionFactory &>(*this), _externalVisitors)); - builder.add(std::make_unique<ModifiedBucketChecker>(_context.getComponentRegister(), _persistenceProvider, _configUri)); + auto visitor_manager = std::make_unique<VisitorManager>(*_visitor_bootstrap_config, _context.getComponentRegister(), + static_cast<VisitorMessageSessionFactory &>(*this), _externalVisitors); + _visitor_manager = visitor_manager.get(); + builder.add(std::move(visitor_manager)); + auto bucket_checker = std::make_unique<ModifiedBucketChecker>(_context.getComponentRegister(), _persistenceProvider, server_config()); + _modified_bucket_checker = bucket_checker.get(); + builder.add(std::move(bucket_checker)); auto state_manager = releaseStateManager(); - auto filstor_manager = std::make_unique<FileStorManager>(_configUri, _persistenceProvider, _context.getComponentRegister(), + auto filstor_manager = std::make_unique<FileStorManager>(*_filestor_bootstrap_config, _persistenceProvider, + _context.getComponentRegister(), getDoneInitializeHandler(), state_manager->getHostInfo()); _fileStorManager = filstor_manager.get(); builder.add(std::move(filstor_manager)); @@ -186,8 +199,43 @@ ServiceLayerNode::createChain(IStorageChainBuilder &builder) // Lifetimes of all referenced components shall outlive the last call going // through the SPI, as queues are flushed and worker threads joined when // the storage link chain is closed prior to destruction. - auto error_listener = std::make_shared<ServiceLayerErrorListener>(*_component, *merge_throttler); + auto error_listener = std::make_shared<ServiceLayerErrorListener>(*_component, *_merge_throttler); _fileStorManager->error_wrapper().register_error_listener(std::move(error_listener)); + + // Purge config no longer needed + _persistence_bootstrap_config.reset(); + _visitor_bootstrap_config.reset(); + _filestor_bootstrap_config.reset(); +} + +void +ServiceLayerNode::on_configure(const StorServerConfig& config) +{ + assert(_merge_throttler); + _merge_throttler->on_configure(config); + assert(_modified_bucket_checker); + _modified_bucket_checker->on_configure(config); +} + +void +ServiceLayerNode::on_configure(const PersistenceConfig& config) +{ + assert(_changed_bucket_ownership_handler); + _changed_bucket_ownership_handler->on_configure(config); +} + +void +ServiceLayerNode::on_configure(const StorVisitorConfig& config) +{ + assert(_visitor_manager); + _visitor_manager->on_configure(config); +} + +void +ServiceLayerNode::on_configure(const StorFilestorConfig& config) +{ + assert(_fileStorManager); + _fileStorManager->on_configure(config); } ResumeGuard @@ -214,4 +262,9 @@ void ServiceLayerNode::perform_post_chain_creation_init_steps() { _fileStorManager->complete_internal_initialization(); } +void ServiceLayerNode::on_bouncer_config_changed() { + assert(_bouncer); + _bouncer->on_configure(bouncer_config()); +} + } // storage diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.h b/storage/src/vespa/storage/storageserver/servicelayernode.h index e308c020856..ae39bb0805e 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernode.h +++ b/storage/src/vespa/storage/storageserver/servicelayernode.h @@ -1,10 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** - * \class storage::ServiceLayerNode - * \ingroup storageserver - * - * \brief Class for setting up a service layer node. - */ +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once @@ -12,16 +6,24 @@ #include "servicelayernodecontext.h" #include "storagenode.h" #include "vespa/vespalib/util/jsonstream.h" -#include <vespa/storage/visiting/visitormessagesessionfactory.h> -#include <vespa/storage/common/visitorfactory.h> +#include <vespa/config-persistence.h> +#include <vespa/config-stor-filestor.h> #include <vespa/storage/common/nodestateupdater.h> +#include <vespa/storage/common/visitorfactory.h> +#include <vespa/storage/visiting/config-stor-visitor.h> +#include <vespa/storage/visiting/visitormessagesessionfactory.h> namespace storage { namespace spi { struct PersistenceProvider; } +class Bouncer; class BucketManager; +class ChangedBucketOwnershipHandler; class FileStorManager; +class MergeThrottler; +class ModifiedBucketChecker; +class VisitorManager; class ServiceLayerNode : public StorageNode, @@ -29,21 +31,44 @@ class ServiceLayerNode private NodeStateReporter { - ServiceLayerNodeContext & _context; - spi::PersistenceProvider & _persistenceProvider; - VisitorFactory::Map _externalVisitors; - - // FIXME: Should probably use the fetcher in StorageNode - std::unique_ptr<config::ConfigFetcher> _configFetcher; - BucketManager * _bucket_manager; - FileStorManager * _fileStorManager; - bool _init_has_been_called; +public: + using PersistenceConfig = vespa::config::content::PersistenceConfig; + using StorVisitorConfig = vespa::config::content::core::StorVisitorConfig; + using StorFilestorConfig = vespa::config::content::StorFilestorConfig; +private: + ServiceLayerNodeContext& _context; + spi::PersistenceProvider& _persistenceProvider; + VisitorFactory::Map _externalVisitors; + std::unique_ptr<PersistenceConfig> _persistence_bootstrap_config; + std::unique_ptr<StorVisitorConfig> _visitor_bootstrap_config; + std::unique_ptr<StorFilestorConfig> _filestor_bootstrap_config; + Bouncer* _bouncer; + BucketManager* _bucket_manager; + ChangedBucketOwnershipHandler* _changed_bucket_ownership_handler; + FileStorManager* _fileStorManager; + MergeThrottler* _merge_throttler; + VisitorManager* _visitor_manager; + ModifiedBucketChecker* _modified_bucket_checker; + bool _init_has_been_called; public: using UP = std::unique_ptr<ServiceLayerNode>; + struct ServiceLayerBootstrapConfigs { + BootstrapConfigs storage_bootstrap_configs; + std::unique_ptr<PersistenceConfig> persistence_cfg; + std::unique_ptr<StorVisitorConfig> visitor_cfg; + std::unique_ptr<StorFilestorConfig> filestor_cfg; + + ServiceLayerBootstrapConfigs(); + ~ServiceLayerBootstrapConfigs(); + ServiceLayerBootstrapConfigs(ServiceLayerBootstrapConfigs&&) noexcept; + ServiceLayerBootstrapConfigs& operator=(ServiceLayerBootstrapConfigs&&) noexcept; + }; + ServiceLayerNode(const config::ConfigUri & configUri, ServiceLayerNodeContext& context, + ServiceLayerBootstrapConfigs bootstrap_configs, ApplicationGenerationFetcher& generationFetcher, spi::PersistenceProvider& persistenceProvider, const VisitorFactory::Map& externalVisitors); @@ -53,20 +78,24 @@ public: */ void init(); + void on_configure(const StorServerConfig& config); + void on_configure(const PersistenceConfig& config); + void on_configure(const StorVisitorConfig& config); + void on_configure(const StorFilestorConfig& config); + const lib::NodeType& getNodeType() const override { return lib::NodeType::STORAGE; } ResumeGuard pause() override; private: void report(vespalib::JsonStream &writer) const override; - void subscribeToConfigs() override; void initializeNodeSpecific() override; void perform_post_chain_creation_init_steps() override; void handleLiveConfigUpdate(const InitialGuard & initGuard) override; VisitorMessageSession::UP createSession(Visitor&, VisitorThread&) override; documentapi::Priority::Value toDocumentPriority(uint8_t storagePriority) const override; void createChain(IStorageChainBuilder &builder) override; - void removeConfigSubscriptions() override; + void on_bouncer_config_changed() override; }; } // storage diff --git a/storage/src/vespa/storage/storageserver/servicelayernodecontext.cpp b/storage/src/vespa/storage/storageserver/servicelayernodecontext.cpp index 12985d2476f..b56ee901beb 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernodecontext.cpp +++ b/storage/src/vespa/storage/storageserver/servicelayernodecontext.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "servicelayernodecontext.h" #include <vespa/storageframework/generic/clock/clock.h> diff --git a/storage/src/vespa/storage/storageserver/servicelayernodecontext.h b/storage/src/vespa/storage/storageserver/servicelayernodecontext.h index 72cf95ef120..f79720dae0b 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernodecontext.h +++ b/storage/src/vespa/storage/storageserver/servicelayernodecontext.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /** * @class storage::ServiceLayerNodeContext * @ingroup storageserver diff --git a/storage/src/vespa/storage/storageserver/statemanager.cpp b/storage/src/vespa/storage/storageserver/statemanager.cpp index 742f994cb2d..adebaa51c08 100644 --- a/storage/src/vespa/storage/storageserver/statemanager.cpp +++ b/storage/src/vespa/storage/storageserver/statemanager.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "statemanager.h" #include "storagemetricsset.h" diff --git a/storage/src/vespa/storage/storageserver/statemanager.h b/storage/src/vespa/storage/storageserver/statemanager.h index a69675adb1b..72b89dc4d65 100644 --- a/storage/src/vespa/storage/storageserver/statemanager.h +++ b/storage/src/vespa/storage/storageserver/statemanager.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /** * @class storage::StateManager * @ingroup storageserver diff --git a/storage/src/vespa/storage/storageserver/statereporter.cpp b/storage/src/vespa/storage/storageserver/statereporter.cpp index 8548590ea0b..c0d2d4dcc59 100644 --- a/storage/src/vespa/storage/storageserver/statereporter.cpp +++ b/storage/src/vespa/storage/storageserver/statereporter.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "statereporter.h" #include <vespa/storageframework/generic/clock/clock.h> diff --git a/storage/src/vespa/storage/storageserver/statereporter.h b/storage/src/vespa/storage/storageserver/statereporter.h index 7edc6dd3aac..9601d0fc34f 100644 --- a/storage/src/vespa/storage/storageserver/statereporter.h +++ b/storage/src/vespa/storage/storageserver/statereporter.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /** * @class storage::StateReporter diff --git a/storage/src/vespa/storage/storageserver/storagemetricsset.cpp b/storage/src/vespa/storage/storageserver/storagemetricsset.cpp index 40070ec019c..3cabf1e7fda 100644 --- a/storage/src/vespa/storage/storageserver/storagemetricsset.cpp +++ b/storage/src/vespa/storage/storageserver/storagemetricsset.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "storagemetricsset.h" #include <vespa/document/fieldvalue/serializablearray.h> diff --git a/storage/src/vespa/storage/storageserver/storagemetricsset.h b/storage/src/vespa/storage/storageserver/storagemetricsset.h index b472afbceed..2330b96dc1f 100644 --- a/storage/src/vespa/storage/storageserver/storagemetricsset.h +++ b/storage/src/vespa/storage/storageserver/storagemetricsset.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp index 99a879e19db..f7a426a0527 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.cpp +++ b/storage/src/vespa/storage/storageserver/storagenode.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "communicationmanager.h" #include "config_logging.h" @@ -66,14 +66,19 @@ removePidFile(const vespalib::string& pidfile) } // End of anonymous namespace +StorageNode::BootstrapConfigs::BootstrapConfigs() = default; +StorageNode::BootstrapConfigs::~BootstrapConfigs() = default; +StorageNode::BootstrapConfigs::BootstrapConfigs(BootstrapConfigs&&) noexcept = default; +StorageNode::BootstrapConfigs& StorageNode::BootstrapConfigs::operator=(BootstrapConfigs&&) noexcept = default; + StorageNode::StorageNode( const config::ConfigUri & configUri, StorageNodeContext& context, + BootstrapConfigs bootstrap_configs, ApplicationGenerationFetcher& generationFetcher, std::unique_ptr<HostInfo> hostInfo, RunMode mode) : _singleThreadedDebugMode(mode == SINGLE_THREADED_TEST_MODE), - _configFetcher(), _hostInfo(std::move(hostInfo)), _context(context), _generationFetcher(generationFetcher), @@ -90,16 +95,11 @@ StorageNode::StorageNode( _chain(), _configLock(), _initial_config_mutex(), - _serverConfig(), - _clusterConfig(), - _distributionConfig(), - _doctypesConfig(), - _bucketSpacesConfig(), - _newServerConfig(), - _newClusterConfig(), - _newDistributionConfig(), - _newDoctypesConfig(), - _newBucketSpacesConfig(), + _bouncer_config(std::move(bootstrap_configs.bouncer_cfg)), + _bucket_spaces_config(std::move(bootstrap_configs.bucket_spaces_cfg)), + _comm_mgr_config(std::move(bootstrap_configs.comm_mgr_cfg)), + _distribution_config(std::move(bootstrap_configs.distribution_cfg)), + _server_config(std::move(bootstrap_configs.server_cfg)), _component(), _node_identity(), _configUri(configUri), @@ -109,45 +109,24 @@ StorageNode::StorageNode( } void -StorageNode::subscribeToConfigs() -{ - _configFetcher = std::make_unique<config::ConfigFetcher>(_configUri.getContext()); - _configFetcher->subscribe<StorDistributionConfig>(_configUri.getConfigId(), this); - _configFetcher->subscribe<UpgradingConfig>(_configUri.getConfigId(), this); - _configFetcher->subscribe<StorServerConfig>(_configUri.getConfigId(), this); - _configFetcher->subscribe<BucketspacesConfig>(_configUri.getConfigId(), this); - - _configFetcher->start(); - - std::lock_guard configLockGuard(_configLock); - _serverConfig = std::move(_newServerConfig); - _clusterConfig = std::move(_newClusterConfig); - _distributionConfig = std::move(_newDistributionConfig); - _bucketSpacesConfig = std::move(_newBucketSpacesConfig); -} - -void StorageNode::initialize(const NodeStateReporter & nodeStateReporter) { // Avoid racing with concurrent reconfigurations before we've set up the entire // node component stack. + // TODO no longer needed... probably std::lock_guard<std::mutex> concurrent_config_guard(_initial_config_mutex); _context.getComponentRegister().registerShutdownListener(*this); - // Fetch configs needed first. These functions will just grab the config - // and store them away, while having the config lock. - subscribeToConfigs(); - // First update some basics that doesn't depend on anything else to be // available - _rootFolder = _serverConfig->rootFolder; + _rootFolder = server_config().rootFolder; - _context.getComponentRegister().setNodeInfo(_serverConfig->clusterName, getNodeType(), _serverConfig->nodeIndex); + _context.getComponentRegister().setNodeInfo(server_config().clusterName, getNodeType(), server_config().nodeIndex); _context.getComponentRegister().setBucketIdFactory(document::BucketIdFactory()); - _context.getComponentRegister().setDistribution(make_shared<lib::Distribution>(*_distributionConfig)); - _context.getComponentRegister().setBucketSpacesConfig(*_bucketSpacesConfig); - _node_identity = std::make_unique<NodeIdentity>(_serverConfig->clusterName, getNodeType(), _serverConfig->nodeIndex); + _context.getComponentRegister().setDistribution(make_shared<lib::Distribution>(distribution_config())); + _context.getComponentRegister().setBucketSpacesConfig(bucket_spaces_config()); + _node_identity = std::make_unique<NodeIdentity>(server_config().clusterName, getNodeType(), server_config().nodeIndex); _metrics = std::make_shared<StorageMetricSet>(); _component = std::make_unique<StorageComponent>(_context.getComponentRegister(), "storagenode"); @@ -184,17 +163,17 @@ StorageNode::initialize(const NodeStateReporter & nodeStateReporter) // Start deadlock detector _deadLockDetector = std::make_unique<DeadLockDetector>(_context.getComponentRegister()); - _deadLockDetector->enableWarning(_serverConfig->enableDeadLockDetectorWarnings); - _deadLockDetector->enableShutdown(_serverConfig->enableDeadLockDetector); - _deadLockDetector->setProcessSlack(vespalib::from_s(_serverConfig->deadLockDetectorTimeoutSlack)); - _deadLockDetector->setWaitSlack(vespalib::from_s(_serverConfig->deadLockDetectorTimeoutSlack)); + _deadLockDetector->enableWarning(server_config().enableDeadLockDetectorWarnings); + _deadLockDetector->enableShutdown(server_config().enableDeadLockDetector); + _deadLockDetector->setProcessSlack(vespalib::from_s(server_config().deadLockDetectorTimeoutSlack)); + _deadLockDetector->setWaitSlack(vespalib::from_s(server_config().deadLockDetectorTimeoutSlack)); createChain(*_chain_builder); _chain = std::move(*_chain_builder).build(); _chain_builder.reset(); assert(_communicationManager != nullptr); - _communicationManager->updateBucketSpacesConfig(*_bucketSpacesConfig); + _communicationManager->updateBucketSpacesConfig(bucket_spaces_config()); perform_post_chain_creation_init_steps(); @@ -256,23 +235,23 @@ StorageNode::handleLiveConfigUpdate(const InitialGuard & initGuard) // If we get here, initialize is done running. We have to handle changes // we want to handle. - if (_newServerConfig) { - StorServerConfigBuilder oldC(*_serverConfig); - StorServerConfig& newC(*_newServerConfig); + if (_server_config.staging) { + StorServerConfigBuilder oldC(*_server_config.active); + StorServerConfig& newC(*_server_config.staging); DIFFERWARN(rootFolder, "Cannot alter root folder of node live"); DIFFERWARN(clusterName, "Cannot alter cluster name of node live"); DIFFERWARN(nodeIndex, "Cannot alter node index of node live"); DIFFERWARN(isDistributor, "Cannot alter role of node live"); - _serverConfig = std::make_unique<StorServerConfig>(oldC); - _newServerConfig.reset(); - _deadLockDetector->enableWarning(_serverConfig->enableDeadLockDetectorWarnings); - _deadLockDetector->enableShutdown(_serverConfig->enableDeadLockDetector); - _deadLockDetector->setProcessSlack(vespalib::from_s(_serverConfig->deadLockDetectorTimeoutSlack)); - _deadLockDetector->setWaitSlack(vespalib::from_s(_serverConfig->deadLockDetectorTimeoutSlack)); - } - if (_newDistributionConfig) { - StorDistributionConfigBuilder oldC(*_distributionConfig); - StorDistributionConfig& newC(*_newDistributionConfig); + _server_config.active = std::make_unique<StorServerConfig>(oldC); // TODO this overwrites from ServiceLayerNode + _server_config.staging.reset(); + _deadLockDetector->enableWarning(server_config().enableDeadLockDetectorWarnings); + _deadLockDetector->enableShutdown(server_config().enableDeadLockDetector); + _deadLockDetector->setProcessSlack(vespalib::from_s(server_config().deadLockDetectorTimeoutSlack)); + _deadLockDetector->setWaitSlack(vespalib::from_s(server_config().deadLockDetectorTimeoutSlack)); + } + if (_distribution_config.staging) { + StorDistributionConfigBuilder oldC(*_distribution_config.active); + StorDistributionConfig& newC(*_distribution_config.staging); bool updated = false; if (DIFFER(redundancy)) { LOG(info, "Live config update: Altering redundancy from %u to %u.", oldC.redundancy, newC.redundancy); @@ -303,8 +282,9 @@ StorageNode::handleLiveConfigUpdate(const InitialGuard & initGuard) LOG(info, "Live config update: Group structure altered."); ASSIGN(group); } - _distributionConfig = std::make_unique<StorDistributionConfig>(oldC); - _newDistributionConfig.reset(); + // This looks weird, but the magical ASSIGN() macro mutates `oldC` in-place upon changes + _distribution_config.active = std::make_unique<StorDistributionConfig>(oldC); + _distribution_config.staging.reset(); if (updated) { _context.getComponentRegister().setDistribution(make_shared<lib::Distribution>(oldC)); for (StorageLink* link = _chain.get(); link != nullptr; link = link->getNextLink()) { @@ -312,17 +292,19 @@ StorageNode::handleLiveConfigUpdate(const InitialGuard & initGuard) } } } - if (_newClusterConfig) { - if (*_clusterConfig != *_newClusterConfig) { - LOG(warning, "Live config failure: Cannot alter cluster config of node live."); - } - _newClusterConfig.reset(); - } - if (_newBucketSpacesConfig) { - _bucketSpacesConfig = std::move(_newBucketSpacesConfig); - _context.getComponentRegister().setBucketSpacesConfig(*_bucketSpacesConfig); - _communicationManager->updateBucketSpacesConfig(*_bucketSpacesConfig); + if (_bucket_spaces_config.staging) { + _bucket_spaces_config.promote_staging_to_active(); + _context.getComponentRegister().setBucketSpacesConfig(bucket_spaces_config()); + _communicationManager->updateBucketSpacesConfig(bucket_spaces_config()); + } + if (_comm_mgr_config.staging) { + _comm_mgr_config.promote_staging_to_active(); + _communicationManager->on_configure(communication_manager_config()); + } + if (_bouncer_config.staging) { + _bouncer_config.promote_staging_to_active(); + on_bouncer_config_changed(); } } @@ -347,25 +329,16 @@ StorageNode::notifyDoneInitializing() StorageNode::~StorageNode() = default; void -StorageNode::removeConfigSubscriptions() -{ - LOG(debug, "Removing config subscribers"); - _configFetcher.reset(); -} - -void StorageNode::shutdown() { // Try to shut down in opposite order of initialize. Bear in mind that // we might be shutting down after init exception causing only parts - // of the server to have initialize + // of the server to have been initialized LOG(debug, "Shutting down storage node of type %s", getNodeType().toString().c_str()); if (!attemptedStopped()) { LOG(debug, "Storage killed before requestShutdown() was called. No " "reason has been given for why we're stopping."); } - // Remove the subscription to avoid more callbacks from config - removeConfigSubscriptions(); if (_chain) { LOG(debug, "Closing storage chain"); @@ -433,72 +406,42 @@ StorageNode::shutdown() void StorageNode::configure(std::unique_ptr<StorServerConfig> config) { - log_config_received(*config); - // When we get config, we try to grab the config lock to ensure noone - // else is doing configuration work, and then we write the new config - // to a variable where we can find it later when processing config - // updates - { - std::lock_guard configLockGuard(_configLock); - _newServerConfig = std::move(config); - } - if (_serverConfig) { - InitialGuard concurrent_config_guard(_initial_config_mutex); - handleLiveConfigUpdate(concurrent_config_guard); - } + stage_config_change(_server_config, std::move(config)); } void -StorageNode::configure(std::unique_ptr<UpgradingConfig> config) { - log_config_received(*config); - { - std::lock_guard configLockGuard(_configLock); - _newClusterConfig = std::move(config); - } - if (_clusterConfig) { - InitialGuard concurrent_config_guard(_initial_config_mutex); - handleLiveConfigUpdate(concurrent_config_guard); - } +StorageNode::configure(std::unique_ptr<StorDistributionConfig> config) { + stage_config_change(_distribution_config, std::move(config)); } void -StorageNode::configure(std::unique_ptr<StorDistributionConfig> config) { - log_config_received(*config); - { - std::lock_guard configLockGuard(_configLock); - _newDistributionConfig = std::move(config); - } - if (_distributionConfig) { - InitialGuard concurrent_config_guard(_initial_config_mutex); - handleLiveConfigUpdate(concurrent_config_guard); - } +StorageNode::configure(std::unique_ptr<BucketspacesConfig> config) { + stage_config_change(_bucket_spaces_config, std::move(config)); } + void -StorageNode::configure(std::unique_ptr<document::config::DocumenttypesConfig> config, - bool hasChanged, int64_t generation) -{ - log_config_received(*config); - (void) generation; - if (!hasChanged) - return; - { - std::lock_guard configLockGuard(_configLock); - _newDoctypesConfig = std::move(config); - } - if (_doctypesConfig) { - InitialGuard concurrent_config_guard(_initial_config_mutex); - handleLiveConfigUpdate(concurrent_config_guard); - } +StorageNode::configure(std::unique_ptr<CommunicationManagerConfig> config) { + stage_config_change(_comm_mgr_config, std::move(config)); } void -StorageNode::configure(std::unique_ptr<BucketspacesConfig> config) { - log_config_received(*config); +StorageNode::configure(std::unique_ptr<StorBouncerConfig> config) { + stage_config_change(_bouncer_config, std::move(config)); +} + +template <typename ConfigT> +void +StorageNode::stage_config_change(ConfigWrapper<ConfigT>& cfg, std::unique_ptr<ConfigT> new_cfg) { + log_config_received(*new_cfg); + // When we get config, we try to grab the config lock to ensure no one + // else is doing configuration work, and then we write the new config + // to a variable where we can find it later when processing config + // updates { - std::lock_guard configLockGuard(_configLock); - _newBucketSpacesConfig = std::move(config); + std::lock_guard config_lock_guard(_configLock); + cfg.staging = std::move(new_cfg); } - if (_bucketSpacesConfig) { + if (cfg.active) { InitialGuard concurrent_config_guard(_initial_config_mutex); handleLiveConfigUpdate(concurrent_config_guard); } @@ -564,4 +507,23 @@ StorageNode::set_storage_chain_builder(std::unique_ptr<IStorageChainBuilder> bui _chain_builder = std::move(builder); } +template <typename ConfigT> +StorageNode::ConfigWrapper<ConfigT>::ConfigWrapper() noexcept = default; + +template <typename ConfigT> +StorageNode::ConfigWrapper<ConfigT>::ConfigWrapper(std::unique_ptr<ConfigT> initial_active) noexcept + : staging(), + active(std::move(initial_active)) +{ +} + +template <typename ConfigT> +StorageNode::ConfigWrapper<ConfigT>::~ConfigWrapper() = default; + +template <typename ConfigT> +void StorageNode::ConfigWrapper<ConfigT>::promote_staging_to_active() noexcept { + assert(staging); + active = std::move(staging); +} + } // storage diff --git a/storage/src/vespa/storage/storageserver/storagenode.h b/storage/src/vespa/storage/storageserver/storagenode.h index 5a521d7c66c..a96f6b52a66 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.h +++ b/storage/src/vespa/storage/storageserver/storagenode.h @@ -1,9 +1,6 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /** - * @class storage::StorageNode - * @ingroup storageserver - * - * @brief Main storage server class. + * Main storage server class. * * This class sets up the entire storage server. * @@ -12,13 +9,14 @@ #pragma once +#include <vespa/config-bucketspaces.h> #include <vespa/config-stor-distribution.h> -#include <vespa/config-upgrading.h> #include <vespa/config/helper/ifetchercallback.h> #include <vespa/config/subscription/configuri.h> #include <vespa/document/config/config-documenttypes.h> #include <vespa/storage/common/doneinitializehandler.h> -#include <vespa/config-bucketspaces.h> +#include <vespa/storage/config/config-stor-bouncer.h> +#include <vespa/storage/config/config-stor-communicationmanager.h> #include <vespa/storage/config/config-stor-server.h> #include <vespa/storage/storageutil/resumeguard.h> #include <vespa/storageframework/defaultimplementation/component/componentregisterimpl.h> @@ -51,33 +49,45 @@ struct StorageNodeContext; namespace lib { class NodeType; } -class StorageNode : private config::IFetcherCallback<vespa::config::content::core::StorServerConfig>, - private config::IFetcherCallback<vespa::config::content::StorDistributionConfig>, - private config::IFetcherCallback<vespa::config::content::UpgradingConfig>, - private config::IFetcherCallback<vespa::config::content::core::BucketspacesConfig>, - private framework::MetricUpdateHook, +class StorageNode : private framework::MetricUpdateHook, private DoneInitializeHandler, private framework::defaultimplementation::ShutdownListener { public: + using BucketspacesConfig = vespa::config::content::core::BucketspacesConfig; + using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig; + using StorBouncerConfig = vespa::config::content::core::StorBouncerConfig; + using StorDistributionConfig = vespa::config::content::StorDistributionConfig; + using StorServerConfig = vespa::config::content::core::StorServerConfig; + enum RunMode { NORMAL, SINGLE_THREADED_TEST_MODE }; StorageNode(const StorageNode &) = delete; StorageNode & operator = (const StorageNode &) = delete; - /** - * @param excludeStorageChain With this option set, no chain will be set - * up. This can be useful in unit testing if you need a storage server - * instance, but you want to have full control over the components yourself. - */ - StorageNode(const config::ConfigUri & configUri, + + struct BootstrapConfigs { + std::unique_ptr<StorBouncerConfig> bouncer_cfg; + std::unique_ptr<BucketspacesConfig> bucket_spaces_cfg; + std::unique_ptr<CommunicationManagerConfig> comm_mgr_cfg; + std::unique_ptr<StorDistributionConfig> distribution_cfg; + std::unique_ptr<StorServerConfig> server_cfg; + + BootstrapConfigs(); + ~BootstrapConfigs(); + BootstrapConfigs(BootstrapConfigs&&) noexcept; + BootstrapConfigs& operator=(BootstrapConfigs&&) noexcept; + }; + + StorageNode(const config::ConfigUri& configUri, StorageNodeContext& context, + BootstrapConfigs bootstrap_configs, ApplicationGenerationFetcher& generationFetcher, std::unique_ptr<HostInfo> hostInfo, RunMode = NORMAL); ~StorageNode() override; virtual const lib::NodeType& getNodeType() const = 0; - bool attemptedStopped() const; + [[nodiscard]] bool attemptedStopped() const; void notifyDoneInitializing() override; void waitUntilInitialized(vespalib::duration timeout = 15s); void updateMetrics(const MetricLockGuard & guard) override; @@ -93,18 +103,17 @@ public: void requestShutdown(vespalib::stringref reason) override; DoneInitializeHandler& getDoneInitializeHandler() { return *this; } + void configure(std::unique_ptr<StorServerConfig> config); + void configure(std::unique_ptr<StorDistributionConfig> config); + void configure(std::unique_ptr<BucketspacesConfig>); + void configure(std::unique_ptr<CommunicationManagerConfig> config); + void configure(std::unique_ptr<StorBouncerConfig> config); + // For testing StorageLink* getChain() { return _chain.get(); } virtual void initializeStatusWebServer(); -protected: - using StorServerConfig = vespa::config::content::core::StorServerConfig; - using UpgradingConfig = vespa::config::content::UpgradingConfig; - using StorDistributionConfig = vespa::config::content::StorDistributionConfig; - using BucketspacesConfig = vespa::config::content::core::BucketspacesConfig; private: bool _singleThreadedDebugMode; - // Subscriptions to config - std::unique_ptr<config::ConfigFetcher> _configFetcher; std::unique_ptr<HostInfo> _hostInfo; @@ -130,32 +139,49 @@ private: // The storage chain can depend on anything. std::unique_ptr<StorageLink> _chain; - /** Implementation of config callbacks. */ - void configure(std::unique_ptr<StorServerConfig> config) override; - void configure(std::unique_ptr<UpgradingConfig> config) override; - void configure(std::unique_ptr<StorDistributionConfig> config) override; - virtual void configure(std::unique_ptr<document::config::DocumenttypesConfig> config, - bool hasChanged, int64_t generation); - void configure(std::unique_ptr<BucketspacesConfig>) override; + template <typename ConfigT> + struct ConfigWrapper { + std::unique_ptr<ConfigT> staging; + std::unique_ptr<ConfigT> active; + + ConfigWrapper() noexcept; + explicit ConfigWrapper(std::unique_ptr<ConfigT> initial_active) noexcept; + ~ConfigWrapper(); + + void promote_staging_to_active() noexcept; + }; + + template <typename ConfigT> + void stage_config_change(ConfigWrapper<ConfigT>& my_cfg, std::unique_ptr<ConfigT> new_cfg); protected: // Lock taken while doing configuration of the server. std::mutex _configLock; - std::mutex _initial_config_mutex; + std::mutex _initial_config_mutex; // TODO can probably be removed using InitialGuard = std::lock_guard<std::mutex>; - // Current running config. Kept, such that we can see what has been - // changed in live config updates. - std::unique_ptr<StorServerConfig> _serverConfig; - std::unique_ptr<UpgradingConfig> _clusterConfig; - std::unique_ptr<StorDistributionConfig> _distributionConfig; - std::unique_ptr<document::config::DocumenttypesConfig> _doctypesConfig; - std::unique_ptr<BucketspacesConfig> _bucketSpacesConfig; - // New configs gotten that has yet to have been handled - std::unique_ptr<StorServerConfig> _newServerConfig; - std::unique_ptr<UpgradingConfig> _newClusterConfig; - std::unique_ptr<StorDistributionConfig> _newDistributionConfig; - std::unique_ptr<document::config::DocumenttypesConfig> _newDoctypesConfig; - std::unique_ptr<BucketspacesConfig> _newBucketSpacesConfig; + + ConfigWrapper<StorBouncerConfig> _bouncer_config; + ConfigWrapper<BucketspacesConfig> _bucket_spaces_config; + ConfigWrapper<CommunicationManagerConfig> _comm_mgr_config; + ConfigWrapper<StorDistributionConfig> _distribution_config; + ConfigWrapper<StorServerConfig> _server_config; + + [[nodiscard]] const StorBouncerConfig& bouncer_config() const noexcept { + return *_bouncer_config.active; + } + [[nodiscard]] const BucketspacesConfig& bucket_spaces_config() const noexcept { + return *_bucket_spaces_config.active; + } + [[nodiscard]] const CommunicationManagerConfig& communication_manager_config() const noexcept { + return *_comm_mgr_config.active; + } + [[nodiscard]] const StorDistributionConfig& distribution_config() const noexcept { + return *_distribution_config.active; + } + [[nodiscard]] const StorServerConfig& server_config() const noexcept { + return *_server_config.active; + } + std::unique_ptr<StorageComponent> _component; std::unique_ptr<NodeIdentity> _node_identity; config::ConfigUri _configUri; @@ -174,13 +200,13 @@ protected: std::unique_ptr<StateManager> releaseStateManager(); void initialize(const NodeStateReporter & reporter); - virtual void subscribeToConfigs(); virtual void initializeNodeSpecific() = 0; virtual void perform_post_chain_creation_init_steps() = 0; virtual void createChain(IStorageChainBuilder &builder) = 0; virtual void handleLiveConfigUpdate(const InitialGuard & initGuard); void shutdown(); - virtual void removeConfigSubscriptions(); + + virtual void on_bouncer_config_changed() { /* no-op by default */ } public: void set_storage_chain_builder(std::unique_ptr<IStorageChainBuilder> builder); }; diff --git a/storage/src/vespa/storage/storageserver/storagenodecontext.cpp b/storage/src/vespa/storage/storageserver/storagenodecontext.cpp index ae7948a2916..c8fccf2c274 100644 --- a/storage/src/vespa/storage/storageserver/storagenodecontext.cpp +++ b/storage/src/vespa/storage/storageserver/storagenodecontext.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "storagenodecontext.h" #include <vespa/storageframework/generic/clock/clock.h> diff --git a/storage/src/vespa/storage/storageserver/storagenodecontext.h b/storage/src/vespa/storage/storageserver/storagenodecontext.h index 52709fb1d9b..ce6a39dca16 100644 --- a/storage/src/vespa/storage/storageserver/storagenodecontext.h +++ b/storage/src/vespa/storage/storageserver/storagenodecontext.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /** * @class storage::StorageNodeContext * @ingroup storageserver diff --git a/storage/src/vespa/storage/storageserver/tls_statistics_metrics_wrapper.cpp b/storage/src/vespa/storage/storageserver/tls_statistics_metrics_wrapper.cpp index ad74e020a82..a1e63f02ac7 100644 --- a/storage/src/vespa/storage/storageserver/tls_statistics_metrics_wrapper.cpp +++ b/storage/src/vespa/storage/storageserver/tls_statistics_metrics_wrapper.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "tls_statistics_metrics_wrapper.h" diff --git a/storage/src/vespa/storage/storageserver/tls_statistics_metrics_wrapper.h b/storage/src/vespa/storage/storageserver/tls_statistics_metrics_wrapper.h index daf02b53b7a..8aa28e959df 100644 --- a/storage/src/vespa/storage/storageserver/tls_statistics_metrics_wrapper.h +++ b/storage/src/vespa/storage/storageserver/tls_statistics_metrics_wrapper.h @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once #include <vespa/metrics/metricset.h> |