From 356063939c2a78d32437cbf998db58d996c70ba4 Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Mon, 3 Jun 2024 12:36:53 +0000 Subject: Support enforcing strictly increasing state versions across cluster controllers Adds a (live) config that specifies if content nodes and distributors shall reject cluster state versions that are lower than the one that is currently active on the node. This prevents "last write wins" ordering problems when multiple cluster controllers have partially overlapping leadership periods. In the name of pragmatism, we try to auto-detect the case where ZooKeeper state must have been lost on the cluster controller cluster, and accept the state even with lower version number. Otherwise, the content cluster would be effectively stalled until all its processes had been manually restarted. Adds wiring of live config to the `StateManager` component on both content and distributor nodes. --- .../src/tests/storageserver/statemanagertest.cpp | 126 ++++++++++++++++----- 1 file changed, 98 insertions(+), 28 deletions(-) (limited to 'storage/src/tests/storageserver/statemanagertest.cpp') diff --git a/storage/src/tests/storageserver/statemanagertest.cpp b/storage/src/tests/storageserver/statemanagertest.cpp index b785bc141b6..79246cb3ce1 100644 --- a/storage/src/tests/storageserver/statemanagertest.cpp +++ b/storage/src/tests/storageserver/statemanagertest.cpp @@ -32,7 +32,18 @@ struct StateManagerTest : Test, NodeStateReporter { void SetUp() override; void TearDown() override; - void force_current_cluster_state_version(uint32_t version); + static std::shared_ptr make_set_state_cmd(vespalib::stringref state_str, uint16_t cc_index) { + auto cmd = std::make_shared(lib::ClusterState(state_str)); + cmd->setSourceIndex(cc_index); + return cmd; + } + + void get_single_reply(std::shared_ptr& reply_out); + void get_only_ok_reply(std::shared_ptr& reply_out); + void force_current_cluster_state_version(uint32_t version, uint16_t cc_index); + void force_current_cluster_state_version(uint32_t version) { + force_current_cluster_state_version(version, 0); + } void mark_reported_node_state_up(); void send_down_get_node_state_request(uint16_t controller_index); void assert_ok_get_node_state_reply_sent_and_clear(); @@ -86,11 +97,30 @@ StateManagerTest::TearDown() { } void -StateManagerTest::force_current_cluster_state_version(uint32_t version) +StateManagerTest::get_single_reply(std::shared_ptr& reply_out) +{ + ASSERT_EQ(_upper->getNumReplies(), 1); + ASSERT_TRUE(_upper->getReply(0)->getType().isReply()); + reply_out = std::dynamic_pointer_cast(_upper->getReply(0)); + ASSERT_TRUE(reply_out.get() != nullptr); + _upper->reset(); +} + +void +StateManagerTest::get_only_ok_reply(std::shared_ptr& reply_out) +{ + ASSERT_NO_FATAL_FAILURE(get_single_reply(reply_out)); + ASSERT_EQ(reply_out->getResult(), api::ReturnCode(api::ReturnCode::OK)); +} + +void +StateManagerTest::force_current_cluster_state_version(uint32_t version, uint16_t cc_index) { ClusterState state(*_manager->getClusterStateBundle()->getBaselineClusterState()); state.setVersion(version); - _manager->setClusterStateBundle(lib::ClusterStateBundle(state)); + const auto maybe_rejected_by_ver = _manager->try_set_cluster_state_bundle( + std::make_shared(state), cc_index); + ASSERT_EQ(maybe_rejected_by_ver, std::nullopt); } void @@ -118,23 +148,12 @@ StateManagerTest::extract_cluster_state_version_from_host_info(uint32_t& version version_out = clusterStateVersionCursor.asLong(); } -#define GET_ONLY_OK_REPLY(varname) \ -{ \ - ASSERT_EQ(size_t(1), _upper->getNumReplies()); \ - ASSERT_TRUE(_upper->getReply(0)->getType().isReply()); \ - varname = std::dynamic_pointer_cast( \ - _upper->getReply(0)); \ - ASSERT_TRUE(varname.get() != nullptr); \ - _upper->reset(); \ - ASSERT_EQ(api::ReturnCode(api::ReturnCode::OK), \ - varname->getResult()); \ -} - TEST_F(StateManagerTest, cluster_state) { std::shared_ptr reply; // Verify initial state on startup auto currentState = _manager->getClusterStateBundle()->getBaselineClusterState(); EXPECT_EQ("cluster:d", currentState->toString(false)); + EXPECT_EQ(currentState->getVersion(), 0); auto currentNodeState = _manager->getCurrentNodeState(); EXPECT_EQ("s:d", currentNodeState->toString(false)); @@ -142,7 +161,7 @@ TEST_F(StateManagerTest, cluster_state) { ClusterState sendState("storage:4 .2.s:m"); auto cmd = std::make_shared(sendState); _upper->sendDown(cmd); - GET_ONLY_OK_REPLY(reply); + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); currentState = _manager->getClusterStateBundle()->getBaselineClusterState(); EXPECT_EQ(sendState, *currentState); @@ -151,13 +170,64 @@ TEST_F(StateManagerTest, cluster_state) { EXPECT_EQ("s:m", currentNodeState->toString(false)); } +TEST_F(StateManagerTest, accept_lower_state_versions_if_strict_requirement_disabled) { + _manager->set_require_strictly_increasing_cluster_state_versions(false); + + ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(123, 1)); // CC 1 + ASSERT_EQ(_manager->getClusterStateBundle()->getVersion(), 123); + + _upper->sendDown(make_set_state_cmd("version:122 distributor:1 storage:1", 0)); // CC 0 + std::shared_ptr reply; + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); + EXPECT_EQ(_manager->getClusterStateBundle()->getVersion(), 122); +} + +TEST_F(StateManagerTest, reject_lower_state_versions_if_strict_requirement_enabled) { + _manager->set_require_strictly_increasing_cluster_state_versions(true); + + ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(123, 1)); // CC 1 + ASSERT_EQ(_manager->getClusterStateBundle()->getVersion(), 123); + + _upper->sendDown(make_set_state_cmd("version:122 distributor:1 storage:1", 0)); // CC 0 + std::shared_ptr reply; + ASSERT_NO_FATAL_FAILURE(get_single_reply(reply)); + api::ReturnCode expected_res(api::ReturnCode::REJECTED, "Cluster state version 122 rejected; node already has " + "a higher cluster state version (123)"); + EXPECT_EQ(reply->getResult(), expected_res); + EXPECT_EQ(_manager->getClusterStateBundle()->getVersion(), 123); +} + +// Observing a lower cluster state version from the same CC index directly implies that the ZooKeeper +// state has been lost, at which point we pragmatically (but begrudgingly) accept the state version +// to avoid stalling the entire cluster for an indeterminate amount of time. +TEST_F(StateManagerTest, accept_lower_state_versions_from_same_cc_index_even_if_strict_requirement_enabled) { + _manager->set_require_strictly_increasing_cluster_state_versions(true); + + ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(123, 1)); // CC 1 + ASSERT_EQ(_manager->getClusterStateBundle()->getVersion(), 123); + + ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(124, 2)); // CC 2 + ASSERT_EQ(_manager->getClusterStateBundle()->getVersion(), 124); + + // CC 1 restarts from scratch with previous ZK state up in smoke. + _upper->sendDown(make_set_state_cmd("version:3 distributor:1 storage:1", 1)); + std::shared_ptr reply; + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); + EXPECT_EQ(_manager->getClusterStateBundle()->getVersion(), 3); + + // CC 2 restarts and continues from where CC 1 left off. + _upper->sendDown(make_set_state_cmd("version:4 distributor:1 storage:1", 2)); + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); + EXPECT_EQ(_manager->getClusterStateBundle()->getVersion(), 4); +} + namespace { struct MyStateListener : public StateListener { const NodeStateUpdater& updater; lib::NodeState current; std::ostringstream ost; - MyStateListener(const NodeStateUpdater& upd); + explicit MyStateListener(const NodeStateUpdater& upd); ~MyStateListener() override; void handleNewState() noexcept override { @@ -194,7 +264,7 @@ TEST_F(StateManagerTest, reported_node_state) { // And get node state command (no expected state) auto cmd = std::make_shared(lib::NodeState::UP()); _upper->sendDown(cmd); - GET_ONLY_OK_REPLY(reply); + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); ASSERT_EQ(api::MessageType::GETNODESTATE_REPLY, reply->getType()); nodeState = std::make_shared( dynamic_cast(*reply).getNodeState()); @@ -203,7 +273,7 @@ TEST_F(StateManagerTest, reported_node_state) { cmd = std::make_shared( std::make_unique(NodeType::STORAGE, State::INITIALIZING)); _upper->sendDown(cmd); - GET_ONLY_OK_REPLY(reply); + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); ASSERT_EQ(api::MessageType::GETNODESTATE_REPLY, reply->getType()); nodeState = std::make_unique( dynamic_cast(*reply).getNodeState()); @@ -222,7 +292,7 @@ TEST_F(StateManagerTest, reported_node_state) { _manager->setReportedNodeState(ns); } - GET_ONLY_OK_REPLY(reply); + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); ASSERT_EQ(api::MessageType::GETNODESTATE_REPLY, reply->getType()); nodeState = std::make_unique( dynamic_cast(*reply).getNodeState()); @@ -244,7 +314,7 @@ TEST_F(StateManagerTest, reported_node_state) { } TEST_F(StateManagerTest, current_cluster_state_version_is_included_in_host_info_json) { - force_current_cluster_state_version(123); + ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(123)); uint32_t version; ASSERT_NO_FATAL_FAILURE(extract_cluster_state_version_from_host_info(version)); EXPECT_EQ(version, 123); @@ -266,7 +336,7 @@ void StateManagerTest::send_down_get_node_state_request(uint16_t controller_inde void StateManagerTest::assert_ok_get_node_state_reply_sent_and_clear() { ASSERT_EQ(1, _upper->getNumReplies()); std::shared_ptr reply; - GET_ONLY_OK_REPLY(reply); // Implicitly clears messages from _upper + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); // Implicitly clears messages from _upper ASSERT_EQ(api::MessageType::GETNODESTATE_REPLY, reply->getType()); } @@ -344,7 +414,7 @@ TEST_F(StateManagerTest, request_almost_immediate_replies_triggers_fast_reply) } TEST_F(StateManagerTest, activation_command_is_bounced_with_current_cluster_state_version) { - force_current_cluster_state_version(12345); + ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(12345)); auto cmd = std::make_shared(12340); cmd->setTimeout(10000000ms); @@ -353,7 +423,7 @@ TEST_F(StateManagerTest, activation_command_is_bounced_with_current_cluster_stat ASSERT_EQ(1, _upper->getNumReplies()); std::shared_ptr reply; - GET_ONLY_OK_REPLY(reply); // Implicitly clears messages from _upper + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); // Implicitly clears messages from _upper ASSERT_EQ(api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_REPLY, reply->getType()); auto& activate_reply = dynamic_cast(*reply); EXPECT_EQ(12340, activate_reply.activateVersion()); @@ -366,7 +436,7 @@ TEST_F(StateManagerTest, non_deferred_cluster_state_sets_reported_cluster_state_ cmd->setSourceIndex(0); _upper->sendDown(cmd); std::shared_ptr reply; - GET_ONLY_OK_REPLY(reply); + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); uint32_t version; ASSERT_NO_FATAL_FAILURE(extract_cluster_state_version_from_host_info(version)); @@ -374,7 +444,7 @@ TEST_F(StateManagerTest, non_deferred_cluster_state_sets_reported_cluster_state_ } TEST_F(StateManagerTest, deferred_cluster_state_does_not_update_state_until_activation_edge) { - force_current_cluster_state_version(100); + ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(100)); lib::ClusterStateBundle deferred_bundle(lib::ClusterState("version:101 distributor:1 storage:1"), {}, true); auto state_cmd = std::make_shared(deferred_bundle); @@ -382,7 +452,7 @@ TEST_F(StateManagerTest, deferred_cluster_state_does_not_update_state_until_acti state_cmd->setSourceIndex(0); _upper->sendDown(state_cmd); std::shared_ptr reply; - GET_ONLY_OK_REPLY(reply); + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); uint32_t version; ASSERT_NO_FATAL_FAILURE(extract_cluster_state_version_from_host_info(version)); @@ -392,7 +462,7 @@ TEST_F(StateManagerTest, deferred_cluster_state_does_not_update_state_until_acti activation_cmd->setTimeout(1000s); activation_cmd->setSourceIndex(0); _upper->sendDown(activation_cmd); - GET_ONLY_OK_REPLY(reply); + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); ASSERT_NO_FATAL_FAILURE(extract_cluster_state_version_from_host_info(version)); EXPECT_EQ(version, 101); -- cgit v1.2.3