aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/tests/storageserver/statemanagertest.cpp
diff options
context:
space:
mode:
author: Tor Brede Vekterli <vekterli@vespa.ai> 2024-06-03 12:36:53 +0000
committer: Tor Brede Vekterli <vekterli@vespa.ai> 2024-06-03 12:58:34 +0000
commit: 356063939c2a78d32437cbf998db58d996c70ba4 (patch)
tree: 2cac0ae179ba02fd86850720b34165b57cc7076c /storage/src/tests/storageserver/statemanagertest.cpp
parent: d2ba6cf3ba59d0f25c4111f2cc670eb5de7d126f (diff)
Support enforcing strictly increasing state versions across cluster controllers
Adds a (live) config that specifies whether content nodes and distributors shall reject cluster state versions that are lower than the one that is currently active on the node. This prevents "last write wins" ordering problems when multiple cluster controllers have partially overlapping leadership periods. In the name of pragmatism, we try to auto-detect the case where ZooKeeper state must have been lost on the cluster controller cluster, and accept the state even though it has a lower version number. Otherwise, the content cluster would be effectively stalled until all its processes had been manually restarted. Also adds wiring of the live config to the `StateManager` component on both content and distributor nodes.
Diffstat (limited to 'storage/src/tests/storageserver/statemanagertest.cpp')
-rw-r--r-- storage/src/tests/storageserver/statemanagertest.cpp 126
1 files changed, 98 insertions, 28 deletions
diff --git a/storage/src/tests/storageserver/statemanagertest.cpp b/storage/src/tests/storageserver/statemanagertest.cpp
index b785bc141b6..79246cb3ce1 100644
--- a/storage/src/tests/storageserver/statemanagertest.cpp
+++ b/storage/src/tests/storageserver/statemanagertest.cpp
@@ -32,7 +32,18 @@ struct StateManagerTest : Test, NodeStateReporter {
void SetUp() override;
void TearDown() override;
- void force_current_cluster_state_version(uint32_t version);
+ static std::shared_ptr<api::SetSystemStateCommand> make_set_state_cmd(vespalib::stringref state_str, uint16_t cc_index) {
+ auto cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(state_str));
+ cmd->setSourceIndex(cc_index);
+ return cmd;
+ }
+
+ void get_single_reply(std::shared_ptr<api::StorageReply>& reply_out);
+ void get_only_ok_reply(std::shared_ptr<api::StorageReply>& reply_out);
+ void force_current_cluster_state_version(uint32_t version, uint16_t cc_index);
+ void force_current_cluster_state_version(uint32_t version) {
+ force_current_cluster_state_version(version, 0);
+ }
void mark_reported_node_state_up();
void send_down_get_node_state_request(uint16_t controller_index);
void assert_ok_get_node_state_reply_sent_and_clear();
@@ -86,11 +97,30 @@ StateManagerTest::TearDown() {
}
void
-StateManagerTest::force_current_cluster_state_version(uint32_t version)
+StateManagerTest::get_single_reply(std::shared_ptr<api::StorageReply>& reply_out)
+{
+ ASSERT_EQ(_upper->getNumReplies(), 1);
+ ASSERT_TRUE(_upper->getReply(0)->getType().isReply());
+ reply_out = std::dynamic_pointer_cast<api::StorageReply>(_upper->getReply(0));
+ ASSERT_TRUE(reply_out.get() != nullptr);
+ _upper->reset();
+}
+
+void
+StateManagerTest::get_only_ok_reply(std::shared_ptr<api::StorageReply>& reply_out)
+{
+ ASSERT_NO_FATAL_FAILURE(get_single_reply(reply_out));
+ ASSERT_EQ(reply_out->getResult(), api::ReturnCode(api::ReturnCode::OK));
+}
+
+void
+StateManagerTest::force_current_cluster_state_version(uint32_t version, uint16_t cc_index)
{
ClusterState state(*_manager->getClusterStateBundle()->getBaselineClusterState());
state.setVersion(version);
- _manager->setClusterStateBundle(lib::ClusterStateBundle(state));
+ const auto maybe_rejected_by_ver = _manager->try_set_cluster_state_bundle(
+ std::make_shared<const lib::ClusterStateBundle>(state), cc_index);
+ ASSERT_EQ(maybe_rejected_by_ver, std::nullopt);
}
void
@@ -118,23 +148,12 @@ StateManagerTest::extract_cluster_state_version_from_host_info(uint32_t& version
version_out = clusterStateVersionCursor.asLong();
}
-#define GET_ONLY_OK_REPLY(varname) \
-{ \
- ASSERT_EQ(size_t(1), _upper->getNumReplies()); \
- ASSERT_TRUE(_upper->getReply(0)->getType().isReply()); \
- varname = std::dynamic_pointer_cast<api::StorageReply>( \
- _upper->getReply(0)); \
- ASSERT_TRUE(varname.get() != nullptr); \
- _upper->reset(); \
- ASSERT_EQ(api::ReturnCode(api::ReturnCode::OK), \
- varname->getResult()); \
-}
-
TEST_F(StateManagerTest, cluster_state) {
std::shared_ptr<api::StorageReply> reply;
// Verify initial state on startup
auto currentState = _manager->getClusterStateBundle()->getBaselineClusterState();
EXPECT_EQ("cluster:d", currentState->toString(false));
+ EXPECT_EQ(currentState->getVersion(), 0);
auto currentNodeState = _manager->getCurrentNodeState();
EXPECT_EQ("s:d", currentNodeState->toString(false));
@@ -142,7 +161,7 @@ TEST_F(StateManagerTest, cluster_state) {
ClusterState sendState("storage:4 .2.s:m");
auto cmd = std::make_shared<api::SetSystemStateCommand>(sendState);
_upper->sendDown(cmd);
- GET_ONLY_OK_REPLY(reply);
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
currentState = _manager->getClusterStateBundle()->getBaselineClusterState();
EXPECT_EQ(sendState, *currentState);
@@ -151,13 +170,64 @@ TEST_F(StateManagerTest, cluster_state) {
EXPECT_EQ("s:m", currentNodeState->toString(false));
}
+TEST_F(StateManagerTest, accept_lower_state_versions_if_strict_requirement_disabled) {
+ _manager->set_require_strictly_increasing_cluster_state_versions(false);
+
+ ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(123, 1)); // CC 1
+ ASSERT_EQ(_manager->getClusterStateBundle()->getVersion(), 123);
+
+ _upper->sendDown(make_set_state_cmd("version:122 distributor:1 storage:1", 0)); // CC 0
+ std::shared_ptr<api::StorageReply> reply;
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
+ EXPECT_EQ(_manager->getClusterStateBundle()->getVersion(), 122);
+}
+
+TEST_F(StateManagerTest, reject_lower_state_versions_if_strict_requirement_enabled) {
+ _manager->set_require_strictly_increasing_cluster_state_versions(true);
+
+ ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(123, 1)); // CC 1
+ ASSERT_EQ(_manager->getClusterStateBundle()->getVersion(), 123);
+
+ _upper->sendDown(make_set_state_cmd("version:122 distributor:1 storage:1", 0)); // CC 0
+ std::shared_ptr<api::StorageReply> reply;
+ ASSERT_NO_FATAL_FAILURE(get_single_reply(reply));
+ api::ReturnCode expected_res(api::ReturnCode::REJECTED, "Cluster state version 122 rejected; node already has "
+ "a higher cluster state version (123)");
+ EXPECT_EQ(reply->getResult(), expected_res);
+ EXPECT_EQ(_manager->getClusterStateBundle()->getVersion(), 123);
+}
+
+// Observing a lower cluster state version from the same CC index directly implies that the ZooKeeper
+// state has been lost, at which point we pragmatically (but begrudgingly) accept the state version
+// to avoid stalling the entire cluster for an indeterminate amount of time.
+TEST_F(StateManagerTest, accept_lower_state_versions_from_same_cc_index_even_if_strict_requirement_enabled) {
+ _manager->set_require_strictly_increasing_cluster_state_versions(true);
+
+ ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(123, 1)); // CC 1
+ ASSERT_EQ(_manager->getClusterStateBundle()->getVersion(), 123);
+
+ ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(124, 2)); // CC 2
+ ASSERT_EQ(_manager->getClusterStateBundle()->getVersion(), 124);
+
+ // CC 1 restarts from scratch with previous ZK state up in smoke.
+ _upper->sendDown(make_set_state_cmd("version:3 distributor:1 storage:1", 1));
+ std::shared_ptr<api::StorageReply> reply;
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
+ EXPECT_EQ(_manager->getClusterStateBundle()->getVersion(), 3);
+
+ // CC 2 restarts and continues from where CC 1 left off.
+ _upper->sendDown(make_set_state_cmd("version:4 distributor:1 storage:1", 2));
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
+ EXPECT_EQ(_manager->getClusterStateBundle()->getVersion(), 4);
+}
+
namespace {
struct MyStateListener : public StateListener {
const NodeStateUpdater& updater;
lib::NodeState current;
std::ostringstream ost;
- MyStateListener(const NodeStateUpdater& upd);
+ explicit MyStateListener(const NodeStateUpdater& upd);
~MyStateListener() override;
void handleNewState() noexcept override {
@@ -194,7 +264,7 @@ TEST_F(StateManagerTest, reported_node_state) {
// And get node state command (no expected state)
auto cmd = std::make_shared<api::GetNodeStateCommand>(lib::NodeState::UP());
_upper->sendDown(cmd);
- GET_ONLY_OK_REPLY(reply);
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
ASSERT_EQ(api::MessageType::GETNODESTATE_REPLY, reply->getType());
nodeState = std::make_shared<NodeState>(
dynamic_cast<api::GetNodeStateReply&>(*reply).getNodeState());
@@ -203,7 +273,7 @@ TEST_F(StateManagerTest, reported_node_state) {
cmd = std::make_shared<api::GetNodeStateCommand>(
std::make_unique<NodeState>(NodeType::STORAGE, State::INITIALIZING));
_upper->sendDown(cmd);
- GET_ONLY_OK_REPLY(reply);
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
ASSERT_EQ(api::MessageType::GETNODESTATE_REPLY, reply->getType());
nodeState = std::make_unique<NodeState>(
dynamic_cast<api::GetNodeStateReply&>(*reply).getNodeState());
@@ -222,7 +292,7 @@ TEST_F(StateManagerTest, reported_node_state) {
_manager->setReportedNodeState(ns);
}
- GET_ONLY_OK_REPLY(reply);
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
ASSERT_EQ(api::MessageType::GETNODESTATE_REPLY, reply->getType());
nodeState = std::make_unique<NodeState>(
dynamic_cast<api::GetNodeStateReply&>(*reply).getNodeState());
@@ -244,7 +314,7 @@ TEST_F(StateManagerTest, reported_node_state) {
}
TEST_F(StateManagerTest, current_cluster_state_version_is_included_in_host_info_json) {
- force_current_cluster_state_version(123);
+ ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(123));
uint32_t version;
ASSERT_NO_FATAL_FAILURE(extract_cluster_state_version_from_host_info(version));
EXPECT_EQ(version, 123);
@@ -266,7 +336,7 @@ void StateManagerTest::send_down_get_node_state_request(uint16_t controller_inde
void StateManagerTest::assert_ok_get_node_state_reply_sent_and_clear() {
ASSERT_EQ(1, _upper->getNumReplies());
std::shared_ptr<api::StorageReply> reply;
- GET_ONLY_OK_REPLY(reply); // Implicitly clears messages from _upper
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); // Implicitly clears messages from _upper
ASSERT_EQ(api::MessageType::GETNODESTATE_REPLY, reply->getType());
}
@@ -344,7 +414,7 @@ TEST_F(StateManagerTest, request_almost_immediate_replies_triggers_fast_reply)
}
TEST_F(StateManagerTest, activation_command_is_bounced_with_current_cluster_state_version) {
- force_current_cluster_state_version(12345);
+ ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(12345));
auto cmd = std::make_shared<api::ActivateClusterStateVersionCommand>(12340);
cmd->setTimeout(10000000ms);
@@ -353,7 +423,7 @@ TEST_F(StateManagerTest, activation_command_is_bounced_with_current_cluster_stat
ASSERT_EQ(1, _upper->getNumReplies());
std::shared_ptr<api::StorageReply> reply;
- GET_ONLY_OK_REPLY(reply); // Implicitly clears messages from _upper
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); // Implicitly clears messages from _upper
ASSERT_EQ(api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_REPLY, reply->getType());
auto& activate_reply = dynamic_cast<api::ActivateClusterStateVersionReply&>(*reply);
EXPECT_EQ(12340, activate_reply.activateVersion());
@@ -366,7 +436,7 @@ TEST_F(StateManagerTest, non_deferred_cluster_state_sets_reported_cluster_state_
cmd->setSourceIndex(0);
_upper->sendDown(cmd);
std::shared_ptr<api::StorageReply> reply;
- GET_ONLY_OK_REPLY(reply);
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
uint32_t version;
ASSERT_NO_FATAL_FAILURE(extract_cluster_state_version_from_host_info(version));
@@ -374,7 +444,7 @@ TEST_F(StateManagerTest, non_deferred_cluster_state_sets_reported_cluster_state_
}
TEST_F(StateManagerTest, deferred_cluster_state_does_not_update_state_until_activation_edge) {
- force_current_cluster_state_version(100);
+ ASSERT_NO_FATAL_FAILURE(force_current_cluster_state_version(100));
lib::ClusterStateBundle deferred_bundle(lib::ClusterState("version:101 distributor:1 storage:1"), {}, true);
auto state_cmd = std::make_shared<api::SetSystemStateCommand>(deferred_bundle);
@@ -382,7 +452,7 @@ TEST_F(StateManagerTest, deferred_cluster_state_does_not_update_state_until_acti
state_cmd->setSourceIndex(0);
_upper->sendDown(state_cmd);
std::shared_ptr<api::StorageReply> reply;
- GET_ONLY_OK_REPLY(reply);
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
uint32_t version;
ASSERT_NO_FATAL_FAILURE(extract_cluster_state_version_from_host_info(version));
@@ -392,7 +462,7 @@ TEST_F(StateManagerTest, deferred_cluster_state_does_not_update_state_until_acti
activation_cmd->setTimeout(1000s);
activation_cmd->setSourceIndex(0);
_upper->sendDown(activation_cmd);
- GET_ONLY_OK_REPLY(reply);
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
ASSERT_NO_FATAL_FAILURE(extract_cluster_state_version_from_host_info(version));
EXPECT_EQ(version, 101);