aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--storage/src/tests/storageserver/statemanagertest.cpp126
-rw-r--r--storage/src/vespa/storage/config/stor-server.def8
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp9
-rw-r--r--storage/src/vespa/storage/storageserver/servicelayernode.cpp40
-rw-r--r--storage/src/vespa/storage/storageserver/statemanager.cpp68
-rw-r--r--storage/src/vespa/storage/storageserver/statemanager.h23
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.cpp14
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.h4
-rw-r--r--storage/src/vespa/storageapi/message/state.h27
9 files changed, 234 insertions, 85 deletions
diff --git a/storage/src/tests/storageserver/statemanagertest.cpp b/storage/src/tests/storageserver/statemanagertest.cpp
index b785bc141b6..79246cb3ce1 100644
--- a/storage/src/tests/storageserver/statemanagertest.cpp
+++ b/storage/src/tests/storageserver/statemanagertest.cpp
@@ -32,7 +32,18 @@ struct StateManagerTest : Test, NodeStateReporter {
void SetUp() override;
void TearDown() override;
- void force_current_cluster_state_version(uint32_t version);
+ static std::shared_ptr<api::SetSystemStateCommand> make_set_state_cmd(vespalib::stringref state_str, uint16_t cc_index) {
+ auto cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(state_str));
+ cmd->setSourceIndex(cc_index);
+ return cmd;
+ }
+
+ void get_single_reply(std::shared_ptr<api::StorageReply>& reply_out);
+ void get_only_ok_reply(std::shared_ptr<api::StorageReply>& reply_out);
+ void force_current_cluster_state_version(uint32_t version, uint16_t cc_index);
+ void force_current_cluster_state_version(uint32_t version) {
+ force_current_cluster_state_version(version, 0);
+ }
void mark_reported_node_state_up();
void send_down_get_node_state_request(uint16_t controller_index);
void assert_ok_get_node_state_reply_sent_and_clear();
@@ -86,11 +97,30 @@ StateManagerTest::TearDown() {
}
void
-StateManagerTest::force_current_cluster_state_version(uint32_t version)
+StateManagerTest::get_single_reply(std::shared_ptr<api::StorageReply>& reply_out)
+{
+ ASSERT_EQ(_upper->getNumReplies(), 1);
+ ASSERT_TRUE(_upper->getReply(0)->getType().isReply());
+ reply_out = std::dynamic_pointer_cast<api::StorageReply>(_upper->getReply(0));
+ ASSERT_TRUE(reply_out.get() != nullptr);
+ _upper->reset();
+}
+
+void
+StateManagerTest::get_only_ok_reply(std::shared_ptr<api::StorageReply>& reply_out)
+{
+ ASSERT_NO_FATAL_FAILURE(get_single_reply(reply_out));
+ ASSERT_EQ(reply_out->getResult(), api::ReturnCode(api::ReturnCode::OK));
+}
+
+void
+StateManagerTest::force_current_cluster_state_version(uint32_t version, uint16_t cc_index)
{
ClusterState state(*_manager->getClusterStateBundle()->getBaselineClusterState());
state.setVersion(version);
- _manager->setClusterStateBundle(lib::ClusterStateBundle(state));
+ const auto maybe_rejected_by_ver = _manager->try_set_cluster_state_bundle(
+ std::make_shared<const lib::ClusterStateBundle>(state), cc_index);
+ ASSERT_EQ(maybe_rejected_by_ver, std::nullopt);
}
void
@@ -118,23 +148,12 @@ StateManagerTest::extract_cluster_state_version_from_host_info(uint32_t& version
version_out = clusterStateVersionCursor.asLong();
}
-#define GET_ONLY_OK_REPLY(varname) \
-{ \
- ASSERT_EQ(size_t(1), _upper->getNumReplies()); \
- ASSERT_TRUE(_upper->getReply(0)->getType().isReply()); \
- varname = std::dynamic_pointer_cast<api::StorageReply>( \
- _upper->getReply(0)); \
- ASSERT_TRUE(varname.get() != nullptr); \
- _upper->reset(); \
- ASSERT_EQ(api::ReturnCode(api::ReturnCode::OK), \
- varname->getResult()); \
-}
-
TEST_F(StateManagerTest, cluster_state) {
std::shared_ptr<api::StorageReply> reply;
// Verify initial state on startup
auto currentState = _manager->getClusterStateBundle()->getBaselineClusterState();
EXPECT_EQ("cluster:d", currentState->toString(false));
+ EXPECT_EQ(currentState->getVersion(), 0);
auto currentNodeState = _manager->getCurrentNodeState();
EXPECT_EQ("s:d", currentNodeState->toString(false));
@@ -142,7 +161,7 @@ TEST_F(StateManagerTest, cluster_state) {
ClusterState sendState("storage:4 .2.s:m");
auto cmd = std::make_shared<api::SetSystemStateCommand>(sendState);
_upper->sendDown(cmd);
- GET_ONLY_OK_REPLY(reply);
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
currentState = _manager->getClusterStateBundle()->getBaselineClusterState();
EXPECT_EQ(sendState, *currentState);
@@ -151,13 +170,64 @@ TEST_F(StateManagerTest, cluster_state) {
EXPECT_EQ("s:m", currentNodeState->toString(false));
}
+TEST_F(StateManagerTest, accept_lower_state_versions_if_strict_requirement_disabled) {
+ _manager->set_require_strictly_increasing_cluster_state_versions(false);
+
+ ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(123, 1)); // CC 1
+ ASSERT_EQ(_manager->getClusterStateBundle()->getVersion(), 123);
+
+ _upper->sendDown(make_set_state_cmd("version:122 distributor:1 storage:1", 0)); // CC 0
+ std::shared_ptr<api::StorageReply> reply;
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
+ EXPECT_EQ(_manager->getClusterStateBundle()->getVersion(), 122);
+}
+
+TEST_F(StateManagerTest, reject_lower_state_versions_if_strict_requirement_enabled) {
+ _manager->set_require_strictly_increasing_cluster_state_versions(true);
+
+ ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(123, 1)); // CC 1
+ ASSERT_EQ(_manager->getClusterStateBundle()->getVersion(), 123);
+
+ _upper->sendDown(make_set_state_cmd("version:122 distributor:1 storage:1", 0)); // CC 0
+ std::shared_ptr<api::StorageReply> reply;
+ ASSERT_NO_FATAL_FAILURE(get_single_reply(reply));
+ api::ReturnCode expected_res(api::ReturnCode::REJECTED, "Cluster state version 122 rejected; node already has "
+ "a higher cluster state version (123)");
+ EXPECT_EQ(reply->getResult(), expected_res);
+ EXPECT_EQ(_manager->getClusterStateBundle()->getVersion(), 123);
+}
+
+// Observing a lower cluster state version from the same CC index directly implies that the ZooKeeper
+// state has been lost, at which point we pragmatically (but begrudgingly) accept the state version
+// to avoid stalling the entire cluster for an indeterminate amount of time.
+TEST_F(StateManagerTest, accept_lower_state_versions_from_same_cc_index_even_if_strict_requirement_enabled) {
+ _manager->set_require_strictly_increasing_cluster_state_versions(true);
+
+ ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(123, 1)); // CC 1
+ ASSERT_EQ(_manager->getClusterStateBundle()->getVersion(), 123);
+
+ ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(124, 2)); // CC 2
+ ASSERT_EQ(_manager->getClusterStateBundle()->getVersion(), 124);
+
+ // CC 1 restarts from scratch with previous ZK state up in smoke.
+ _upper->sendDown(make_set_state_cmd("version:3 distributor:1 storage:1", 1));
+ std::shared_ptr<api::StorageReply> reply;
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
+ EXPECT_EQ(_manager->getClusterStateBundle()->getVersion(), 3);
+
+ // CC 2 restarts and continues from where CC 1 left off.
+ _upper->sendDown(make_set_state_cmd("version:4 distributor:1 storage:1", 2));
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
+ EXPECT_EQ(_manager->getClusterStateBundle()->getVersion(), 4);
+}
+
namespace {
struct MyStateListener : public StateListener {
const NodeStateUpdater& updater;
lib::NodeState current;
std::ostringstream ost;
- MyStateListener(const NodeStateUpdater& upd);
+ explicit MyStateListener(const NodeStateUpdater& upd);
~MyStateListener() override;
void handleNewState() noexcept override {
@@ -194,7 +264,7 @@ TEST_F(StateManagerTest, reported_node_state) {
// And get node state command (no expected state)
auto cmd = std::make_shared<api::GetNodeStateCommand>(lib::NodeState::UP());
_upper->sendDown(cmd);
- GET_ONLY_OK_REPLY(reply);
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
ASSERT_EQ(api::MessageType::GETNODESTATE_REPLY, reply->getType());
nodeState = std::make_shared<NodeState>(
dynamic_cast<api::GetNodeStateReply&>(*reply).getNodeState());
@@ -203,7 +273,7 @@ TEST_F(StateManagerTest, reported_node_state) {
cmd = std::make_shared<api::GetNodeStateCommand>(
std::make_unique<NodeState>(NodeType::STORAGE, State::INITIALIZING));
_upper->sendDown(cmd);
- GET_ONLY_OK_REPLY(reply);
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
ASSERT_EQ(api::MessageType::GETNODESTATE_REPLY, reply->getType());
nodeState = std::make_unique<NodeState>(
dynamic_cast<api::GetNodeStateReply&>(*reply).getNodeState());
@@ -222,7 +292,7 @@ TEST_F(StateManagerTest, reported_node_state) {
_manager->setReportedNodeState(ns);
}
- GET_ONLY_OK_REPLY(reply);
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
ASSERT_EQ(api::MessageType::GETNODESTATE_REPLY, reply->getType());
nodeState = std::make_unique<NodeState>(
dynamic_cast<api::GetNodeStateReply&>(*reply).getNodeState());
@@ -244,7 +314,7 @@ TEST_F(StateManagerTest, reported_node_state) {
}
TEST_F(StateManagerTest, current_cluster_state_version_is_included_in_host_info_json) {
- force_current_cluster_state_version(123);
+ ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(123));
uint32_t version;
ASSERT_NO_FATAL_FAILURE(extract_cluster_state_version_from_host_info(version));
EXPECT_EQ(version, 123);
@@ -266,7 +336,7 @@ void StateManagerTest::send_down_get_node_state_request(uint16_t controller_inde
void StateManagerTest::assert_ok_get_node_state_reply_sent_and_clear() {
ASSERT_EQ(1, _upper->getNumReplies());
std::shared_ptr<api::StorageReply> reply;
- GET_ONLY_OK_REPLY(reply); // Implicitly clears messages from _upper
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); // Implicitly clears messages from _upper
ASSERT_EQ(api::MessageType::GETNODESTATE_REPLY, reply->getType());
}
@@ -344,7 +414,7 @@ TEST_F(StateManagerTest, request_almost_immediate_replies_triggers_fast_reply)
}
TEST_F(StateManagerTest, activation_command_is_bounced_with_current_cluster_state_version) {
- force_current_cluster_state_version(12345);
+ ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(12345));
auto cmd = std::make_shared<api::ActivateClusterStateVersionCommand>(12340);
cmd->setTimeout(10000000ms);
@@ -353,7 +423,7 @@ TEST_F(StateManagerTest, activation_command_is_bounced_with_current_cluster_stat
ASSERT_EQ(1, _upper->getNumReplies());
std::shared_ptr<api::StorageReply> reply;
- GET_ONLY_OK_REPLY(reply); // Implicitly clears messages from _upper
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); // Implicitly clears messages from _upper
ASSERT_EQ(api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_REPLY, reply->getType());
auto& activate_reply = dynamic_cast<api::ActivateClusterStateVersionReply&>(*reply);
EXPECT_EQ(12340, activate_reply.activateVersion());
@@ -366,7 +436,7 @@ TEST_F(StateManagerTest, non_deferred_cluster_state_sets_reported_cluster_state_
cmd->setSourceIndex(0);
_upper->sendDown(cmd);
std::shared_ptr<api::StorageReply> reply;
- GET_ONLY_OK_REPLY(reply);
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
uint32_t version;
ASSERT_NO_FATAL_FAILURE(extract_cluster_state_version_from_host_info(version));
@@ -374,7 +444,7 @@ TEST_F(StateManagerTest, non_deferred_cluster_state_sets_reported_cluster_state_
}
TEST_F(StateManagerTest, deferred_cluster_state_does_not_update_state_until_activation_edge) {
- force_current_cluster_state_version(100);
+ ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(100));
lib::ClusterStateBundle deferred_bundle(lib::ClusterState("version:101 distributor:1 storage:1"), {}, true);
auto state_cmd = std::make_shared<api::SetSystemStateCommand>(deferred_bundle);
@@ -382,7 +452,7 @@ TEST_F(StateManagerTest, deferred_cluster_state_does_not_update_state_until_acti
state_cmd->setSourceIndex(0);
_upper->sendDown(state_cmd);
std::shared_ptr<api::StorageReply> reply;
- GET_ONLY_OK_REPLY(reply);
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
uint32_t version;
ASSERT_NO_FATAL_FAILURE(extract_cluster_state_version_from_host_info(version));
@@ -392,7 +462,7 @@ TEST_F(StateManagerTest, deferred_cluster_state_does_not_update_state_until_acti
activation_cmd->setTimeout(1000s);
activation_cmd->setSourceIndex(0);
_upper->sendDown(activation_cmd);
- GET_ONLY_OK_REPLY(reply);
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
ASSERT_NO_FATAL_FAILURE(extract_cluster_state_version_from_host_info(version));
EXPECT_EQ(version, 101);
diff --git a/storage/src/vespa/storage/config/stor-server.def b/storage/src/vespa/storage/config/stor-server.def
index 44e4b14eafc..49ce0b678d0 100644
--- a/storage/src/vespa/storage/config/stor-server.def
+++ b/storage/src/vespa/storage/config/stor-server.def
@@ -118,3 +118,11 @@ content_node_bucket_db_stripe_bits int default=4 restart
## Iff set, a special `pidfile` file is written under the node's root directory upon
## startup containing the PID of the running process.
write_pid_file_on_startup bool default=true
+
+## Iff true, received cluster state versions that are lower than the current active
+## (or pending to be active) version on the node will be explicitly rejected. This
+## prevents race conditions caused by multiple cluster controllers believing they
+## are the leader during overlapping time intervals, as only the most recent leader
+## is able to increment the current state version in ZooKeeper, but the old controller
+## may still attempt to publish its old state.
+require_strictly_increasing_cluster_state_versions bool default=false
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index bbd4e87cb40..1b119a0e631 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -622,7 +622,14 @@ CommunicationManager::sendDirectRPCReply(RPCRequestWrapper& request,
request.addReturnString(ns.str().c_str());
LOGBP(debug, "Sending getnodestate2 reply with no host info.");
} else if (requestName == "setsystemstate2" || requestName == "setdistributionstates") {
- // No data to return
+ // No data to return, but the request must be failed iff we rejected the state version
+ // due to a higher version having been previously received.
+ auto& state_reply = dynamic_cast<api::SetSystemStateReply&>(*reply);
+ if (state_reply.getResult().getResult() == api::ReturnCode::REJECTED) {
+ vespalib::string err_msg = state_reply.getResult().getMessage(); // ReturnCode message is stringref
+ request.returnError(FRTE_RPC_METHOD_FAILED, err_msg.c_str());
+ return;
+ }
} else if (requestName == "activate_cluster_state_version") {
auto& activate_reply(dynamic_cast<api::ActivateClusterStateVersionReply&>(*reply));
request.addReturnInt(activate_reply.actualVersion());
diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
index d18935afe24..98796ee6440 100644
--- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp
+++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
@@ -125,24 +125,28 @@ ServiceLayerNode::initializeNodeSpecific()
void
ServiceLayerNode::handleLiveConfigUpdate(const InitialGuard & initGuard)
{
- if (_server_config.staging) {
- bool updated = false;
- vespa::config::content::core::StorServerConfigBuilder oldC(*_server_config.active);
- StorServerConfig& newC(*_server_config.staging);
- {
- updated = false;
- NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock());
- lib::NodeState ns(*_component->getStateUpdater().getReportedNodeState());
- if (DIFFER(nodeCapacity)) {
- LOG(info, "Live config update: Updating node capacity from %f to %f.",
- oldC.nodeCapacity, newC.nodeCapacity);
- ASSIGN(nodeCapacity);
- ns.setCapacity(newC.nodeCapacity);
- }
- if (updated) {
- // FIXME this always gets overwritten by StorageNode::handleLiveConfigUpdate...! Intentional?
- _server_config.active = std::make_unique<vespa::config::content::core::StorServerConfig>(oldC);
- _component->getStateUpdater().setReportedNodeState(ns);
+ {
+ std::lock_guard config_lock(_configLock);
+ // Live server config patching happens both here and in StorageNode::handleLiveConfigUpdate,
+ // which we have to delegate to afterward (_without_ holding _configLock at the time).
+ if (_server_config.staging) {
+ bool updated = false;
+ vespa::config::content::core::StorServerConfigBuilder oldC(*_server_config.active);
+ StorServerConfig& newC(*_server_config.staging);
+ {
+ NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock());
+ lib::NodeState ns(*_component->getStateUpdater().getReportedNodeState());
+ if (DIFFER(nodeCapacity)) {
+ LOG(info, "Live config update: Updating node capacity from %f to %f.",
+ oldC.nodeCapacity, newC.nodeCapacity);
+ ASSIGN(nodeCapacity);
+ ns.setCapacity(newC.nodeCapacity);
+ }
+ if (updated) {
+ // FIXME the patching of old config vs new config is confusing and error-prone. Redesign!
+ _server_config.active = std::make_unique<vespa::config::content::core::StorServerConfig>(oldC);
+ _component->getStateUpdater().setReportedNodeState(ns);
+ }
}
}
}
diff --git a/storage/src/vespa/storage/storageserver/statemanager.cpp b/storage/src/vespa/storage/storageserver/statemanager.cpp
index adebaa51c08..a2106dce8d2 100644
--- a/storage/src/vespa/storage/storageserver/statemanager.cpp
+++ b/storage/src/vespa/storage/storageserver/statemanager.cpp
@@ -21,7 +21,7 @@
#include <fstream>
#include <ranges>
-#include <vespa/log/log.h>
+#include <vespa/log/bufferedlogger.h>
LOG_SETUP(".state.manager");
using vespalib::make_string_short::fmt;
@@ -74,10 +74,13 @@ StateManager::StateManager(StorageComponentRegister& compReg,
_health_ping_time(),
_health_ping_warn_interval(5min),
_health_ping_warn_time(_start_time + _health_ping_warn_interval),
+ _last_accepted_cluster_state_time(),
+ _last_observed_version_from_cc(),
_hostInfo(std::move(hostInfo)),
_controllers_observed_explicit_node_state(),
_noThreadTestMode(testMode),
_grabbedExternalLock(false),
+ _require_strictly_increasing_cluster_state_versions(false),
_notifyingListeners(false),
_requested_almost_immediate_node_state_replies(false)
{
@@ -436,21 +439,67 @@ StateManager::mark_controller_as_having_observed_explicit_node_state(const std::
_controllers_observed_explicit_node_state.emplace(controller_index);
}
-void
-StateManager::setClusterStateBundle(const ClusterStateBundle& c)
+std::optional<uint32_t>
+StateManager::try_set_cluster_state_bundle(std::shared_ptr<const ClusterStateBundle> c,
+ uint16_t origin_controller_index)
{
{
std::lock_guard lock(_stateLock);
- _nextSystemState = std::make_shared<const ClusterStateBundle>(c);
+ uint32_t effective_active_version = (_nextSystemState ? _nextSystemState->getVersion()
+ : _systemState->getVersion());
+ const auto now = _component.getClock().getMonotonicTime();
+ const uint32_t last_ver_from_cc = _last_observed_version_from_cc[origin_controller_index];
+ _last_observed_version_from_cc[origin_controller_index] = c->getVersion();
+
+ if (_require_strictly_increasing_cluster_state_versions && (c->getVersion() < effective_active_version)) {
+ if (c->getVersion() >= last_ver_from_cc) {
+ constexpr auto reject_warn_threshold = 30s;
+ if (now - _last_accepted_cluster_state_time <= reject_warn_threshold) {
+ LOG(debug, "Rejecting cluster state with version %u from cluster controller %u, as "
+ "we've already accepted version %u. Recently accepted another cluster state, "
+ "so assuming transient CC leadership period overlap.",
+ c->getVersion(), origin_controller_index, effective_active_version);
+ } else {
+ // Rejections have happened for some time. Make a bit of noise.
+ LOGBP(warning, "Rejecting cluster state with version %u from cluster controller %u, as "
+ "we've already accepted version %u.",
+ c->getVersion(), origin_controller_index, effective_active_version);
+ }
+ return {effective_active_version};
+ } else {
+ // SetSystemState RPCs are FIFO-ordered and a particular CC should enforce strictly increasing
+ // cluster state versions through its ZooKeeper quorum (but commands may be resent for a given
+ // version). This means that commands should contain _monotonically increasing_ versions from
+ // a given CC origin index.
+ // If this is _not_ the case, it indicates ZooKeeper state on the CCs has been lost or wiped,
+ // at which point we have no other realistic choice than to accept the version, or the system
+ // will stall until an operator manually intervenes by restarting the content cluster.
+ LOG(error, "Received cluster state version %u from cluster controller %u, which is lower than "
+ "the current state version (%u) and the last received version (%u) from the same controller. "
+ "This indicates loss of cluster controller ZooKeeper state; accepting lower version to "
+ "prevent content cluster operations from stalling for an indeterminate amount of time.",
+ c->getVersion(), origin_controller_index, effective_active_version, last_ver_from_cc);
+ // Fall through to state acceptance.
+ }
+ }
+ _last_accepted_cluster_state_time = now;
+ _nextSystemState = std::move(c);
}
notifyStateListeners();
+ return std::nullopt;
}
bool
StateManager::onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& cmd)
{
- setClusterStateBundle(cmd->getClusterStateBundle());
- sendUp(std::make_shared<api::SetSystemStateReply>(*cmd));
+ auto reply = std::make_shared<api::SetSystemStateReply>(*cmd);
+ const auto maybe_rejected_by_ver = try_set_cluster_state_bundle(cmd->cluster_state_bundle_ptr(), cmd->getSourceIndex());
+ if (maybe_rejected_by_ver) {
+ reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED,
+ fmt("Cluster state version %u rejected; node already has a higher cluster state version (%u)",
+ cmd->getClusterStateBundle().getVersion(), *maybe_rejected_by_ver)));
+ }
+ sendUp(reply);
return true;
}
@@ -520,6 +569,13 @@ StateManager::tick() {
warn_on_missing_health_ping();
}
+void
+StateManager::set_require_strictly_increasing_cluster_state_versions(bool req) noexcept
+{
+ std::lock_guard guard(_stateLock);
+ _require_strictly_increasing_cluster_state_versions = req;
+}
+
bool
StateManager::sendGetNodeStateReplies() {
return sendGetNodeStateReplies(0xffff);
diff --git a/storage/src/vespa/storage/storageserver/statemanager.h b/storage/src/vespa/storage/storageserver/statemanager.h
index 72b89dc4d65..d116f968731 100644
--- a/storage/src/vespa/storage/storageserver/statemanager.h
+++ b/storage/src/vespa/storage/storageserver/statemanager.h
@@ -1,9 +1,6 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
- * @class storage::StateManager
- * @ingroup storageserver
- *
- * @brief Keeps and updates node and system states.
+ * Keeps and updates node and system states.
*
* This component implements the NodeStateUpdater interface to handle states
* for all components. See that interface for documentation.
@@ -22,11 +19,13 @@
#include <vespa/storageapi/message/state.h>
#include <vespa/storageapi/messageapi/storagemessage.h>
#include <vespa/vespalib/objects/floatingpointtype.h>
+#include <vespa/vespalib/stllike/hash_map.h>
+#include <atomic>
#include <deque>
+#include <list>
#include <map>
+#include <optional>
#include <unordered_set>
-#include <list>
-#include <atomic>
namespace metrics {
class MetricManager;
@@ -69,6 +68,8 @@ class StateManager : public NodeStateUpdater,
std::optional<vespalib::steady_time> _health_ping_time;
vespalib::duration _health_ping_warn_interval;
vespalib::steady_time _health_ping_warn_time;
+ vespalib::steady_time _last_accepted_cluster_state_time;
+ vespalib::hash_map<uint16_t, uint32_t> _last_observed_version_from_cc;
std::unique_ptr<HostInfo> _hostInfo;
std::unique_ptr<framework::Thread> _thread;
// Controllers that have observed a GetNodeState response sent _after_
@@ -76,6 +77,7 @@ class StateManager : public NodeStateUpdater,
std::unordered_set<uint16_t> _controllers_observed_explicit_node_state;
bool _noThreadTestMode;
bool _grabbedExternalLock;
+ bool _require_strictly_increasing_cluster_state_versions;
std::atomic<bool> _notifyingListeners;
std::atomic<bool> _requested_almost_immediate_node_state_replies;
@@ -90,6 +92,9 @@ public:
void tick();
void warn_on_missing_health_ping();
+ // Precondition: internal state mutex must not be held
+ void set_require_strictly_increasing_cluster_state_versions(bool req) noexcept;
+
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
void reportHtmlStatus(std::ostream&, const framework::HttpUrlPath&) const override;
@@ -102,7 +107,11 @@ public:
Lock::SP grabStateChangeLock() override;
void setReportedNodeState(const lib::NodeState& state) override;
- void setClusterStateBundle(const ClusterStateBundle& c);
+ // Iff state was accepted, returns std::nullopt
+ // Otherwise (i.e. state was rejected due to a higher version already having been accepted)
+ // returns an optional containing the current, higher cluster state version.
+ [[nodiscard]] std::optional<uint32_t> try_set_cluster_state_bundle(std::shared_ptr<const ClusterStateBundle> c,
+ uint16_t origin_controller_index);
HostInfo& getHostInfo() { return *_hostInfo; }
void immediately_send_get_node_state_replies() override;
diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp
index 35b70dd853c..bbee95dc00b 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.cpp
+++ b/storage/src/vespa/storage/storageserver/storagenode.cpp
@@ -92,6 +92,7 @@ StorageNode::StorageNode(
_statusMetrics(),
_stateReporter(),
_stateManager(),
+ _state_manager_ptr(nullptr),
_chain(),
_configLock(),
_initial_config_mutex(),
@@ -146,6 +147,8 @@ StorageNode::initialize(const NodeStateReporter & nodeStateReporter)
std::move(_hostInfo),
nodeStateReporter,
_singleThreadedDebugMode);
+ _stateManager->set_require_strictly_increasing_cluster_state_versions(server_config().requireStrictlyIncreasingClusterStateVersions);
+ _state_manager_ptr = _stateManager.get();
_context.getComponentRegister().setNodeStateUpdater(*_stateManager);
// Create storage root folder, in case it doesn't already exist.
@@ -245,12 +248,21 @@ StorageNode::handleLiveConfigUpdate(const InitialGuard & initGuard)
DIFFERWARN(clusterName, "Cannot alter cluster name of node live");
DIFFERWARN(nodeIndex, "Cannot alter node index of node live");
DIFFERWARN(isDistributor, "Cannot alter role of node live");
- _server_config.active = std::make_unique<StorServerConfig>(oldC); // TODO this overwrites from ServiceLayerNode
+ [[maybe_unused]] bool updated = false; // magically touched by ASSIGN() macro. TODO rewrite this fun stuff.
+ if (DIFFER(requireStrictlyIncreasingClusterStateVersions)) {
+ LOG(info, "Live config update: require strictly increasing cluster state versions: %s -> %s",
+ (oldC.requireStrictlyIncreasingClusterStateVersions ? "true" : "false"),
+ (newC.requireStrictlyIncreasingClusterStateVersions ? "true" : "false"));
+ ASSIGN(requireStrictlyIncreasingClusterStateVersions);
+ }
+ _server_config.active = std::make_unique<StorServerConfig>(oldC);
_server_config.staging.reset();
_deadLockDetector->enableWarning(server_config().enableDeadLockDetectorWarnings);
_deadLockDetector->enableShutdown(server_config().enableDeadLockDetector);
_deadLockDetector->setProcessSlack(vespalib::from_s(server_config().deadLockDetectorTimeoutSlack));
_deadLockDetector->setWaitSlack(vespalib::from_s(server_config().deadLockDetectorTimeoutSlack));
+ assert(_state_manager_ptr);
+ _state_manager_ptr->set_require_strictly_increasing_cluster_state_versions(server_config().requireStrictlyIncreasingClusterStateVersions);
}
if (_distribution_config.staging) {
StorDistributionConfigBuilder oldC(*_distribution_config.active);
diff --git a/storage/src/vespa/storage/storageserver/storagenode.h b/storage/src/vespa/storage/storageserver/storagenode.h
index a96f6b52a66..93265bece3c 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.h
+++ b/storage/src/vespa/storage/storageserver/storagenode.h
@@ -135,6 +135,10 @@ private:
// Depends on metric manager
std::unique_ptr<StateReporter> _stateReporter;
std::unique_ptr<StateManager> _stateManager;
+ // Node subclasses may take ownership of _stateManager in order to infuse it into
+ // their own storage link chain, but they MUST ensure its lifetime is maintained.
+ // We need to remember the original pointer in order to update its config.
+ StateManager* _state_manager_ptr;
// The storage chain can depend on anything.
std::unique_ptr<StorageLink> _chain;
diff --git a/storage/src/vespa/storageapi/message/state.h b/storage/src/vespa/storageapi/message/state.h
index 4aa4c8a8f31..afeb5ae9c11 100644
--- a/storage/src/vespa/storageapi/message/state.h
+++ b/storage/src/vespa/storageapi/message/state.h
@@ -9,12 +9,6 @@
namespace storage::api {
-/**
- * @class GetNodeStateCommand
- * @ingroup message
- *
- * @brief Command for setting node state. No payload
- */
class GetNodeStateCommand : public StorageCommand {
lib::NodeState::UP _expectedState;
@@ -27,12 +21,6 @@ public:
DECLARE_STORAGECOMMAND(GetNodeStateCommand, onGetNodeState)
};
-/**
- * @class GetNodeStateReply
- * @ingroup message
- *
- * @brief Reply to GetNodeStateCommand
- */
class GetNodeStateReply : public StorageReply {
lib::NodeState::UP _state;
std::string _nodeInfo;
@@ -53,12 +41,9 @@ public:
};
/**
- * @class SetSystemStateCommand
- * @ingroup message
- *
- * @brief Command for telling a node about the system state - state of each node
- * in the system and state of the system (all ok, no merging, block
- * put/get/remove etx)
+ * Command for telling a node about the cluster state - state of each node
+ * in the cluster and state of the cluster itself (all ok, no merging, block
+ * put/get/remove etx)
*/
class SetSystemStateCommand : public StorageCommand {
std::shared_ptr<const lib::ClusterStateBundle> _state;
@@ -79,12 +64,6 @@ public:
DECLARE_STORAGECOMMAND(SetSystemStateCommand, onSetSystemState)
};
-/**
- * @class SetSystemStateReply
- * @ingroup message
- *
- * @brief Reply received after a SetSystemStateCommand.
- */
class SetSystemStateReply : public StorageReply {
std::shared_ptr<const lib::ClusterStateBundle> _state;