aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/tests
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2019-03-15 16:05:22 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2019-03-15 16:05:22 +0000
commit00804dfe908b6a337f88c42ea3def0e1f1397474 (patch)
treee4e58c7a54e830fc5a237fe0be945e1f62f40492 /storage/src/tests
parent1b959abd3224c00f0347a8078dc333abfdd3ce9f (diff)
Properly handle non-owned vs. missing buckets
Bonus: no more spurious "we have removed buckets" log messages caused by ownership changes. Also ensure that we BUSY-bounce operations in `ExternalOperationHandler` when there is no actual state to send back in a `WrongDistributionReply`.
Diffstat (limited to 'storage/src/tests')
-rw-r--r--storage/src/tests/distributor/bucketdbupdatertest.cpp146
-rw-r--r--storage/src/tests/distributor/externaloperationhandlertest.cpp54
-rw-r--r--storage/src/tests/storageserver/statemanagertest.cpp30
3 files changed, 199 insertions, 31 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp
index fd9512b2bd6..0a82fadbf68 100644
--- a/storage/src/tests/distributor/bucketdbupdatertest.cpp
+++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp
@@ -114,19 +114,12 @@ class BucketDBUpdaterTest : public CppUnit::TestFixture,
CPPUNIT_TEST(batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted);
CPPUNIT_TEST(global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection);
CPPUNIT_TEST(non_owned_buckets_moved_to_read_only_db_on_ownership_change);
+ CPPUNIT_TEST(buckets_no_longer_available_are_not_moved_to_read_only_database);
CPPUNIT_TEST(non_owned_buckets_purged_when_read_only_support_is_config_disabled);
+ CPPUNIT_TEST(deferred_activated_state_does_not_enable_state_until_activation_received);
+ CPPUNIT_TEST(read_only_db_cleared_once_pending_state_is_activated);
CPPUNIT_TEST_SUITE_END();
- /*
- * TODO tests
- * - buckets moved to read only db on ownership change
- * - even when self is down in pending state
- * - buckets NOT moved to read only db on content node down/maintenance
- * - read only db cleared when cluster state activated
- * - explicit cluster state activation path
- * - legacy implicit cluster state activation support
- */
-
public:
BucketDBUpdaterTest();
@@ -188,7 +181,10 @@ protected:
void batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted();
void global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection();
void non_owned_buckets_moved_to_read_only_db_on_ownership_change();
+ void buckets_no_longer_available_are_not_moved_to_read_only_database();
void non_owned_buckets_purged_when_read_only_support_is_config_disabled();
+ void deferred_activated_state_does_not_enable_state_until_activation_received();
+ void read_only_db_cleared_once_pending_state_is_activated();
auto &defaultDistributorBucketSpace() { return getBucketSpaceRepo().get(makeBucketSpace()); }
@@ -215,6 +211,19 @@ protected:
// cluster state component (just not by operations), so it would not have the expected semantics.
DistributorBucketSpaceRepo& read_only_repo() noexcept { return getReadOnlyBucketSpaceRepo(); }
+ BucketDatabase& mutable_default_db() noexcept {
+ return mutable_repo().get(FixedBucketSpaces::default_space()).getBucketDatabase();
+ }
+ BucketDatabase& mutable_global_db() noexcept {
+ return mutable_repo().get(FixedBucketSpaces::global_space()).getBucketDatabase();
+ }
+ BucketDatabase& read_only_default_db() noexcept {
+ return read_only_repo().get(FixedBucketSpaces::default_space()).getBucketDatabase();
+ }
+ BucketDatabase& read_only_global_db() noexcept {
+ return read_only_repo().get(FixedBucketSpaces::global_space()).getBucketDatabase();
+ }
+
static std::string getNodeList(std::vector<uint16_t> nodes, size_t count);
std::string getNodeList(std::vector<uint16_t> nodes);
@@ -228,6 +237,10 @@ protected:
return messagesPerBucketSpace * _bucketSpaces.size();
}
+ void trigger_completed_but_not_yet_activated_transition(
+ vespalib::stringref initial_state, uint32_t initial_buckets, uint32_t initial_expected_msgs,
+ vespalib::stringref pending_state, uint32_t pending_buckets, uint32_t pending_expected_msgs);
+
public:
using OutdatedNodesMap = dbtransition::OutdatedNodesMap;
void setUp() override {
@@ -391,8 +404,7 @@ public:
void setSystemState(const lib::ClusterState& state) {
const size_t sizeBeforeState = _sender.commands.size();
getBucketDBUpdater().onSetSystemState(
- std::shared_ptr<api::SetSystemStateCommand>(
- new api::SetSystemStateCommand(state)));
+ std::make_shared<api::SetSystemStateCommand>(state));
// A lot of test logic has the assumption that all messages sent as a
// result of cluster state changes will be in increasing index order
// (for simplicity, not because this is required for correctness).
@@ -401,6 +413,18 @@ public:
sortSentMessagesByIndex(_sender, sizeBeforeState);
}
+ void setClusterStateBundle(const lib::ClusterStateBundle& state) {
+ const size_t sizeBeforeState = _sender.commands.size();
+ getBucketDBUpdater().onSetSystemState(
+ std::make_shared<api::SetSystemStateCommand>(state));
+ sortSentMessagesByIndex(_sender, sizeBeforeState);
+ }
+
+ void activateClusterStateVersion(uint32_t version) {
+ getBucketDBUpdater().onActivateClusterStateVersion(
+ std::make_shared<api::ActivateClusterStateVersionCommand>(version));
+ }
+
void completeBucketInfoGathering(const lib::ClusterState& state,
size_t expectedMsgs,
uint32_t bucketCount = 1,
@@ -2640,7 +2664,7 @@ void BucketDBUpdaterTest::non_owned_buckets_moved_to_read_only_db_on_ownership_c
getConfig().setAllowStaleReadsDuringClusterStateTransitions(true);
lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition
- setSystemState(initial_state);
+ setClusterStateBundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity
CPPUNIT_ASSERT_EQUAL(messageCount(4), _sender.commands.size());
constexpr uint32_t n_buckets = 10;
@@ -2665,9 +2689,9 @@ void BucketDBUpdaterTest::non_owned_buckets_moved_to_read_only_db_on_ownership_c
});
CPPUNIT_ASSERT(!buckets_not_owned_in_pending_state.empty());
- setSystemState(pending_state);
+ setClusterStateBundle(lib::ClusterStateBundle(pending_state, {}, true)); // Now requires activation
- CPPUNIT_ASSERT_EQUAL(size_t(n_buckets - buckets_not_owned_in_pending_state.size()), default_db.size());
+ CPPUNIT_ASSERT_EQUAL(n_buckets - buckets_not_owned_in_pending_state.size(), default_db.size());
CPPUNIT_ASSERT_EQUAL(buckets_not_owned_in_pending_state.size(), read_only_db.size());
// TODO replace with gmock unordered set equality matcher
@@ -2679,6 +2703,26 @@ void BucketDBUpdaterTest::non_owned_buckets_moved_to_read_only_db_on_ownership_c
// TODO check global space too!
}
+// TODO dedupe setup stuff
+void BucketDBUpdaterTest::buckets_no_longer_available_are_not_moved_to_read_only_database() {
+ getConfig().setAllowStaleReadsDuringClusterStateTransitions(true);
+ lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition
+ setClusterStateBundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity
+
+ CPPUNIT_ASSERT_EQUAL(messageCount(4), _sender.commands.size());
+ constexpr uint32_t n_buckets = 10;
+ completeBucketInfoGathering(initial_state, messageCount(4), n_buckets);
+ _sender.clear();
+
+ // No ownership change, just node down. Test redundancy is 2, so removing 2 nodes will
+ // cause some buckets to be entirely unavailable.
+ lib::ClusterState pending_state("distributor:1 storage:4 .0.s:d .1.s:m");
+ setClusterStateBundle(lib::ClusterStateBundle(pending_state, {}, true));
+
+ CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size());
+}
+
+// TODO must ensure this works correctly with cluster controller!
void BucketDBUpdaterTest::non_owned_buckets_purged_when_read_only_support_is_config_disabled() {
getConfig().setAllowStaleReadsDuringClusterStateTransitions(false);
@@ -2690,13 +2734,79 @@ void BucketDBUpdaterTest::non_owned_buckets_purged_when_read_only_support_is_con
completeBucketInfoGathering(initial_state, messageCount(4), n_buckets);
_sender.clear();
- auto& read_only_db = read_only_repo().get(makeBucketSpace()).getBucketDatabase();
- CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_db.size());
+ CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size());
lib::ClusterState pending_state("distributor:2 storage:4");
setSystemState(pending_state);
// No buckets should be moved into read only db
- CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_db.size());
+ CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size());
+}
+
+void BucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition(
+ vespalib::stringref initial_state_str,
+ uint32_t initial_buckets,
+ uint32_t initial_expected_msgs,
+ vespalib::stringref pending_state_str,
+ uint32_t pending_buckets,
+ uint32_t pending_expected_msgs)
+{
+ getConfig().setAllowStaleReadsDuringClusterStateTransitions(true);
+ lib::ClusterState initial_state(initial_state_str);
+ setSystemState(initial_state);
+ // TODO decouple expected message count
+ CPPUNIT_ASSERT_EQUAL(messageCount(initial_expected_msgs), _sender.commands.size());
+ completeBucketInfoGathering(initial_state, messageCount(initial_expected_msgs), initial_buckets);
+ _sender.clear();
+
+ lib::ClusterState pending_state(pending_state_str); // Ownership change
+ setClusterStateBundle(lib::ClusterStateBundle(pending_state, {}, true));
+ CPPUNIT_ASSERT_EQUAL(messageCount(pending_expected_msgs), _sender.commands.size());
+ completeBucketInfoGathering(pending_state, messageCount(pending_expected_msgs), pending_buckets);
+ _sender.clear();
+}
+
+void BucketDBUpdaterTest::deferred_activated_state_does_not_enable_state_until_activation_received() {
+ constexpr uint32_t n_buckets = 10;
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
+ "version:2 distributor:1 storage:4", n_buckets, 4);
+
+ // Version should not be switched over yet
+ CPPUNIT_ASSERT_EQUAL(uint32_t(1), getDistributor().getClusterStateBundle().getVersion());
+
+ CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_default_db().size());
+ CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_global_db().size());
+
+ activateClusterStateVersion(2);
+
+ CPPUNIT_ASSERT_EQUAL(uint32_t(2), getDistributor().getClusterStateBundle().getVersion());
+ CPPUNIT_ASSERT_EQUAL(uint64_t(n_buckets), mutable_default_db().size());
+ CPPUNIT_ASSERT_EQUAL(uint64_t(n_buckets), mutable_global_db().size());
}
+void BucketDBUpdaterTest::read_only_db_cleared_once_pending_state_is_activated() {
+ constexpr uint32_t n_buckets = 10;
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
+ "version:2 distributor:2 storage:4", n_buckets, 0);
+ activateClusterStateVersion(2);
+
+ CPPUNIT_ASSERT_EQUAL(uint64_t(0), read_only_default_db().size());
+ CPPUNIT_ASSERT_EQUAL(uint64_t(0), read_only_global_db().size());
+}
+
+/*
+ * TODO tests
+ * - [X] buckets moved to read only db on ownership change
+ * - even when self is down in pending state
+ * - [X] buckets NOT moved to read only db on content node down/maintenance
+ * - [X] read only db cleared when cluster state activated
+ * - explicit cluster state activation path
+ * - legacy implicit cluster state activation support
+ * - return activation ACK if already activated
+ * - buckets merged on explicit activation
+ * - version check for activation msg
+ * - deferred bundle with non-deferred config?
+ */
+
+// TODO reconcile default deferred activation state of bundle between Java and C++!
+
}
diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp
index ebf9d4e3bc8..76917f856f0 100644
--- a/storage/src/tests/distributor/externaloperationhandlertest.cpp
+++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp
@@ -23,7 +23,9 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture,
CPPUNIT_TEST(testBucketSplitMask);
CPPUNIT_TEST(mutating_operation_wdr_bounced_on_wrong_current_distribution);
CPPUNIT_TEST(mutating_operation_busy_bounced_on_wrong_pending_distribution);
+ CPPUNIT_TEST(mutating_operation_busy_bounced_if_no_cluster_state_received_yet);
CPPUNIT_TEST(read_only_operation_wdr_bounced_on_wrong_current_distribution);
+ CPPUNIT_TEST(read_only_operation_busy_bounced_if_no_cluster_state_received_yet);
CPPUNIT_TEST(reject_put_if_not_past_safe_time_point);
CPPUNIT_TEST(reject_remove_if_not_past_safe_time_point);
CPPUNIT_TEST(reject_update_if_not_past_safe_time_point);
@@ -59,6 +61,8 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture,
const vespalib::string& id) const;
std::shared_ptr<api::RemoveCommand> makeRemoveCommand(const vespalib::string& id) const;
+ void verify_busy_bounced_due_to_no_active_state(std::shared_ptr<api::StorageCommand> cmd);
+
Operation::SP start_operation_verify_not_rejected(std::shared_ptr<api::StorageCommand> cmd);
void start_operation_verify_rejected(std::shared_ptr<api::StorageCommand> cmd);
@@ -92,8 +96,10 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture,
protected:
void testBucketSplitMask();
void mutating_operation_wdr_bounced_on_wrong_current_distribution();
- void read_only_operation_wdr_bounced_on_wrong_current_distribution();
void mutating_operation_busy_bounced_on_wrong_pending_distribution();
+ void mutating_operation_busy_bounced_if_no_cluster_state_received_yet();
+ void read_only_operation_wdr_bounced_on_wrong_current_distribution();
+ void read_only_operation_busy_bounced_if_no_cluster_state_received_yet();
void reject_put_if_not_past_safe_time_point();
void reject_remove_if_not_past_safe_time_point();
void reject_update_if_not_past_safe_time_point();
@@ -254,7 +260,7 @@ void
ExternalOperationHandlerTest::mutating_operation_wdr_bounced_on_wrong_current_distribution()
{
createLinks();
- std::string state("distributor:2 storage:2");
+ std::string state("version:1 distributor:2 storage:2");
setupDistributor(1, 2, state);
document::BucketId bucket(findNonOwnedUserBucketInState(state));
@@ -266,7 +272,7 @@ ExternalOperationHandlerTest::mutating_operation_wdr_bounced_on_wrong_current_di
CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size());
CPPUNIT_ASSERT_EQUAL(
std::string("ReturnCode(WRONG_DISTRIBUTION, "
- "distributor:2 storage:2)"),
+ "version:1 distributor:2 storage:2)"),
_sender.replies[0]->getResult().toString());
}
@@ -274,7 +280,7 @@ void
ExternalOperationHandlerTest::read_only_operation_wdr_bounced_on_wrong_current_distribution()
{
createLinks();
- std::string state("distributor:2 storage:2");
+ std::string state("version:1 distributor:2 storage:2");
setupDistributor(1, 2, state);
document::BucketId bucket(findNonOwnedUserBucketInState(state));
@@ -286,7 +292,7 @@ ExternalOperationHandlerTest::read_only_operation_wdr_bounced_on_wrong_current_d
CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size());
CPPUNIT_ASSERT_EQUAL(
std::string("ReturnCode(WRONG_DISTRIBUTION, "
- "distributor:2 storage:2)"),
+ "version:1 distributor:2 storage:2)"),
_sender.replies[0]->getResult().toString());
}
@@ -315,6 +321,34 @@ ExternalOperationHandlerTest::mutating_operation_busy_bounced_on_wrong_pending_d
_sender.replies[0]->getResult().toString());
}
+void
+ExternalOperationHandlerTest::verify_busy_bounced_due_to_no_active_state(std::shared_ptr<api::StorageCommand> cmd)
+{
+ createLinks();
+ std::string state{}; // No version --> not yet received
+ setupDistributor(1, 2, state);
+
+ Operation::SP genOp;
+ CPPUNIT_ASSERT(getExternalOperationHandler().handleMessage(cmd, genOp));
+ CPPUNIT_ASSERT(!genOp.get());
+ CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size());
+ CPPUNIT_ASSERT_EQUAL(
+ std::string("ReturnCode(BUSY, No cluster state activated yet)"),
+ _sender.replies[0]->getResult().toString());
+}
+
+void
+ExternalOperationHandlerTest::mutating_operation_busy_bounced_if_no_cluster_state_received_yet()
+{
+ verify_busy_bounced_due_to_no_active_state(makeUpdateCommandForUser(12345));
+}
+
+void
+ExternalOperationHandlerTest::read_only_operation_busy_bounced_if_no_cluster_state_received_yet()
+{
+ verify_busy_bounced_due_to_no_active_state(makeGetCommandForUser(12345));
+}
+
using TimePoint = ExternalOperationHandler::TimePoint;
using namespace std::literals::chrono_literals;
@@ -322,7 +356,7 @@ void ExternalOperationHandlerTest::assert_rejection_due_to_unsafe_time(
std::shared_ptr<api::StorageCommand> cmd)
{
createLinks();
- setupDistributor(1, 2, "distributor:1 storage:1");
+ setupDistributor(1, 2, "version:1 distributor:1 storage:1");
getClock().setAbsoluteTimeInSeconds(9);
getExternalOperationHandler().rejectFeedBeforeTimeReached(TimePoint(10s));
@@ -357,7 +391,7 @@ void ExternalOperationHandlerTest::reject_update_if_not_past_safe_time_point() {
void ExternalOperationHandlerTest::get_not_rejected_by_unsafe_time_point() {
createLinks();
- setupDistributor(1, 2, "distributor:1 storage:1");
+ setupDistributor(1, 2, "version:1 distributor:1 storage:1");
getClock().setAbsoluteTimeInSeconds(9);
getExternalOperationHandler().rejectFeedBeforeTimeReached(TimePoint(10s));
@@ -372,7 +406,7 @@ void ExternalOperationHandlerTest::get_not_rejected_by_unsafe_time_point() {
void ExternalOperationHandlerTest::mutation_not_rejected_when_safe_point_reached() {
createLinks();
- setupDistributor(1, 2, "distributor:1 storage:1");
+ setupDistributor(1, 2, "version:1 distributor:1 storage:1");
getClock().setAbsoluteTimeInSeconds(10);
getExternalOperationHandler().rejectFeedBeforeTimeReached(TimePoint(10s));
@@ -390,7 +424,7 @@ void ExternalOperationHandlerTest::mutation_not_rejected_when_safe_point_reached
void ExternalOperationHandlerTest::set_up_distributor_for_sequencing_test() {
createLinks();
- setupDistributor(1, 2, "distributor:1 storage:1");
+ setupDistributor(1, 2, "version:1 distributor:1 storage:1");
}
Operation::SP ExternalOperationHandlerTest::start_operation_verify_not_rejected(
@@ -518,7 +552,7 @@ void ExternalOperationHandlerTest::sequencing_can_be_explicitly_config_disabled(
void ExternalOperationHandlerTest::gets_are_started_with_mutable_db_outside_transition_period() {
createLinks();
- std::string current = "distributor:1 storage:3";
+ std::string current = "version:1 distributor:1 storage:3";
setupDistributor(1, 3, current);
getConfig().setAllowStaleReadsDuringClusterStateTransitions(true);
diff --git a/storage/src/tests/storageserver/statemanagertest.cpp b/storage/src/tests/storageserver/statemanagertest.cpp
index 19f414482db..cdf990fa28f 100644
--- a/storage/src/tests/storageserver/statemanagertest.cpp
+++ b/storage/src/tests/storageserver/statemanagertest.cpp
@@ -37,6 +37,7 @@ struct StateManagerTest : public CppUnit::TestFixture {
void can_explicitly_send_get_node_state_reply();
void explicit_node_state_replying_without_pending_request_immediately_replies_on_next_request();
void immediate_node_state_replying_is_tracked_per_controller();
+ void activation_command_is_bounced_with_current_cluster_state_version();
CPPUNIT_TEST_SUITE(StateManagerTest);
CPPUNIT_TEST(testSystemState);
@@ -45,8 +46,10 @@ struct StateManagerTest : public CppUnit::TestFixture {
CPPUNIT_TEST(can_explicitly_send_get_node_state_reply);
CPPUNIT_TEST(explicit_node_state_replying_without_pending_request_immediately_replies_on_next_request);
CPPUNIT_TEST(immediate_node_state_replying_is_tracked_per_controller);
+ CPPUNIT_TEST(activation_command_is_bounced_with_current_cluster_state_version);
CPPUNIT_TEST_SUITE_END();
+ void force_current_cluster_state_version(uint32_t version);
void mark_reported_node_state_up();
void send_down_get_node_state_request(uint16_t controller_index);
void assert_ok_get_node_state_reply_sent_and_clear();
@@ -101,6 +104,12 @@ StateManagerTest::tearDown() {
_metricManager.reset();
}
+void StateManagerTest::force_current_cluster_state_version(uint32_t version) {
+ ClusterState state(*_manager->getClusterStateBundle()->getBaselineClusterState());
+ state.setVersion(version);
+ _manager->setClusterStateBundle(lib::ClusterStateBundle(state));
+}
+
#define GET_ONLY_OK_REPLY(varname) \
{ \
CPPUNIT_ASSERT_EQUAL(size_t(1), _upper->getNumReplies()); \
@@ -236,9 +245,7 @@ StateManagerTest::testReportedNodeState()
}
void StateManagerTest::current_cluster_state_version_is_included_in_host_info_json() {
- ClusterState state(*_manager->getClusterStateBundle()->getBaselineClusterState());
- state.setVersion(123);
- _manager->setClusterStateBundle(lib::ClusterStateBundle(state));
+ force_current_cluster_state_version(123);
std::string nodeInfoString(_manager->getNodeInfo());
vespalib::Memory goldenMemory(nodeInfoString);
@@ -343,4 +350,21 @@ void StateManagerTest::immediate_node_state_replying_is_tracked_per_controller()
CPPUNIT_ASSERT_EQUAL(size_t(0), _upper->getNumReplies());
}
+void StateManagerTest::activation_command_is_bounced_with_current_cluster_state_version() {
+ force_current_cluster_state_version(12345);
+
+ auto cmd = std::make_shared<api::ActivateClusterStateVersionCommand>(12340);
+ cmd->setTimeout(10000000);
+ cmd->setSourceIndex(0);
+ _upper->sendDown(cmd);
+
+ CPPUNIT_ASSERT_EQUAL(size_t(1), _upper->getNumReplies());
+ std::shared_ptr<api::StorageReply> reply;
+ GET_ONLY_OK_REPLY(reply); // Implicitly clears messages from _upper
+ CPPUNIT_ASSERT_EQUAL(api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_REPLY, reply->getType());
+ auto& activate_reply = dynamic_cast<api::ActivateClusterStateVersionReply&>(*reply);
+ CPPUNIT_ASSERT_EQUAL(uint32_t(12340), activate_reply.activateVersion());
+ CPPUNIT_ASSERT_EQUAL(uint32_t(12345), activate_reply.actualVersion());
+}
+
} // storage