aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/storageserver/bouncer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/vespa/storage/storageserver/bouncer.cpp')
-rw-r--r--storage/src/vespa/storage/storageserver/bouncer.cpp161
1 files changed, 76 insertions, 85 deletions
diff --git a/storage/src/vespa/storage/storageserver/bouncer.cpp b/storage/src/vespa/storage/storageserver/bouncer.cpp
index cfe283edb9b..bfc38e0c8ba 100644
--- a/storage/src/vespa/storage/storageserver/bouncer.cpp
+++ b/storage/src/vespa/storage/storageserver/bouncer.cpp
@@ -1,4 +1,4 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "bouncer.h"
#include "bouncer_metrics.h"
@@ -21,34 +21,19 @@ LOG_SETUP(".bouncer");
namespace storage {
-Bouncer::Bouncer(StorageComponentRegister& compReg, const config::ConfigUri & configUri)
- : StorageLink("Bouncer"),
- _config(new vespa::config::content::core::StorBouncerConfig()),
+Bouncer::Bouncer(StorageComponentRegister& compReg, const StorBouncerConfig& bootstrap_config)
+ : StorageLink("Bouncer", MsgDownOnFlush::Disallowed, MsgUpOnClosed::Allowed),
+ _config(std::make_unique<vespa::config::content::core::StorBouncerConfig>(bootstrap_config)),
_component(compReg, "bouncer"),
_lock(),
_baselineNodeState("s:i"),
_derivedNodeStates(),
_clusterState(&lib::State::UP),
- _configFetcher(std::make_unique<config::ConfigFetcher>(configUri.getContext())),
- _metrics(std::make_unique<BouncerMetrics>())
+ _metrics(std::make_unique<BouncerMetrics>()),
+ _closed(false)
{
_component.getStateUpdater().addStateListener(*this);
_component.registerMetric(*_metrics);
- // Register for config. Normally not critical, so catching config
- // exception allowing program to continue if missing/faulty config.
- try{
- if (!configUri.empty()) {
- _configFetcher->subscribe<vespa::config::content::core::StorBouncerConfig>(configUri.getConfigId(), this);
- _configFetcher->start();
- } else {
- LOG(info, "No config id specified. Using defaults rather than "
- "config");
- }
- } catch (config::InvalidConfigException& e) {
- LOG(info, "Bouncer failed to load config '%s'. This "
- "is not critical since it has sensible defaults: %s",
- configUri.getConfigId().c_str(), e.what());
- }
}
Bouncer::~Bouncer()
@@ -62,23 +47,25 @@ Bouncer::print(std::ostream& out, bool verbose,
const std::string& indent) const
{
(void) verbose; (void) indent;
+ std::lock_guard guard(_lock);
out << "Bouncer(" << _baselineNodeState << ")";
}
void
Bouncer::onClose()
{
- _configFetcher->close();
_component.getStateUpdater().removeStateListener(*this);
+ std::lock_guard guard(_lock);
+ _closed = true;
}
void
-Bouncer::configure(std::unique_ptr<vespa::config::content::core::StorBouncerConfig> config)
+Bouncer::on_configure(const vespa::config::content::core::StorBouncerConfig& config)
{
- log_config_received(*config);
- validateConfig(*config);
+ validateConfig(config);
+ auto new_config = std::make_unique<StorBouncerConfig>(config);
std::lock_guard lock(_lock);
- _config = std::move(config);
+ _config = std::move(new_config);
}
const BouncerMetrics& Bouncer::metrics() const noexcept {
@@ -86,8 +73,7 @@ const BouncerMetrics& Bouncer::metrics() const noexcept {
}
void
-Bouncer::validateConfig(
- const vespa::config::content::core::StorBouncerConfig& newConfig) const
+Bouncer::validateConfig(const vespa::config::content::core::StorBouncerConfig& newConfig) const
{
if (newConfig.feedRejectionPriorityThreshold != -1) {
if (newConfig.feedRejectionPriorityThreshold
@@ -112,12 +98,11 @@ void Bouncer::append_node_identity(std::ostream& target_stream) const {
}
void
-Bouncer::abortCommandForUnavailableNode(api::StorageMessage& msg,
- const lib::State& state)
+Bouncer::abortCommandForUnavailableNode(api::StorageMessage& msg, const lib::State& state)
{
// If we're not up or retired, fail due to this nodes state.
std::shared_ptr<api::StorageReply> reply(
- static_cast<api::StorageCommand&>(msg).makeReply().release());
+ static_cast<api::StorageCommand&>(msg).makeReply());
std::ostringstream ost;
ost << "We don't allow command of type " << msg.getType()
<< " when node is in state " << state.toString(true);
@@ -128,8 +113,7 @@ Bouncer::abortCommandForUnavailableNode(api::StorageMessage& msg,
}
void
-Bouncer::rejectCommandWithTooHighClockSkew(api::StorageMessage& msg,
- int maxClockSkewInSeconds)
+Bouncer::rejectCommandWithTooHighClockSkew(api::StorageMessage& msg, int maxClockSkewInSeconds)
{
auto& as_cmd = dynamic_cast<api::StorageCommand&>(msg);
std::ostringstream ost;
@@ -140,7 +124,7 @@ Bouncer::rejectCommandWithTooHighClockSkew(api::StorageMessage& msg,
as_cmd.getSourceIndex(), ost.str().c_str());
_metrics->clock_skew_aborts.inc();
- std::shared_ptr<api::StorageReply> reply(as_cmd.makeReply().release());
+ std::shared_ptr<api::StorageReply> reply(as_cmd.makeReply());
reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED, ost.str()));
sendUp(reply);
}
@@ -148,8 +132,7 @@ Bouncer::rejectCommandWithTooHighClockSkew(api::StorageMessage& msg,
void
Bouncer::abortCommandDueToClusterDown(api::StorageMessage& msg, const lib::State& cluster_state)
{
- std::shared_ptr<api::StorageReply> reply(
- static_cast<api::StorageCommand&>(msg).makeReply().release());
+ std::shared_ptr<api::StorageReply> reply(static_cast<api::StorageCommand&>(msg).makeReply());
std::ostringstream ost;
ost << "We don't allow external load while cluster is in state "
<< cluster_state.toString(true);
@@ -172,35 +155,35 @@ uint64_t
Bouncer::extractMutationTimestampIfAny(const api::StorageMessage& msg)
{
switch (msg.getType().getId()) {
- case api::MessageType::PUT_ID:
- return static_cast<const api::PutCommand&>(msg).getTimestamp();
- case api::MessageType::REMOVE_ID:
- return static_cast<const api::RemoveCommand&>(msg).getTimestamp();
- case api::MessageType::UPDATE_ID:
- return static_cast<const api::UpdateCommand&>(msg).getTimestamp();
- default:
- return 0;
+ case api::MessageType::PUT_ID:
+ return static_cast<const api::PutCommand&>(msg).getTimestamp();
+ case api::MessageType::REMOVE_ID:
+ return static_cast<const api::RemoveCommand&>(msg).getTimestamp();
+ case api::MessageType::UPDATE_ID:
+ return static_cast<const api::UpdateCommand&>(msg).getTimestamp();
+ default:
+ return 0;
}
}
bool
-Bouncer::isExternalLoad(const api::MessageType& type) const noexcept
+Bouncer::isExternalLoad(const api::MessageType& type) noexcept
{
switch (type.getId()) {
- case api::MessageType::PUT_ID:
- case api::MessageType::REMOVE_ID:
- case api::MessageType::UPDATE_ID:
- case api::MessageType::GET_ID:
- case api::MessageType::VISITOR_CREATE_ID:
- case api::MessageType::STATBUCKET_ID:
- return true;
- default:
- return false;
+ case api::MessageType::PUT_ID:
+ case api::MessageType::REMOVE_ID:
+ case api::MessageType::UPDATE_ID:
+ case api::MessageType::GET_ID:
+ case api::MessageType::VISITOR_CREATE_ID:
+ case api::MessageType::STATBUCKET_ID:
+ return true;
+ default:
+ return false;
}
}
bool
-Bouncer::isExternalWriteOperation(const api::MessageType& type) const noexcept {
+Bouncer::isExternalWriteOperation(const api::MessageType& type) noexcept {
switch (type.getId()) {
case api::MessageType::PUT_ID:
case api::MessageType::REMOVE_ID:
@@ -216,8 +199,7 @@ Bouncer::rejectDueToInsufficientPriority(
api::StorageMessage& msg,
api::StorageMessage::Priority feedPriorityLowerBound)
{
- std::shared_ptr<api::StorageReply> reply(
- static_cast<api::StorageCommand&>(msg).makeReply().release());
+ std::shared_ptr<api::StorageReply> reply(static_cast<api::StorageCommand&>(msg).makeReply());
std::ostringstream ost;
ost << "Operation priority (" << int(msg.getPriority())
<< ") is lower than currently configured threshold ("
@@ -231,8 +213,7 @@ Bouncer::rejectDueToInsufficientPriority(
void
Bouncer::reject_due_to_too_few_bucket_bits(api::StorageMessage& msg) {
- std::shared_ptr<api::StorageReply> reply(
- dynamic_cast<api::StorageCommand&>(msg).makeReply());
+ std::shared_ptr<api::StorageReply> reply(dynamic_cast<api::StorageCommand&>(msg).makeReply());
reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED,
vespalib::make_string("Operation bucket %s has too few bits used (%u < minimum of %u)",
msg.getBucketId().toString().c_str(),
@@ -241,31 +222,22 @@ Bouncer::reject_due_to_too_few_bucket_bits(api::StorageMessage& msg) {
sendUp(reply);
}
+void
+Bouncer::reject_due_to_node_shutdown(api::StorageMessage& msg) {
+ std::shared_ptr<api::StorageReply> reply(dynamic_cast<api::StorageCommand&>(msg).makeReply());
+ reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Node is shutting down"));
+ sendUp(reply);
+}
+
bool
Bouncer::onDown(const std::shared_ptr<api::StorageMessage>& msg)
{
- const api::MessageType& type(msg->getType());
- // All replies can come in.
- if (type.isReply()) {
- return false;
- }
-
- switch (type.getId()) {
- case api::MessageType::SETNODESTATE_ID:
- case api::MessageType::GETNODESTATE_ID:
- case api::MessageType::SETSYSTEMSTATE_ID:
- case api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_ID:
- case api::MessageType::NOTIFYBUCKETCHANGE_ID:
- // state commands are always ok
- return false;
- default:
- break;
- }
const lib::State* state;
int maxClockSkewInSeconds;
bool isInAvailableState;
bool abortLoadWhenClusterDown;
- const lib::State *cluster_state;
+ bool closed;
+ const lib::State* cluster_state;
int feedPriorityLowerBound;
{
std::lock_guard lock(_lock);
@@ -275,15 +247,34 @@ Bouncer::onDown(const std::shared_ptr<api::StorageMessage>& msg)
cluster_state = _clusterState;
isInAvailableState = state->oneOf(_config->stopAllLoadWhenNodestateNotIn.c_str());
feedPriorityLowerBound = _config->feedRejectionPriorityThreshold;
+ closed = _closed;
}
- // Special case for messages storage nodes are expected to get during
- // initializing. Request bucket info will be queued so storage can
- // answer them at the moment they are done initializing
- if (*state == lib::State::INITIALIZING &&
- type.getId() == api::MessageType::REQUESTBUCKETINFO_ID)
- {
+ const api::MessageType& type = msg->getType();
+ // If the node is shutting down, we want to prevent _any_ messages from reaching
+ // components further down the call chain. This means this case must be handled
+ // _before_ any logic that explicitly allows through certain message types.
+ if (closed) [[unlikely]] {
+ if (!type.isReply()) {
+ reject_due_to_node_shutdown(*msg);
+ } // else: swallow all replies
+ return true;
+ }
+ // All replies can come in.
+ if (type.isReply()) {
return false;
}
+ switch (type.getId()) {
+ case api::MessageType::SETNODESTATE_ID:
+ case api::MessageType::GETNODESTATE_ID:
+ case api::MessageType::SETSYSTEMSTATE_ID:
+ case api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_ID:
+ case api::MessageType::NOTIFYBUCKETCHANGE_ID:
+ // state commands are always ok
+ return false;
+ default:
+ break;
+ }
+
// Special case for point lookup Gets while node is in maintenance mode
// to allow reads to complete during two-phase cluster state transitions
if ((*state == lib::State::MAINTENANCE) && (type.getId() == api::MessageType::GET_ID) && clusterIsUp(*cluster_state)) {
@@ -353,9 +344,9 @@ void
Bouncer::handleNewState() noexcept
{
std::lock_guard lock(_lock);
- const auto reportedNodeState = *_component.getStateUpdater().getReportedNodeState();
+ const auto reportedNodeState = *_component.getStateUpdater().getReportedNodeState();
const auto clusterStateBundle = _component.getStateUpdater().getClusterStateBundle();
- const auto &clusterState = *clusterStateBundle->getBaselineClusterState();
+ const auto& clusterState = *clusterStateBundle->getBaselineClusterState();
_clusterState = &clusterState.getClusterState();
const lib::Node node(_component.getNodeType(), _component.getIndex());
_baselineNodeState = deriveNodeState(reportedNodeState, clusterState.getNodeState(node));