diff options
Diffstat (limited to 'storage/src/vespa/storage/storageserver/bouncer.cpp')
-rw-r--r-- | storage/src/vespa/storage/storageserver/bouncer.cpp | 161 |
1 files changed, 76 insertions, 85 deletions
diff --git a/storage/src/vespa/storage/storageserver/bouncer.cpp b/storage/src/vespa/storage/storageserver/bouncer.cpp index cfe283edb9b..bfc38e0c8ba 100644 --- a/storage/src/vespa/storage/storageserver/bouncer.cpp +++ b/storage/src/vespa/storage/storageserver/bouncer.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "bouncer.h" #include "bouncer_metrics.h" @@ -21,34 +21,19 @@ LOG_SETUP(".bouncer"); namespace storage { -Bouncer::Bouncer(StorageComponentRegister& compReg, const config::ConfigUri & configUri) - : StorageLink("Bouncer"), - _config(new vespa::config::content::core::StorBouncerConfig()), +Bouncer::Bouncer(StorageComponentRegister& compReg, const StorBouncerConfig& bootstrap_config) + : StorageLink("Bouncer", MsgDownOnFlush::Disallowed, MsgUpOnClosed::Allowed), + _config(std::make_unique<vespa::config::content::core::StorBouncerConfig>(bootstrap_config)), _component(compReg, "bouncer"), _lock(), _baselineNodeState("s:i"), _derivedNodeStates(), _clusterState(&lib::State::UP), - _configFetcher(std::make_unique<config::ConfigFetcher>(configUri.getContext())), - _metrics(std::make_unique<BouncerMetrics>()) + _metrics(std::make_unique<BouncerMetrics>()), + _closed(false) { _component.getStateUpdater().addStateListener(*this); _component.registerMetric(*_metrics); - // Register for config. Normally not critical, so catching config - // exception allowing program to continue if missing/faulty config. - try{ - if (!configUri.empty()) { - _configFetcher->subscribe<vespa::config::content::core::StorBouncerConfig>(configUri.getConfigId(), this); - _configFetcher->start(); - } else { - LOG(info, "No config id specified. Using defaults rather than " - "config"); - } - } catch (config::InvalidConfigException& e) { - LOG(info, "Bouncer failed to load config '%s'. This " - "is not critical since it has sensible defaults: %s", - configUri.getConfigId().c_str(), e.what()); - } } Bouncer::~Bouncer() @@ -62,23 +47,25 @@ Bouncer::print(std::ostream& out, bool verbose, const std::string& indent) const { (void) verbose; (void) indent; + std::lock_guard guard(_lock); out << "Bouncer(" << _baselineNodeState << ")"; } void Bouncer::onClose() { - _configFetcher->close(); _component.getStateUpdater().removeStateListener(*this); + std::lock_guard guard(_lock); + _closed = true; } void -Bouncer::configure(std::unique_ptr<vespa::config::content::core::StorBouncerConfig> config) +Bouncer::on_configure(const vespa::config::content::core::StorBouncerConfig& config) { - log_config_received(*config); - validateConfig(*config); + validateConfig(config); + auto new_config = std::make_unique<StorBouncerConfig>(config); std::lock_guard lock(_lock); - _config = std::move(config); + _config = std::move(new_config); } const BouncerMetrics& Bouncer::metrics() const noexcept { @@ -86,8 +73,7 @@ const BouncerMetrics& Bouncer::metrics() const noexcept { } void -Bouncer::validateConfig( - const vespa::config::content::core::StorBouncerConfig& newConfig) const +Bouncer::validateConfig(const vespa::config::content::core::StorBouncerConfig& newConfig) const { if (newConfig.feedRejectionPriorityThreshold != -1) { if (newConfig.feedRejectionPriorityThreshold @@ -112,12 +98,11 @@ void Bouncer::append_node_identity(std::ostream& target_stream) const { } void -Bouncer::abortCommandForUnavailableNode(api::StorageMessage& msg, - const lib::State& state) +Bouncer::abortCommandForUnavailableNode(api::StorageMessage& msg, const lib::State& state) { // If we're not up or retired, fail due to this nodes state. std::shared_ptr<api::StorageReply> reply( - static_cast<api::StorageCommand&>(msg).makeReply().release()); + static_cast<api::StorageCommand&>(msg).makeReply()); std::ostringstream ost; ost << "We don't allow command of type " << msg.getType() << " when node is in state " << state.toString(true); @@ -128,8 +113,7 @@ Bouncer::abortCommandForUnavailableNode(api::StorageMessage& msg, } void -Bouncer::rejectCommandWithTooHighClockSkew(api::StorageMessage& msg, - int maxClockSkewInSeconds) +Bouncer::rejectCommandWithTooHighClockSkew(api::StorageMessage& msg, int maxClockSkewInSeconds) { auto& as_cmd = dynamic_cast<api::StorageCommand&>(msg); std::ostringstream ost; @@ -140,7 +124,7 @@ Bouncer::rejectCommandWithTooHighClockSkew(api::StorageMessage& msg, as_cmd.getSourceIndex(), ost.str().c_str()); _metrics->clock_skew_aborts.inc(); - std::shared_ptr<api::StorageReply> reply(as_cmd.makeReply().release()); + std::shared_ptr<api::StorageReply> reply(as_cmd.makeReply()); reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED, ost.str())); sendUp(reply); } @@ -148,8 +132,7 @@ Bouncer::rejectCommandWithTooHighClockSkew(api::StorageMessage& msg, void Bouncer::abortCommandDueToClusterDown(api::StorageMessage& msg, const lib::State& cluster_state) { - std::shared_ptr<api::StorageReply> reply( - static_cast<api::StorageCommand&>(msg).makeReply().release()); + std::shared_ptr<api::StorageReply> reply(static_cast<api::StorageCommand&>(msg).makeReply()); std::ostringstream ost; ost << "We don't allow external load while cluster is in state " << cluster_state.toString(true); @@ -172,35 +155,35 @@ uint64_t Bouncer::extractMutationTimestampIfAny(const api::StorageMessage& msg) { switch (msg.getType().getId()) { - case api::MessageType::PUT_ID: - return static_cast<const api::PutCommand&>(msg).getTimestamp(); - case api::MessageType::REMOVE_ID: - return static_cast<const api::RemoveCommand&>(msg).getTimestamp(); - case api::MessageType::UPDATE_ID: - return static_cast<const api::UpdateCommand&>(msg).getTimestamp(); - default: - return 0; + case api::MessageType::PUT_ID: + return static_cast<const api::PutCommand&>(msg).getTimestamp(); + case api::MessageType::REMOVE_ID: + return static_cast<const api::RemoveCommand&>(msg).getTimestamp(); + case api::MessageType::UPDATE_ID: + return static_cast<const api::UpdateCommand&>(msg).getTimestamp(); + default: + return 0; } } bool -Bouncer::isExternalLoad(const api::MessageType& type) const noexcept +Bouncer::isExternalLoad(const api::MessageType& type) noexcept { switch (type.getId()) { - case api::MessageType::PUT_ID: - case api::MessageType::REMOVE_ID: - case api::MessageType::UPDATE_ID: - case api::MessageType::GET_ID: - case api::MessageType::VISITOR_CREATE_ID: - case api::MessageType::STATBUCKET_ID: - return true; - default: - return false; + case api::MessageType::PUT_ID: + case api::MessageType::REMOVE_ID: + case api::MessageType::UPDATE_ID: + case api::MessageType::GET_ID: + case api::MessageType::VISITOR_CREATE_ID: + case api::MessageType::STATBUCKET_ID: + return true; + default: + return false; } } bool -Bouncer::isExternalWriteOperation(const api::MessageType& type) const noexcept { +Bouncer::isExternalWriteOperation(const api::MessageType& type) noexcept { switch (type.getId()) { case api::MessageType::PUT_ID: case api::MessageType::REMOVE_ID: @@ -216,8 +199,7 @@ Bouncer::rejectDueToInsufficientPriority( api::StorageMessage& msg, api::StorageMessage::Priority feedPriorityLowerBound) { - std::shared_ptr<api::StorageReply> reply( - static_cast<api::StorageCommand&>(msg).makeReply().release()); + std::shared_ptr<api::StorageReply> reply(static_cast<api::StorageCommand&>(msg).makeReply()); std::ostringstream ost; ost << "Operation priority (" << int(msg.getPriority()) << ") is lower than currently configured threshold (" @@ -231,8 +213,7 @@ Bouncer::rejectDueToInsufficientPriority( void Bouncer::reject_due_to_too_few_bucket_bits(api::StorageMessage& msg) { - std::shared_ptr<api::StorageReply> reply( - dynamic_cast<api::StorageCommand&>(msg).makeReply()); + std::shared_ptr<api::StorageReply> reply(dynamic_cast<api::StorageCommand&>(msg).makeReply()); reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED, vespalib::make_string("Operation bucket %s has too few bits used (%u < minimum of %u)", msg.getBucketId().toString().c_str(), @@ -241,31 +222,22 @@ Bouncer::reject_due_to_too_few_bucket_bits(api::StorageMessage& msg) { sendUp(reply); } +void +Bouncer::reject_due_to_node_shutdown(api::StorageMessage& msg) { + std::shared_ptr<api::StorageReply> reply(dynamic_cast<api::StorageCommand&>(msg).makeReply()); + reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Node is shutting down")); + sendUp(reply); +} + bool Bouncer::onDown(const std::shared_ptr<api::StorageMessage>& msg) { - const api::MessageType& type(msg->getType()); - // All replies can come in. - if (type.isReply()) { - return false; - } - - switch (type.getId()) { - case api::MessageType::SETNODESTATE_ID: - case api::MessageType::GETNODESTATE_ID: - case api::MessageType::SETSYSTEMSTATE_ID: - case api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_ID: - case api::MessageType::NOTIFYBUCKETCHANGE_ID: - // state commands are always ok - return false; - default: - break; - } const lib::State* state; int maxClockSkewInSeconds; bool isInAvailableState; bool abortLoadWhenClusterDown; - const lib::State *cluster_state; + bool closed; + const lib::State* cluster_state; int feedPriorityLowerBound; { std::lock_guard lock(_lock); @@ -275,15 +247,34 @@ Bouncer::onDown(const std::shared_ptr<api::StorageMessage>& msg) cluster_state = _clusterState; isInAvailableState = state->oneOf(_config->stopAllLoadWhenNodestateNotIn.c_str()); feedPriorityLowerBound = _config->feedRejectionPriorityThreshold; + closed = _closed; } - // Special case for messages storage nodes are expected to get during - // initializing. Request bucket info will be queued so storage can - // answer them at the moment they are done initializing - if (*state == lib::State::INITIALIZING && - type.getId() == api::MessageType::REQUESTBUCKETINFO_ID) - { + const api::MessageType& type = msg->getType(); + // If the node is shutting down, we want to prevent _any_ messages from reaching + // components further down the call chain. This means this case must be handled + // _before_ any logic that explicitly allows through certain message types. + if (closed) [[unlikely]] { + if (!type.isReply()) { + reject_due_to_node_shutdown(*msg); + } // else: swallow all replies + return true; + } + // All replies can come in. + if (type.isReply()) { return false; } + switch (type.getId()) { + case api::MessageType::SETNODESTATE_ID: + case api::MessageType::GETNODESTATE_ID: + case api::MessageType::SETSYSTEMSTATE_ID: + case api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_ID: + case api::MessageType::NOTIFYBUCKETCHANGE_ID: + // state commands are always ok + return false; + default: + break; + } + // Special case for point lookup Gets while node is in maintenance mode // to allow reads to complete during two-phase cluster state transitions if ((*state == lib::State::MAINTENANCE) && (type.getId() == api::MessageType::GET_ID) && clusterIsUp(*cluster_state)) { @@ -353,9 +344,9 @@ void Bouncer::handleNewState() noexcept { std::lock_guard lock(_lock); - const auto reportedNodeState = *_component.getStateUpdater().getReportedNodeState(); + const auto reportedNodeState = *_component.getStateUpdater().getReportedNodeState(); const auto clusterStateBundle = _component.getStateUpdater().getClusterStateBundle(); - const auto &clusterState = *clusterStateBundle->getBaselineClusterState(); + const auto& clusterState = *clusterStateBundle->getBaselineClusterState(); _clusterState = &clusterState.getClusterState(); const lib::Node node(_component.getNodeType(), _component.getIndex()); _baselineNodeState = deriveNodeState(reportedNodeState, clusterState.getNodeState(node)); |