aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@vespa.ai>2024-06-03 12:36:53 +0000
committerTor Brede Vekterli <vekterli@vespa.ai>2024-06-03 12:58:34 +0000
commit356063939c2a78d32437cbf998db58d996c70ba4 (patch)
tree2cac0ae179ba02fd86850720b34165b57cc7076c
parentd2ba6cf3ba59d0f25c4111f2cc670eb5de7d126f (diff)
Support enforcing strictly increasing state versions across cluster controllers
Adds a (live) config that specifies if content nodes and distributors shall reject cluster state versions that are lower than the one that is currently active on the node. This prevents "last write wins" ordering problems when multiple cluster controllers have partially overlapping leadership periods. In the name of pragmatism, we try to auto-detect the case where ZooKeeper state must have been lost on the cluster controller cluster, and accept the state even with lower version number. Otherwise, the content cluster would be effectively stalled until all its processes had been manually restarted. Adds wiring of live config to the `StateManager` component on both content and distributor nodes.
-rw-r--r--storage/src/tests/storageserver/statemanagertest.cpp126
-rw-r--r--storage/src/vespa/storage/config/stor-server.def8
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp9
-rw-r--r--storage/src/vespa/storage/storageserver/servicelayernode.cpp40
-rw-r--r--storage/src/vespa/storage/storageserver/statemanager.cpp68
-rw-r--r--storage/src/vespa/storage/storageserver/statemanager.h23
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.cpp14
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.h4
-rw-r--r--storage/src/vespa/storageapi/message/state.h27
9 files changed, 234 insertions, 85 deletions
diff --git a/storage/src/tests/storageserver/statemanagertest.cpp b/storage/src/tests/storageserver/statemanagertest.cpp
index b785bc141b6..79246cb3ce1 100644
--- a/storage/src/tests/storageserver/statemanagertest.cpp
+++ b/storage/src/tests/storageserver/statemanagertest.cpp
@@ -32,7 +32,18 @@ struct StateManagerTest : Test, NodeStateReporter {
void SetUp() override;
void TearDown() override;
- void force_current_cluster_state_version(uint32_t version);
+ static std::shared_ptr<api::SetSystemStateCommand> make_set_state_cmd(vespalib::stringref state_str, uint16_t cc_index) {
+ auto cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(state_str));
+ cmd->setSourceIndex(cc_index);
+ return cmd;
+ }
+
+ void get_single_reply(std::shared_ptr<api::StorageReply>& reply_out);
+ void get_only_ok_reply(std::shared_ptr<api::StorageReply>& reply_out);
+ void force_current_cluster_state_version(uint32_t version, uint16_t cc_index);
+ void force_current_cluster_state_version(uint32_t version) {
+ force_current_cluster_state_version(version, 0);
+ }
void mark_reported_node_state_up();
void send_down_get_node_state_request(uint16_t controller_index);
void assert_ok_get_node_state_reply_sent_and_clear();
@@ -86,11 +97,30 @@ StateManagerTest::TearDown() {
}
void
-StateManagerTest::force_current_cluster_state_version(uint32_t version)
+StateManagerTest::get_single_reply(std::shared_ptr<api::StorageReply>& reply_out)
+{
+ ASSERT_EQ(_upper->getNumReplies(), 1);
+ ASSERT_TRUE(_upper->getReply(0)->getType().isReply());
+ reply_out = std::dynamic_pointer_cast<api::StorageReply>(_upper->getReply(0));
+ ASSERT_TRUE(reply_out.get() != nullptr);
+ _upper->reset();
+}
+
+void
+StateManagerTest::get_only_ok_reply(std::shared_ptr<api::StorageReply>& reply_out)
+{
+ ASSERT_NO_FATAL_FAILURE(get_single_reply(reply_out));
+ ASSERT_EQ(reply_out->getResult(), api::ReturnCode(api::ReturnCode::OK));
+}
+
+void
+StateManagerTest::force_current_cluster_state_version(uint32_t version, uint16_t cc_index)
{
ClusterState state(*_manager->getClusterStateBundle()->getBaselineClusterState());
state.setVersion(version);
- _manager->setClusterStateBundle(lib::ClusterStateBundle(state));
+ const auto maybe_rejected_by_ver = _manager->try_set_cluster_state_bundle(
+ std::make_shared<const lib::ClusterStateBundle>(state), cc_index);
+ ASSERT_EQ(maybe_rejected_by_ver, std::nullopt);
}
void
@@ -118,23 +148,12 @@ StateManagerTest::extract_cluster_state_version_from_host_info(uint32_t& version
version_out = clusterStateVersionCursor.asLong();
}
-#define GET_ONLY_OK_REPLY(varname) \
-{ \
- ASSERT_EQ(size_t(1), _upper->getNumReplies()); \
- ASSERT_TRUE(_upper->getReply(0)->getType().isReply()); \
- varname = std::dynamic_pointer_cast<api::StorageReply>( \
- _upper->getReply(0)); \
- ASSERT_TRUE(varname.get() != nullptr); \
- _upper->reset(); \
- ASSERT_EQ(api::ReturnCode(api::ReturnCode::OK), \
- varname->getResult()); \
-}
-
TEST_F(StateManagerTest, cluster_state) {
std::shared_ptr<api::StorageReply> reply;
// Verify initial state on startup
auto currentState = _manager->getClusterStateBundle()->getBaselineClusterState();
EXPECT_EQ("cluster:d", currentState->toString(false));
+ EXPECT_EQ(currentState->getVersion(), 0);
auto currentNodeState = _manager->getCurrentNodeState();
EXPECT_EQ("s:d", currentNodeState->toString(false));
@@ -142,7 +161,7 @@ TEST_F(StateManagerTest, cluster_state) {
ClusterState sendState("storage:4 .2.s:m");
auto cmd = std::make_shared<api::SetSystemStateCommand>(sendState);
_upper->sendDown(cmd);
- GET_ONLY_OK_REPLY(reply);
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
currentState = _manager->getClusterStateBundle()->getBaselineClusterState();
EXPECT_EQ(sendState, *currentState);
@@ -151,13 +170,64 @@ TEST_F(StateManagerTest, cluster_state) {
EXPECT_EQ("s:m", currentNodeState->toString(false));
}
+TEST_F(StateManagerTest, accept_lower_state_versions_if_strict_requirement_disabled) {
+ _manager->set_require_strictly_increasing_cluster_state_versions(false);
+
+ ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(123, 1)); // CC 1
+ ASSERT_EQ(_manager->getClusterStateBundle()->getVersion(), 123);
+
+ _upper->sendDown(make_set_state_cmd("version:122 distributor:1 storage:1", 0)); // CC 0
+ std::shared_ptr<api::StorageReply> reply;
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
+ EXPECT_EQ(_manager->getClusterStateBundle()->getVersion(), 122);
+}
+
+TEST_F(StateManagerTest, reject_lower_state_versions_if_strict_requirement_enabled) {
+ _manager->set_require_strictly_increasing_cluster_state_versions(true);
+
+ ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(123, 1)); // CC 1
+ ASSERT_EQ(_manager->getClusterStateBundle()->getVersion(), 123);
+
+ _upper->sendDown(make_set_state_cmd("version:122 distributor:1 storage:1", 0)); // CC 0
+ std::shared_ptr<api::StorageReply> reply;
+ ASSERT_NO_FATAL_FAILURE(get_single_reply(reply));
+ api::ReturnCode expected_res(api::ReturnCode::REJECTED, "Cluster state version 122 rejected; node already has "
+ "a higher cluster state version (123)");
+ EXPECT_EQ(reply->getResult(), expected_res);
+ EXPECT_EQ(_manager->getClusterStateBundle()->getVersion(), 123);
+}
+
+// Observing a lower cluster state version from the same CC index directly implies that the ZooKeeper
+// state has been lost, at which point we pragmatically (but begrudgingly) accept the state version
+// to avoid stalling the entire cluster for an indeterminate amount of time.
+TEST_F(StateManagerTest, accept_lower_state_versions_from_same_cc_index_even_if_strict_requirement_enabled) {
+ _manager->set_require_strictly_increasing_cluster_state_versions(true);
+
+ ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(123, 1)); // CC 1
+ ASSERT_EQ(_manager->getClusterStateBundle()->getVersion(), 123);
+
+ ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(124, 2)); // CC 2
+ ASSERT_EQ(_manager->getClusterStateBundle()->getVersion(), 124);
+
+ // CC 1 restarts from scratch with previous ZK state up in smoke.
+ _upper->sendDown(make_set_state_cmd("version:3 distributor:1 storage:1", 1));
+ std::shared_ptr<api::StorageReply> reply;
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
+ EXPECT_EQ(_manager->getClusterStateBundle()->getVersion(), 3);
+
+ // CC 2 restarts and continues from where CC 1 left off.
+ _upper->sendDown(make_set_state_cmd("version:4 distributor:1 storage:1", 2));
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
+ EXPECT_EQ(_manager->getClusterStateBundle()->getVersion(), 4);
+}
+
namespace {
struct MyStateListener : public StateListener {
const NodeStateUpdater& updater;
lib::NodeState current;
std::ostringstream ost;
- MyStateListener(const NodeStateUpdater& upd);
+ explicit MyStateListener(const NodeStateUpdater& upd);
~MyStateListener() override;
void handleNewState() noexcept override {
@@ -194,7 +264,7 @@ TEST_F(StateManagerTest, reported_node_state) {
// And get node state command (no expected state)
auto cmd = std::make_shared<api::GetNodeStateCommand>(lib::NodeState::UP());
_upper->sendDown(cmd);
- GET_ONLY_OK_REPLY(reply);
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
ASSERT_EQ(api::MessageType::GETNODESTATE_REPLY, reply->getType());
nodeState = std::make_shared<NodeState>(
dynamic_cast<api::GetNodeStateReply&>(*reply).getNodeState());
@@ -203,7 +273,7 @@ TEST_F(StateManagerTest, reported_node_state) {
cmd = std::make_shared<api::GetNodeStateCommand>(
std::make_unique<NodeState>(NodeType::STORAGE, State::INITIALIZING));
_upper->sendDown(cmd);
- GET_ONLY_OK_REPLY(reply);
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
ASSERT_EQ(api::MessageType::GETNODESTATE_REPLY, reply->getType());
nodeState = std::make_unique<NodeState>(
dynamic_cast<api::GetNodeStateReply&>(*reply).getNodeState());
@@ -222,7 +292,7 @@ TEST_F(StateManagerTest, reported_node_state) {
_manager->setReportedNodeState(ns);
}
- GET_ONLY_OK_REPLY(reply);
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
ASSERT_EQ(api::MessageType::GETNODESTATE_REPLY, reply->getType());
nodeState = std::make_unique<NodeState>(
dynamic_cast<api::GetNodeStateReply&>(*reply).getNodeState());
@@ -244,7 +314,7 @@ TEST_F(StateManagerTest, reported_node_state) {
}
TEST_F(StateManagerTest, current_cluster_state_version_is_included_in_host_info_json) {
- force_current_cluster_state_version(123);
+ ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(123));
uint32_t version;
ASSERT_NO_FATAL_FAILURE(extract_cluster_state_version_from_host_info(version));
EXPECT_EQ(version, 123);
@@ -266,7 +336,7 @@ void StateManagerTest::send_down_get_node_state_request(uint16_t controller_inde
void StateManagerTest::assert_ok_get_node_state_reply_sent_and_clear() {
ASSERT_EQ(1, _upper->getNumReplies());
std::shared_ptr<api::StorageReply> reply;
- GET_ONLY_OK_REPLY(reply); // Implicitly clears messages from _upper
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); // Implicitly clears messages from _upper
ASSERT_EQ(api::MessageType::GETNODESTATE_REPLY, reply->getType());
}
@@ -344,7 +414,7 @@ TEST_F(StateManagerTest, request_almost_immediate_replies_triggers_fast_reply)
}
TEST_F(StateManagerTest, activation_command_is_bounced_with_current_cluster_state_version) {
- force_current_cluster_state_version(12345);
+ ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(12345));
auto cmd = std::make_shared<api::ActivateClusterStateVersionCommand>(12340);
cmd->setTimeout(10000000ms);
@@ -353,7 +423,7 @@ TEST_F(StateManagerTest, activation_command_is_bounced_with_current_cluster_stat
ASSERT_EQ(1, _upper->getNumReplies());
std::shared_ptr<api::StorageReply> reply;
- GET_ONLY_OK_REPLY(reply); // Implicitly clears messages from _upper
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); // Implicitly clears messages from _upper
ASSERT_EQ(api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_REPLY, reply->getType());
auto& activate_reply = dynamic_cast<api::ActivateClusterStateVersionReply&>(*reply);
EXPECT_EQ(12340, activate_reply.activateVersion());
@@ -366,7 +436,7 @@ TEST_F(StateManagerTest, non_deferred_cluster_state_sets_reported_cluster_state_
cmd->setSourceIndex(0);
_upper->sendDown(cmd);
std::shared_ptr<api::StorageReply> reply;
- GET_ONLY_OK_REPLY(reply);
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
uint32_t version;
ASSERT_NO_FATAL_FAILURE(extract_cluster_state_version_from_host_info(version));
@@ -374,7 +444,7 @@ TEST_F(StateManagerTest, non_deferred_cluster_state_sets_reported_cluster_state_
}
TEST_F(StateManagerTest, deferred_cluster_state_does_not_update_state_until_activation_edge) {
- force_current_cluster_state_version(100);
+ ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(100));
lib::ClusterStateBundle deferred_bundle(lib::ClusterState("version:101 distributor:1 storage:1"), {}, true);
auto state_cmd = std::make_shared<api::SetSystemStateCommand>(deferred_bundle);
@@ -382,7 +452,7 @@ TEST_F(StateManagerTest, deferred_cluster_state_does_not_update_state_until_acti
state_cmd->setSourceIndex(0);
_upper->sendDown(state_cmd);
std::shared_ptr<api::StorageReply> reply;
- GET_ONLY_OK_REPLY(reply);
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
uint32_t version;
ASSERT_NO_FATAL_FAILURE(extract_cluster_state_version_from_host_info(version));
@@ -392,7 +462,7 @@ TEST_F(StateManagerTest, deferred_cluster_state_does_not_update_state_until_acti
activation_cmd->setTimeout(1000s);
activation_cmd->setSourceIndex(0);
_upper->sendDown(activation_cmd);
- GET_ONLY_OK_REPLY(reply);
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
ASSERT_NO_FATAL_FAILURE(extract_cluster_state_version_from_host_info(version));
EXPECT_EQ(version, 101);
diff --git a/storage/src/vespa/storage/config/stor-server.def b/storage/src/vespa/storage/config/stor-server.def
index 44e4b14eafc..49ce0b678d0 100644
--- a/storage/src/vespa/storage/config/stor-server.def
+++ b/storage/src/vespa/storage/config/stor-server.def
@@ -118,3 +118,11 @@ content_node_bucket_db_stripe_bits int default=4 restart
## Iff set, a special `pidfile` file is written under the node's root directory upon
## startup containing the PID of the running process.
write_pid_file_on_startup bool default=true
+
+## Iff true, received cluster state versions that are lower than the current active
+## (or pending to be active) version on the node will be explicitly rejected. This
+## prevents race conditions caused by multiple cluster controllers believing they
+## are the leader during overlapping time intervals, as only the most recent leader
+## is able to increment the current state version in ZooKeeper, but the old controller
+## may still attempt to publish its old state.
+require_strictly_increasing_cluster_state_versions bool default=false
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index bbd4e87cb40..1b119a0e631 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -622,7 +622,14 @@ CommunicationManager::sendDirectRPCReply(RPCRequestWrapper& request,
request.addReturnString(ns.str().c_str());
LOGBP(debug, "Sending getnodestate2 reply with no host info.");
} else if (requestName == "setsystemstate2" || requestName == "setdistributionstates") {
- // No data to return
+ // No data to return, but the request must be failed iff we rejected the state version
+ // due to a higher version having been previously received.
+ auto& state_reply = dynamic_cast<api::SetSystemStateReply&>(*reply);
+ if (state_reply.getResult().getResult() == api::ReturnCode::REJECTED) {
+ vespalib::string err_msg = state_reply.getResult().getMessage(); // ReturnCode message is stringref
+ request.returnError(FRTE_RPC_METHOD_FAILED, err_msg.c_str());
+ return;
+ }
} else if (requestName == "activate_cluster_state_version") {
auto& activate_reply(dynamic_cast<api::ActivateClusterStateVersionReply&>(*reply));
request.addReturnInt(activate_reply.actualVersion());
diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
index d18935afe24..98796ee6440 100644
--- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp
+++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
@@ -125,24 +125,28 @@ ServiceLayerNode::initializeNodeSpecific()
void
ServiceLayerNode::handleLiveConfigUpdate(const InitialGuard & initGuard)
{
- if (_server_config.staging) {
- bool updated = false;
- vespa::config::content::core::StorServerConfigBuilder oldC(*_server_config.active);
- StorServerConfig& newC(*_server_config.staging);
- {
- updated = false;
- NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock());
- lib::NodeState ns(*_component->getStateUpdater().getReportedNodeState());
- if (DIFFER(nodeCapacity)) {
- LOG(info, "Live config update: Updating node capacity from %f to %f.",
- oldC.nodeCapacity, newC.nodeCapacity);
- ASSIGN(nodeCapacity);
- ns.setCapacity(newC.nodeCapacity);
- }
- if (updated) {
- // FIXME this always gets overwritten by StorageNode::handleLiveConfigUpdate...! Intentional?
- _server_config.active = std::make_unique<vespa::config::content::core::StorServerConfig>(oldC);
- _component->getStateUpdater().setReportedNodeState(ns);
+ {
+ std::lock_guard config_lock(_configLock);
+ // Live server config patching happens both here and in StorageNode::handleLiveConfigUpdate,
+ // which we have to delegate to afterward (_without_ holding _configLock at the time).
+ if (_server_config.staging) {
+ bool updated = false;
+ vespa::config::content::core::StorServerConfigBuilder oldC(*_server_config.active);
+ StorServerConfig& newC(*_server_config.staging);
+ {
+ NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock());
+ lib::NodeState ns(*_component->getStateUpdater().getReportedNodeState());
+ if (DIFFER(nodeCapacity)) {
+ LOG(info, "Live config update: Updating node capacity from %f to %f.",
+ oldC.nodeCapacity, newC.nodeCapacity);
+ ASSIGN(nodeCapacity);
+ ns.setCapacity(newC.nodeCapacity);
+ }
+ if (updated) {
+ // FIXME the patching of old config vs new config is confusing and error-prone. Redesign!
+ _server_config.active = std::make_unique<vespa::config::content::core::StorServerConfig>(oldC);
+ _component->getStateUpdater().setReportedNodeState(ns);
+ }
}
}
}
diff --git a/storage/src/vespa/storage/storageserver/statemanager.cpp b/storage/src/vespa/storage/storageserver/statemanager.cpp
index adebaa51c08..a2106dce8d2 100644
--- a/storage/src/vespa/storage/storageserver/statemanager.cpp
+++ b/storage/src/vespa/storage/storageserver/statemanager.cpp
@@ -21,7 +21,7 @@
#include <fstream>
#include <ranges>
-#include <vespa/log/log.h>
+#include <vespa/log/bufferedlogger.h>
LOG_SETUP(".state.manager");
using vespalib::make_string_short::fmt;
@@ -74,10 +74,13 @@ StateManager::StateManager(StorageComponentRegister& compReg,
_health_ping_time(),
_health_ping_warn_interval(5min),
_health_ping_warn_time(_start_time + _health_ping_warn_interval),
+ _last_accepted_cluster_state_time(),
+ _last_observed_version_from_cc(),
_hostInfo(std::move(hostInfo)),
_controllers_observed_explicit_node_state(),
_noThreadTestMode(testMode),
_grabbedExternalLock(false),
+ _require_strictly_increasing_cluster_state_versions(false),
_notifyingListeners(false),
_requested_almost_immediate_node_state_replies(false)
{
@@ -436,21 +439,67 @@ StateManager::mark_controller_as_having_observed_explicit_node_state(const std::
_controllers_observed_explicit_node_state.emplace(controller_index);
}
-void
-StateManager::setClusterStateBundle(const ClusterStateBundle& c)
+std::optional<uint32_t>
+StateManager::try_set_cluster_state_bundle(std::shared_ptr<const ClusterStateBundle> c,
+ uint16_t origin_controller_index)
{
{
std::lock_guard lock(_stateLock);
- _nextSystemState = std::make_shared<const ClusterStateBundle>(c);
+ uint32_t effective_active_version = (_nextSystemState ? _nextSystemState->getVersion()
+ : _systemState->getVersion());
+ const auto now = _component.getClock().getMonotonicTime();
+ const uint32_t last_ver_from_cc = _last_observed_version_from_cc[origin_controller_index];
+ _last_observed_version_from_cc[origin_controller_index] = c->getVersion();
+
+ if (_require_strictly_increasing_cluster_state_versions && (c->getVersion() < effective_active_version)) {
+ if (c->getVersion() >= last_ver_from_cc) {
+ constexpr auto reject_warn_threshold = 30s;
+ if (now - _last_accepted_cluster_state_time <= reject_warn_threshold) {
+ LOG(debug, "Rejecting cluster state with version %u from cluster controller %u, as "
+ "we've already accepted version %u. Recently accepted another cluster state, "
+ "so assuming transient CC leadership period overlap.",
+ c->getVersion(), origin_controller_index, effective_active_version);
+ } else {
+ // Rejections have happened for some time. Make a bit of noise.
+ LOGBP(warning, "Rejecting cluster state with version %u from cluster controller %u, as "
+ "we've already accepted version %u.",
+ c->getVersion(), origin_controller_index, effective_active_version);
+ }
+ return {effective_active_version};
+ } else {
+ // SetSystemState RPCs are FIFO-ordered and a particular CC should enforce strictly increasing
+ // cluster state versions through its ZooKeeper quorum (but commands may be resent for a given
+ // version). This means that commands should contain _monotonically increasing_ versions from
+ // a given CC origin index.
+ // If this is _not_ the case, it indicates ZooKeeper state on the CCs has been lost or wiped,
+ // at which point we have no other realistic choice than to accept the version, or the system
+ // will stall until an operator manually intervenes by restarting the content cluster.
+ LOG(error, "Received cluster state version %u from cluster controller %u, which is lower than "
+ "the current state version (%u) and the last received version (%u) from the same controller. "
+ "This indicates loss of cluster controller ZooKeeper state; accepting lower version to "
+ "prevent content cluster operations from stalling for an indeterminate amount of time.",
+ c->getVersion(), origin_controller_index, effective_active_version, last_ver_from_cc);
+ // Fall through to state acceptance.
+ }
+ }
+ _last_accepted_cluster_state_time = now;
+ _nextSystemState = std::move(c);
}
notifyStateListeners();
+ return std::nullopt;
}
bool
StateManager::onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& cmd)
{
- setClusterStateBundle(cmd->getClusterStateBundle());
- sendUp(std::make_shared<api::SetSystemStateReply>(*cmd));
+ auto reply = std::make_shared<api::SetSystemStateReply>(*cmd);
+ const auto maybe_rejected_by_ver = try_set_cluster_state_bundle(cmd->cluster_state_bundle_ptr(), cmd->getSourceIndex());
+ if (maybe_rejected_by_ver) {
+ reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED,
+ fmt("Cluster state version %u rejected; node already has a higher cluster state version (%u)",
+ cmd->getClusterStateBundle().getVersion(), *maybe_rejected_by_ver)));
+ }
+ sendUp(reply);
return true;
}
@@ -520,6 +569,13 @@ StateManager::tick() {
warn_on_missing_health_ping();
}
+void
+StateManager::set_require_strictly_increasing_cluster_state_versions(bool req) noexcept
+{
+ std::lock_guard guard(_stateLock);
+ _require_strictly_increasing_cluster_state_versions = req;
+}
+
bool
StateManager::sendGetNodeStateReplies() {
return sendGetNodeStateReplies(0xffff);
diff --git a/storage/src/vespa/storage/storageserver/statemanager.h b/storage/src/vespa/storage/storageserver/statemanager.h
index 72b89dc4d65..d116f968731 100644
--- a/storage/src/vespa/storage/storageserver/statemanager.h
+++ b/storage/src/vespa/storage/storageserver/statemanager.h
@@ -1,9 +1,6 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
- * @class storage::StateManager
- * @ingroup storageserver
- *
- * @brief Keeps and updates node and system states.
+ * Keeps and updates node and system states.
*
* This component implements the NodeStateUpdater interface to handle states
* for all components. See that interface for documentation.
@@ -22,11 +19,13 @@
#include <vespa/storageapi/message/state.h>
#include <vespa/storageapi/messageapi/storagemessage.h>
#include <vespa/vespalib/objects/floatingpointtype.h>
+#include <vespa/vespalib/stllike/hash_map.h>
+#include <atomic>
#include <deque>
+#include <list>
#include <map>
+#include <optional>
#include <unordered_set>
-#include <list>
-#include <atomic>
namespace metrics {
class MetricManager;
@@ -69,6 +68,8 @@ class StateManager : public NodeStateUpdater,
std::optional<vespalib::steady_time> _health_ping_time;
vespalib::duration _health_ping_warn_interval;
vespalib::steady_time _health_ping_warn_time;
+ vespalib::steady_time _last_accepted_cluster_state_time;
+ vespalib::hash_map<uint16_t, uint32_t> _last_observed_version_from_cc;
std::unique_ptr<HostInfo> _hostInfo;
std::unique_ptr<framework::Thread> _thread;
// Controllers that have observed a GetNodeState response sent _after_
@@ -76,6 +77,7 @@ class StateManager : public NodeStateUpdater,
std::unordered_set<uint16_t> _controllers_observed_explicit_node_state;
bool _noThreadTestMode;
bool _grabbedExternalLock;
+ bool _require_strictly_increasing_cluster_state_versions;
std::atomic<bool> _notifyingListeners;
std::atomic<bool> _requested_almost_immediate_node_state_replies;
@@ -90,6 +92,9 @@ public:
void tick();
void warn_on_missing_health_ping();
+ // Precondition: internal state mutex must not be held
+ void set_require_strictly_increasing_cluster_state_versions(bool req) noexcept;
+
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
void reportHtmlStatus(std::ostream&, const framework::HttpUrlPath&) const override;
@@ -102,7 +107,11 @@ public:
Lock::SP grabStateChangeLock() override;
void setReportedNodeState(const lib::NodeState& state) override;
- void setClusterStateBundle(const ClusterStateBundle& c);
+ // Iff state was accepted, returns std::nullopt
+ // Otherwise (i.e. state was rejected due to a higher version already having been accepted)
+ // returns an optional containing the current, higher cluster state version.
+ [[nodiscard]] std::optional<uint32_t> try_set_cluster_state_bundle(std::shared_ptr<const ClusterStateBundle> c,
+ uint16_t origin_controller_index);
HostInfo& getHostInfo() { return *_hostInfo; }
void immediately_send_get_node_state_replies() override;
diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp
index 35b70dd853c..bbee95dc00b 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.cpp
+++ b/storage/src/vespa/storage/storageserver/storagenode.cpp
@@ -92,6 +92,7 @@ StorageNode::StorageNode(
_statusMetrics(),
_stateReporter(),
_stateManager(),
+ _state_manager_ptr(nullptr),
_chain(),
_configLock(),
_initial_config_mutex(),
@@ -146,6 +147,8 @@ StorageNode::initialize(const NodeStateReporter & nodeStateReporter)
std::move(_hostInfo),
nodeStateReporter,
_singleThreadedDebugMode);
+ _stateManager->set_require_strictly_increasing_cluster_state_versions(server_config().requireStrictlyIncreasingClusterStateVersions);
+ _state_manager_ptr = _stateManager.get();
_context.getComponentRegister().setNodeStateUpdater(*_stateManager);
// Create storage root folder, in case it doesn't already exist.
@@ -245,12 +248,21 @@ StorageNode::handleLiveConfigUpdate(const InitialGuard & initGuard)
DIFFERWARN(clusterName, "Cannot alter cluster name of node live");
DIFFERWARN(nodeIndex, "Cannot alter node index of node live");
DIFFERWARN(isDistributor, "Cannot alter role of node live");
- _server_config.active = std::make_unique<StorServerConfig>(oldC); // TODO this overwrites from ServiceLayerNode
+ [[maybe_unused]] bool updated = false; // magically touched by ASSIGN() macro. TODO rewrite this fun stuff.
+ if (DIFFER(requireStrictlyIncreasingClusterStateVersions)) {
+ LOG(info, "Live config update: require strictly increasing cluster state versions: %s -> %s",
+ (oldC.requireStrictlyIncreasingClusterStateVersions ? "true" : "false"),
+ (newC.requireStrictlyIncreasingClusterStateVersions ? "true" : "false"));
+ ASSIGN(requireStrictlyIncreasingClusterStateVersions);
+ }
+ _server_config.active = std::make_unique<StorServerConfig>(oldC);
_server_config.staging.reset();
_deadLockDetector->enableWarning(server_config().enableDeadLockDetectorWarnings);
_deadLockDetector->enableShutdown(server_config().enableDeadLockDetector);
_deadLockDetector->setProcessSlack(vespalib::from_s(server_config().deadLockDetectorTimeoutSlack));
_deadLockDetector->setWaitSlack(vespalib::from_s(server_config().deadLockDetectorTimeoutSlack));
+ assert(_state_manager_ptr);
+ _state_manager_ptr->set_require_strictly_increasing_cluster_state_versions(server_config().requireStrictlyIncreasingClusterStateVersions);
}
if (_distribution_config.staging) {
StorDistributionConfigBuilder oldC(*_distribution_config.active);
diff --git a/storage/src/vespa/storage/storageserver/storagenode.h b/storage/src/vespa/storage/storageserver/storagenode.h
index a96f6b52a66..93265bece3c 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.h
+++ b/storage/src/vespa/storage/storageserver/storagenode.h
@@ -135,6 +135,10 @@ private:
// Depends on metric manager
std::unique_ptr<StateReporter> _stateReporter;
std::unique_ptr<StateManager> _stateManager;
+ // Node subclasses may take ownership of _stateManager in order to infuse it into
+ // their own storage link chain, but they MUST ensure its lifetime is maintained.
+ // We need to remember the original pointer in order to update its config.
+ StateManager* _state_manager_ptr;
// The storage chain can depend on anything.
std::unique_ptr<StorageLink> _chain;
diff --git a/storage/src/vespa/storageapi/message/state.h b/storage/src/vespa/storageapi/message/state.h
index 4aa4c8a8f31..afeb5ae9c11 100644
--- a/storage/src/vespa/storageapi/message/state.h
+++ b/storage/src/vespa/storageapi/message/state.h
@@ -9,12 +9,6 @@
namespace storage::api {
-/**
- * @class GetNodeStateCommand
- * @ingroup message
- *
- * @brief Command for setting node state. No payload
- */
class GetNodeStateCommand : public StorageCommand {
lib::NodeState::UP _expectedState;
@@ -27,12 +21,6 @@ public:
DECLARE_STORAGECOMMAND(GetNodeStateCommand, onGetNodeState)
};
-/**
- * @class GetNodeStateReply
- * @ingroup message
- *
- * @brief Reply to GetNodeStateCommand
- */
class GetNodeStateReply : public StorageReply {
lib::NodeState::UP _state;
std::string _nodeInfo;
@@ -53,12 +41,9 @@ public:
};
/**
- * @class SetSystemStateCommand
- * @ingroup message
- *
- * @brief Command for telling a node about the system state - state of each node
- * in the system and state of the system (all ok, no merging, block
- * put/get/remove etx)
+ * Command for telling a node about the cluster state - state of each node
+ * in the cluster and state of the cluster itself (all ok, no merging, block
+ * put/get/remove etx)
*/
class SetSystemStateCommand : public StorageCommand {
std::shared_ptr<const lib::ClusterStateBundle> _state;
@@ -79,12 +64,6 @@ public:
DECLARE_STORAGECOMMAND(SetSystemStateCommand, onSetSystemState)
};
-/**
- * @class SetSystemStateReply
- * @ingroup message
- *
- * @brief Reply received after a SetSystemStateCommand.
- */
class SetSystemStateReply : public StorageReply {
std::shared_ptr<const lib::ClusterStateBundle> _state;