diff options
9 files changed, 234 insertions, 85 deletions
diff --git a/storage/src/tests/storageserver/statemanagertest.cpp b/storage/src/tests/storageserver/statemanagertest.cpp index b785bc141b6..79246cb3ce1 100644 --- a/storage/src/tests/storageserver/statemanagertest.cpp +++ b/storage/src/tests/storageserver/statemanagertest.cpp @@ -32,7 +32,18 @@ struct StateManagerTest : Test, NodeStateReporter { void SetUp() override; void TearDown() override; - void force_current_cluster_state_version(uint32_t version); + static std::shared_ptr<api::SetSystemStateCommand> make_set_state_cmd(vespalib::stringref state_str, uint16_t cc_index) { + auto cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(state_str)); + cmd->setSourceIndex(cc_index); + return cmd; + } + + void get_single_reply(std::shared_ptr<api::StorageReply>& reply_out); + void get_only_ok_reply(std::shared_ptr<api::StorageReply>& reply_out); + void force_current_cluster_state_version(uint32_t version, uint16_t cc_index); + void force_current_cluster_state_version(uint32_t version) { + force_current_cluster_state_version(version, 0); + } void mark_reported_node_state_up(); void send_down_get_node_state_request(uint16_t controller_index); void assert_ok_get_node_state_reply_sent_and_clear(); @@ -86,11 +97,30 @@ StateManagerTest::TearDown() { } void -StateManagerTest::force_current_cluster_state_version(uint32_t version) +StateManagerTest::get_single_reply(std::shared_ptr<api::StorageReply>& reply_out) +{ + ASSERT_EQ(_upper->getNumReplies(), 1); + ASSERT_TRUE(_upper->getReply(0)->getType().isReply()); + reply_out = std::dynamic_pointer_cast<api::StorageReply>(_upper->getReply(0)); + ASSERT_TRUE(reply_out.get() != nullptr); + _upper->reset(); +} + +void +StateManagerTest::get_only_ok_reply(std::shared_ptr<api::StorageReply>& reply_out) +{ + ASSERT_NO_FATAL_FAILURE(get_single_reply(reply_out)); + ASSERT_EQ(reply_out->getResult(), api::ReturnCode(api::ReturnCode::OK)); +} + +void +StateManagerTest::force_current_cluster_state_version(uint32_t version, 
uint16_t cc_index) { ClusterState state(*_manager->getClusterStateBundle()->getBaselineClusterState()); state.setVersion(version); - _manager->setClusterStateBundle(lib::ClusterStateBundle(state)); + const auto maybe_rejected_by_ver = _manager->try_set_cluster_state_bundle( + std::make_shared<const lib::ClusterStateBundle>(state), cc_index); + ASSERT_EQ(maybe_rejected_by_ver, std::nullopt); } void @@ -118,23 +148,12 @@ StateManagerTest::extract_cluster_state_version_from_host_info(uint32_t& version version_out = clusterStateVersionCursor.asLong(); } -#define GET_ONLY_OK_REPLY(varname) \ -{ \ - ASSERT_EQ(size_t(1), _upper->getNumReplies()); \ - ASSERT_TRUE(_upper->getReply(0)->getType().isReply()); \ - varname = std::dynamic_pointer_cast<api::StorageReply>( \ - _upper->getReply(0)); \ - ASSERT_TRUE(varname.get() != nullptr); \ - _upper->reset(); \ - ASSERT_EQ(api::ReturnCode(api::ReturnCode::OK), \ - varname->getResult()); \ -} - TEST_F(StateManagerTest, cluster_state) { std::shared_ptr<api::StorageReply> reply; // Verify initial state on startup auto currentState = _manager->getClusterStateBundle()->getBaselineClusterState(); EXPECT_EQ("cluster:d", currentState->toString(false)); + EXPECT_EQ(currentState->getVersion(), 0); auto currentNodeState = _manager->getCurrentNodeState(); EXPECT_EQ("s:d", currentNodeState->toString(false)); @@ -142,7 +161,7 @@ TEST_F(StateManagerTest, cluster_state) { ClusterState sendState("storage:4 .2.s:m"); auto cmd = std::make_shared<api::SetSystemStateCommand>(sendState); _upper->sendDown(cmd); - GET_ONLY_OK_REPLY(reply); + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); currentState = _manager->getClusterStateBundle()->getBaselineClusterState(); EXPECT_EQ(sendState, *currentState); @@ -151,13 +170,64 @@ TEST_F(StateManagerTest, cluster_state) { EXPECT_EQ("s:m", currentNodeState->toString(false)); } +TEST_F(StateManagerTest, accept_lower_state_versions_if_strict_requirement_disabled) { + 
_manager->set_require_strictly_increasing_cluster_state_versions(false); + + ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(123, 1)); // CC 1 + ASSERT_EQ(_manager->getClusterStateBundle()->getVersion(), 123); + + _upper->sendDown(make_set_state_cmd("version:122 distributor:1 storage:1", 0)); // CC 0 + std::shared_ptr<api::StorageReply> reply; + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); + EXPECT_EQ(_manager->getClusterStateBundle()->getVersion(), 122); +} + +TEST_F(StateManagerTest, reject_lower_state_versions_if_strict_requirement_enabled) { + _manager->set_require_strictly_increasing_cluster_state_versions(true); + + ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(123, 1)); // CC 1 + ASSERT_EQ(_manager->getClusterStateBundle()->getVersion(), 123); + + _upper->sendDown(make_set_state_cmd("version:122 distributor:1 storage:1", 0)); // CC 0 + std::shared_ptr<api::StorageReply> reply; + ASSERT_NO_FATAL_FAILURE(get_single_reply(reply)); + api::ReturnCode expected_res(api::ReturnCode::REJECTED, "Cluster state version 122 rejected; node already has " + "a higher cluster state version (123)"); + EXPECT_EQ(reply->getResult(), expected_res); + EXPECT_EQ(_manager->getClusterStateBundle()->getVersion(), 123); +} + +// Observing a lower cluster state version from the same CC index directly implies that the ZooKeeper +// state has been lost, at which point we pragmatically (but begrudgingly) accept the state version +// to avoid stalling the entire cluster for an indeterminate amount of time. 
+TEST_F(StateManagerTest, accept_lower_state_versions_from_same_cc_index_even_if_strict_requirement_enabled) { + _manager->set_require_strictly_increasing_cluster_state_versions(true); + + ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(123, 1)); // CC 1 + ASSERT_EQ(_manager->getClusterStateBundle()->getVersion(), 123); + + ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(124, 2)); // CC 2 + ASSERT_EQ(_manager->getClusterStateBundle()->getVersion(), 124); + + // CC 1 restarts from scratch with previous ZK state up in smoke. + _upper->sendDown(make_set_state_cmd("version:3 distributor:1 storage:1", 1)); + std::shared_ptr<api::StorageReply> reply; + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); + EXPECT_EQ(_manager->getClusterStateBundle()->getVersion(), 3); + + // CC 2 restarts and continues from where CC 1 left off. + _upper->sendDown(make_set_state_cmd("version:4 distributor:1 storage:1", 2)); + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); + EXPECT_EQ(_manager->getClusterStateBundle()->getVersion(), 4); +} + namespace { struct MyStateListener : public StateListener { const NodeStateUpdater& updater; lib::NodeState current; std::ostringstream ost; - MyStateListener(const NodeStateUpdater& upd); + explicit MyStateListener(const NodeStateUpdater& upd); ~MyStateListener() override; void handleNewState() noexcept override { @@ -194,7 +264,7 @@ TEST_F(StateManagerTest, reported_node_state) { // And get node state command (no expected state) auto cmd = std::make_shared<api::GetNodeStateCommand>(lib::NodeState::UP()); _upper->sendDown(cmd); - GET_ONLY_OK_REPLY(reply); + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); ASSERT_EQ(api::MessageType::GETNODESTATE_REPLY, reply->getType()); nodeState = std::make_shared<NodeState>( dynamic_cast<api::GetNodeStateReply&>(*reply).getNodeState()); @@ -203,7 +273,7 @@ TEST_F(StateManagerTest, reported_node_state) { cmd = std::make_shared<api::GetNodeStateCommand>( 
std::make_unique<NodeState>(NodeType::STORAGE, State::INITIALIZING)); _upper->sendDown(cmd); - GET_ONLY_OK_REPLY(reply); + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); ASSERT_EQ(api::MessageType::GETNODESTATE_REPLY, reply->getType()); nodeState = std::make_unique<NodeState>( dynamic_cast<api::GetNodeStateReply&>(*reply).getNodeState()); @@ -222,7 +292,7 @@ TEST_F(StateManagerTest, reported_node_state) { _manager->setReportedNodeState(ns); } - GET_ONLY_OK_REPLY(reply); + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); ASSERT_EQ(api::MessageType::GETNODESTATE_REPLY, reply->getType()); nodeState = std::make_unique<NodeState>( dynamic_cast<api::GetNodeStateReply&>(*reply).getNodeState()); @@ -244,7 +314,7 @@ TEST_F(StateManagerTest, reported_node_state) { } TEST_F(StateManagerTest, current_cluster_state_version_is_included_in_host_info_json) { - force_current_cluster_state_version(123); + ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(123)); uint32_t version; ASSERT_NO_FATAL_FAILURE(extract_cluster_state_version_from_host_info(version)); EXPECT_EQ(version, 123); @@ -266,7 +336,7 @@ void StateManagerTest::send_down_get_node_state_request(uint16_t controller_inde void StateManagerTest::assert_ok_get_node_state_reply_sent_and_clear() { ASSERT_EQ(1, _upper->getNumReplies()); std::shared_ptr<api::StorageReply> reply; - GET_ONLY_OK_REPLY(reply); // Implicitly clears messages from _upper + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); // Implicitly clears messages from _upper ASSERT_EQ(api::MessageType::GETNODESTATE_REPLY, reply->getType()); } @@ -344,7 +414,7 @@ TEST_F(StateManagerTest, request_almost_immediate_replies_triggers_fast_reply) } TEST_F(StateManagerTest, activation_command_is_bounced_with_current_cluster_state_version) { - force_current_cluster_state_version(12345); + ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(12345)); auto cmd = std::make_shared<api::ActivateClusterStateVersionCommand>(12340); 
cmd->setTimeout(10000000ms); @@ -353,7 +423,7 @@ TEST_F(StateManagerTest, activation_command_is_bounced_with_current_cluster_stat ASSERT_EQ(1, _upper->getNumReplies()); std::shared_ptr<api::StorageReply> reply; - GET_ONLY_OK_REPLY(reply); // Implicitly clears messages from _upper + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); // Implicitly clears messages from _upper ASSERT_EQ(api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_REPLY, reply->getType()); auto& activate_reply = dynamic_cast<api::ActivateClusterStateVersionReply&>(*reply); EXPECT_EQ(12340, activate_reply.activateVersion()); @@ -366,7 +436,7 @@ TEST_F(StateManagerTest, non_deferred_cluster_state_sets_reported_cluster_state_ cmd->setSourceIndex(0); _upper->sendDown(cmd); std::shared_ptr<api::StorageReply> reply; - GET_ONLY_OK_REPLY(reply); + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); uint32_t version; ASSERT_NO_FATAL_FAILURE(extract_cluster_state_version_from_host_info(version)); @@ -374,7 +444,7 @@ TEST_F(StateManagerTest, non_deferred_cluster_state_sets_reported_cluster_state_ } TEST_F(StateManagerTest, deferred_cluster_state_does_not_update_state_until_activation_edge) { - force_current_cluster_state_version(100); + ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(100)); lib::ClusterStateBundle deferred_bundle(lib::ClusterState("version:101 distributor:1 storage:1"), {}, true); auto state_cmd = std::make_shared<api::SetSystemStateCommand>(deferred_bundle); @@ -382,7 +452,7 @@ TEST_F(StateManagerTest, deferred_cluster_state_does_not_update_state_until_acti state_cmd->setSourceIndex(0); _upper->sendDown(state_cmd); std::shared_ptr<api::StorageReply> reply; - GET_ONLY_OK_REPLY(reply); + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); uint32_t version; ASSERT_NO_FATAL_FAILURE(extract_cluster_state_version_from_host_info(version)); @@ -392,7 +462,7 @@ TEST_F(StateManagerTest, deferred_cluster_state_does_not_update_state_until_acti activation_cmd->setTimeout(1000s); 
activation_cmd->setSourceIndex(0); _upper->sendDown(activation_cmd); - GET_ONLY_OK_REPLY(reply); + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); ASSERT_NO_FATAL_FAILURE(extract_cluster_state_version_from_host_info(version)); EXPECT_EQ(version, 101); diff --git a/storage/src/vespa/storage/config/stor-server.def b/storage/src/vespa/storage/config/stor-server.def index 44e4b14eafc..49ce0b678d0 100644 --- a/storage/src/vespa/storage/config/stor-server.def +++ b/storage/src/vespa/storage/config/stor-server.def @@ -118,3 +118,11 @@ content_node_bucket_db_stripe_bits int default=4 restart ## Iff set, a special `pidfile` file is written under the node's root directory upon ## startup containing the PID of the running process. write_pid_file_on_startup bool default=true + +## Iff true, received cluster state versions that are lower than the current active +## (or pending to be active) version on the node will be explicitly rejected. This +## prevents race conditions caused by multiple cluster controllers believing they +## are the leader during overlapping time intervals, as only the most recent leader +## is able to increment the current state version in ZooKeeper, but the old controller +## may still attempt to publish its old state. 
+require_strictly_increasing_cluster_state_versions bool default=false diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index bbd4e87cb40..1b119a0e631 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -622,7 +622,14 @@ CommunicationManager::sendDirectRPCReply(RPCRequestWrapper& request, request.addReturnString(ns.str().c_str()); LOGBP(debug, "Sending getnodestate2 reply with no host info."); } else if (requestName == "setsystemstate2" || requestName == "setdistributionstates") { - // No data to return + // No data to return, but the request must be failed iff we rejected the state version + // due to a higher version having been previously received. + auto& state_reply = dynamic_cast<api::SetSystemStateReply&>(*reply); + if (state_reply.getResult().getResult() == api::ReturnCode::REJECTED) { + vespalib::string err_msg = state_reply.getResult().getMessage(); // ReturnCode message is stringref + request.returnError(FRTE_RPC_METHOD_FAILED, err_msg.c_str()); + return; + } } else if (requestName == "activate_cluster_state_version") { auto& activate_reply(dynamic_cast<api::ActivateClusterStateVersionReply&>(*reply)); request.addReturnInt(activate_reply.actualVersion()); diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp index d18935afe24..98796ee6440 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp +++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp @@ -125,24 +125,28 @@ ServiceLayerNode::initializeNodeSpecific() void ServiceLayerNode::handleLiveConfigUpdate(const InitialGuard & initGuard) { - if (_server_config.staging) { - bool updated = false; - vespa::config::content::core::StorServerConfigBuilder oldC(*_server_config.active); - StorServerConfig& 
newC(*_server_config.staging); - { - updated = false; - NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock()); - lib::NodeState ns(*_component->getStateUpdater().getReportedNodeState()); - if (DIFFER(nodeCapacity)) { - LOG(info, "Live config update: Updating node capacity from %f to %f.", - oldC.nodeCapacity, newC.nodeCapacity); - ASSIGN(nodeCapacity); - ns.setCapacity(newC.nodeCapacity); - } - if (updated) { - // FIXME this always gets overwritten by StorageNode::handleLiveConfigUpdate...! Intentional? - _server_config.active = std::make_unique<vespa::config::content::core::StorServerConfig>(oldC); - _component->getStateUpdater().setReportedNodeState(ns); + { + std::lock_guard config_lock(_configLock); + // Live server config patching happens both here and in StorageNode::handleLiveConfigUpdate, + // which we have to delegate to afterward (_without_ holding _configLock at the time). + if (_server_config.staging) { + bool updated = false; + vespa::config::content::core::StorServerConfigBuilder oldC(*_server_config.active); + StorServerConfig& newC(*_server_config.staging); + { + NodeStateUpdater::Lock::SP lock(_component->getStateUpdater().grabStateChangeLock()); + lib::NodeState ns(*_component->getStateUpdater().getReportedNodeState()); + if (DIFFER(nodeCapacity)) { + LOG(info, "Live config update: Updating node capacity from %f to %f.", + oldC.nodeCapacity, newC.nodeCapacity); + ASSIGN(nodeCapacity); + ns.setCapacity(newC.nodeCapacity); + } + if (updated) { + // FIXME the patching of old config vs new config is confusing and error-prone. Redesign! 
+ _server_config.active = std::make_unique<vespa::config::content::core::StorServerConfig>(oldC); + _component->getStateUpdater().setReportedNodeState(ns); + } } } } diff --git a/storage/src/vespa/storage/storageserver/statemanager.cpp b/storage/src/vespa/storage/storageserver/statemanager.cpp index adebaa51c08..a2106dce8d2 100644 --- a/storage/src/vespa/storage/storageserver/statemanager.cpp +++ b/storage/src/vespa/storage/storageserver/statemanager.cpp @@ -21,7 +21,7 @@ #include <fstream> #include <ranges> -#include <vespa/log/log.h> +#include <vespa/log/bufferedlogger.h> LOG_SETUP(".state.manager"); using vespalib::make_string_short::fmt; @@ -74,10 +74,13 @@ StateManager::StateManager(StorageComponentRegister& compReg, _health_ping_time(), _health_ping_warn_interval(5min), _health_ping_warn_time(_start_time + _health_ping_warn_interval), + _last_accepted_cluster_state_time(), + _last_observed_version_from_cc(), _hostInfo(std::move(hostInfo)), _controllers_observed_explicit_node_state(), _noThreadTestMode(testMode), _grabbedExternalLock(false), + _require_strictly_increasing_cluster_state_versions(false), _notifyingListeners(false), _requested_almost_immediate_node_state_replies(false) { @@ -436,21 +439,67 @@ StateManager::mark_controller_as_having_observed_explicit_node_state(const std:: _controllers_observed_explicit_node_state.emplace(controller_index); } -void -StateManager::setClusterStateBundle(const ClusterStateBundle& c) +std::optional<uint32_t> +StateManager::try_set_cluster_state_bundle(std::shared_ptr<const ClusterStateBundle> c, + uint16_t origin_controller_index) { { std::lock_guard lock(_stateLock); - _nextSystemState = std::make_shared<const ClusterStateBundle>(c); + uint32_t effective_active_version = (_nextSystemState ? 
_nextSystemState->getVersion() + : _systemState->getVersion()); + const auto now = _component.getClock().getMonotonicTime(); + const uint32_t last_ver_from_cc = _last_observed_version_from_cc[origin_controller_index]; + _last_observed_version_from_cc[origin_controller_index] = c->getVersion(); + + if (_require_strictly_increasing_cluster_state_versions && (c->getVersion() < effective_active_version)) { + if (c->getVersion() >= last_ver_from_cc) { + constexpr auto reject_warn_threshold = 30s; + if (now - _last_accepted_cluster_state_time <= reject_warn_threshold) { + LOG(debug, "Rejecting cluster state with version %u from cluster controller %u, as " + "we've already accepted version %u. Recently accepted another cluster state, " + "so assuming transient CC leadership period overlap.", + c->getVersion(), origin_controller_index, effective_active_version); + } else { + // Rejections have happened for some time. Make a bit of noise. + LOGBP(warning, "Rejecting cluster state with version %u from cluster controller %u, as " + "we've already accepted version %u.", + c->getVersion(), origin_controller_index, effective_active_version); + } + return {effective_active_version}; + } else { + // SetSystemState RPCs are FIFO-ordered and a particular CC should enforce strictly increasing + // cluster state versions through its ZooKeeper quorum (but commands may be resent for a given + // version). This means that commands should contain _monotonically increasing_ versions from + // a given CC origin index. + // If this is _not_ the case, it indicates ZooKeeper state on the CCs has been lost or wiped, + // at which point we have no other realistic choice than to accept the version, or the system + // will stall until an operator manually intervenes by restarting the content cluster. + LOG(error, "Received cluster state version %u from cluster controller %u, which is lower than " + "the current state version (%u) and the last received version (%u) from the same controller. 
" + "This indicates loss of cluster controller ZooKeeper state; accepting lower version to " + "prevent content cluster operations from stalling for an indeterminate amount of time.", + c->getVersion(), origin_controller_index, effective_active_version, last_ver_from_cc); + // Fall through to state acceptance. + } + } + _last_accepted_cluster_state_time = now; + _nextSystemState = std::move(c); } notifyStateListeners(); + return std::nullopt; } bool StateManager::onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& cmd) { - setClusterStateBundle(cmd->getClusterStateBundle()); - sendUp(std::make_shared<api::SetSystemStateReply>(*cmd)); + auto reply = std::make_shared<api::SetSystemStateReply>(*cmd); + const auto maybe_rejected_by_ver = try_set_cluster_state_bundle(cmd->cluster_state_bundle_ptr(), cmd->getSourceIndex()); + if (maybe_rejected_by_ver) { + reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED, + fmt("Cluster state version %u rejected; node already has a higher cluster state version (%u)", + cmd->getClusterStateBundle().getVersion(), *maybe_rejected_by_ver))); + } + sendUp(reply); return true; } @@ -520,6 +569,13 @@ StateManager::tick() { warn_on_missing_health_ping(); } +void +StateManager::set_require_strictly_increasing_cluster_state_versions(bool req) noexcept +{ + std::lock_guard guard(_stateLock); + _require_strictly_increasing_cluster_state_versions = req; +} + bool StateManager::sendGetNodeStateReplies() { return sendGetNodeStateReplies(0xffff); diff --git a/storage/src/vespa/storage/storageserver/statemanager.h b/storage/src/vespa/storage/storageserver/statemanager.h index 72b89dc4d65..d116f968731 100644 --- a/storage/src/vespa/storage/storageserver/statemanager.h +++ b/storage/src/vespa/storage/storageserver/statemanager.h @@ -1,9 +1,6 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
/** - * @class storage::StateManager - * @ingroup storageserver - * - * @brief Keeps and updates node and system states. + * Keeps and updates node and system states. * * This component implements the NodeStateUpdater interface to handle states * for all components. See that interface for documentation. @@ -22,11 +19,13 @@ #include <vespa/storageapi/message/state.h> #include <vespa/storageapi/messageapi/storagemessage.h> #include <vespa/vespalib/objects/floatingpointtype.h> +#include <vespa/vespalib/stllike/hash_map.h> +#include <atomic> #include <deque> +#include <list> #include <map> +#include <optional> #include <unordered_set> -#include <list> -#include <atomic> namespace metrics { class MetricManager; @@ -69,6 +68,8 @@ class StateManager : public NodeStateUpdater, std::optional<vespalib::steady_time> _health_ping_time; vespalib::duration _health_ping_warn_interval; vespalib::steady_time _health_ping_warn_time; + vespalib::steady_time _last_accepted_cluster_state_time; + vespalib::hash_map<uint16_t, uint32_t> _last_observed_version_from_cc; std::unique_ptr<HostInfo> _hostInfo; std::unique_ptr<framework::Thread> _thread; // Controllers that have observed a GetNodeState response sent _after_ @@ -76,6 +77,7 @@ class StateManager : public NodeStateUpdater, std::unordered_set<uint16_t> _controllers_observed_explicit_node_state; bool _noThreadTestMode; bool _grabbedExternalLock; + bool _require_strictly_increasing_cluster_state_versions; std::atomic<bool> _notifyingListeners; std::atomic<bool> _requested_almost_immediate_node_state_replies; @@ -90,6 +92,9 @@ public: void tick(); void warn_on_missing_health_ping(); + // Precondition: internal state mutex must not be held + void set_require_strictly_increasing_cluster_state_versions(bool req) noexcept; + void print(std::ostream& out, bool verbose, const std::string& indent) const override; void reportHtmlStatus(std::ostream&, const framework::HttpUrlPath&) const override; @@ -102,7 +107,11 @@ public: Lock::SP 
grabStateChangeLock() override; void setReportedNodeState(const lib::NodeState& state) override; - void setClusterStateBundle(const ClusterStateBundle& c); + // Iff state was accepted, returns std::nullopt + // Otherwise (i.e. state was rejected due to a higher version already having been accepted) + // returns an optional containing the current, higher cluster state version. + [[nodiscard]] std::optional<uint32_t> try_set_cluster_state_bundle(std::shared_ptr<const ClusterStateBundle> c, + uint16_t origin_controller_index); HostInfo& getHostInfo() { return *_hostInfo; } void immediately_send_get_node_state_replies() override; diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp index 35b70dd853c..bbee95dc00b 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.cpp +++ b/storage/src/vespa/storage/storageserver/storagenode.cpp @@ -92,6 +92,7 @@ StorageNode::StorageNode( _statusMetrics(), _stateReporter(), _stateManager(), + _state_manager_ptr(nullptr), _chain(), _configLock(), _initial_config_mutex(), @@ -146,6 +147,8 @@ StorageNode::initialize(const NodeStateReporter & nodeStateReporter) std::move(_hostInfo), nodeStateReporter, _singleThreadedDebugMode); + _stateManager->set_require_strictly_increasing_cluster_state_versions(server_config().requireStrictlyIncreasingClusterStateVersions); + _state_manager_ptr = _stateManager.get(); _context.getComponentRegister().setNodeStateUpdater(*_stateManager); // Create storage root folder, in case it doesn't already exist. 
@@ -245,12 +248,21 @@ StorageNode::handleLiveConfigUpdate(const InitialGuard & initGuard) DIFFERWARN(clusterName, "Cannot alter cluster name of node live"); DIFFERWARN(nodeIndex, "Cannot alter node index of node live"); DIFFERWARN(isDistributor, "Cannot alter role of node live"); - _server_config.active = std::make_unique<StorServerConfig>(oldC); // TODO this overwrites from ServiceLayerNode + [[maybe_unused]] bool updated = false; // magically touched by ASSIGN() macro. TODO rewrite this fun stuff. + if (DIFFER(requireStrictlyIncreasingClusterStateVersions)) { + LOG(info, "Live config update: require strictly increasing cluster state versions: %s -> %s", + (oldC.requireStrictlyIncreasingClusterStateVersions ? "true" : "false"), + (newC.requireStrictlyIncreasingClusterStateVersions ? "true" : "false")); + ASSIGN(requireStrictlyIncreasingClusterStateVersions); + } + _server_config.active = std::make_unique<StorServerConfig>(oldC); _server_config.staging.reset(); _deadLockDetector->enableWarning(server_config().enableDeadLockDetectorWarnings); _deadLockDetector->enableShutdown(server_config().enableDeadLockDetector); _deadLockDetector->setProcessSlack(vespalib::from_s(server_config().deadLockDetectorTimeoutSlack)); _deadLockDetector->setWaitSlack(vespalib::from_s(server_config().deadLockDetectorTimeoutSlack)); + assert(_state_manager_ptr); + _state_manager_ptr->set_require_strictly_increasing_cluster_state_versions(server_config().requireStrictlyIncreasingClusterStateVersions); } if (_distribution_config.staging) { StorDistributionConfigBuilder oldC(*_distribution_config.active); diff --git a/storage/src/vespa/storage/storageserver/storagenode.h b/storage/src/vespa/storage/storageserver/storagenode.h index a96f6b52a66..93265bece3c 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.h +++ b/storage/src/vespa/storage/storageserver/storagenode.h @@ -135,6 +135,10 @@ private: // Depends on metric manager std::unique_ptr<StateReporter> _stateReporter; 
std::unique_ptr<StateManager> _stateManager; + // Node subclasses may take ownership of _stateManager in order to infuse it into + // their own storage link chain, but they MUST ensure its lifetime is maintained. + // We need to remember the original pointer in order to update its config. + StateManager* _state_manager_ptr; // The storage chain can depend on anything. std::unique_ptr<StorageLink> _chain; diff --git a/storage/src/vespa/storageapi/message/state.h b/storage/src/vespa/storageapi/message/state.h index 4aa4c8a8f31..afeb5ae9c11 100644 --- a/storage/src/vespa/storageapi/message/state.h +++ b/storage/src/vespa/storageapi/message/state.h @@ -9,12 +9,6 @@ namespace storage::api { -/** - * @class GetNodeStateCommand - * @ingroup message - * - * @brief Command for setting node state. No payload - */ class GetNodeStateCommand : public StorageCommand { lib::NodeState::UP _expectedState; @@ -27,12 +21,6 @@ public: DECLARE_STORAGECOMMAND(GetNodeStateCommand, onGetNodeState) }; -/** - * @class GetNodeStateReply - * @ingroup message - * - * @brief Reply to GetNodeStateCommand - */ class GetNodeStateReply : public StorageReply { lib::NodeState::UP _state; std::string _nodeInfo; @@ -53,12 +41,9 @@ public: }; /** - * @class SetSystemStateCommand - * @ingroup message - * - * @brief Command for telling a node about the system state - state of each node - * in the system and state of the system (all ok, no merging, block - * put/get/remove etx) + * Command for telling a node about the cluster state - state of each node + * in the cluster and state of the cluster itself (all ok, no merging, block + * put/get/remove etc) */ class SetSystemStateCommand : public StorageCommand { std::shared_ptr<const lib::ClusterStateBundle> _state; @@ -79,12 +64,6 @@ public: DECLARE_STORAGECOMMAND(SetSystemStateCommand, onSetSystemState) }; -/** - * @class SetSystemStateReply - * @ingroup message - * - * @brief Reply received after a SetSystemStateCommand. 
- */ class SetSystemStateReply : public StorageReply { std::shared_ptr<const lib::ClusterStateBundle> _state; |