Diffstat (limited to 'storage/src/tests/distributor/top_level_distributor_test.cpp')
-rw-r--r--  storage/src/tests/distributor/top_level_distributor_test.cpp | 141
1 file changed, 139 insertions(+), 2 deletions(-)
diff --git a/storage/src/tests/distributor/top_level_distributor_test.cpp b/storage/src/tests/distributor/top_level_distributor_test.cpp
index 4968e8e6be3..a8ef6d990f0 100644
--- a/storage/src/tests/distributor/top_level_distributor_test.cpp
+++ b/storage/src/tests/distributor/top_level_distributor_test.cpp
@@ -90,6 +90,10 @@ struct TopLevelDistributorTest : Test, TopLevelDistributorTestUtil {
return _distributor->_threadPool;
}
+ DistributorHostInfoReporter& distributor_host_info_reporter() {
+ return _distributor->_hostInfoReporter;
+ }
+
const std::vector<std::shared_ptr<DistributorStatus>>& distributor_status_todos() {
return _distributor->_status_to_do;
}
@@ -98,6 +102,10 @@ struct TopLevelDistributorTest : Test, TopLevelDistributorTestUtil {
return _distributor->_metricUpdateHook;
}
+ BucketSpacesStatsProvider::PerNodeBucketSpacesStats distributor_bucket_spaces_stats() {
+ return _distributor->getBucketSpacesStats();
+ }
+
uint64_t db_sample_interval_sec() const noexcept {
// Sampling interval is equal across stripes, so just grab the first one and go with it.
return std::chrono::duration_cast<std::chrono::seconds>(
@@ -112,6 +120,28 @@ struct TopLevelDistributorTest : Test, TopLevelDistributorTestUtil {
return _node->getNodeStateUpdater().explicit_node_state_reply_send_invocations();
}
+ std::shared_ptr<api::RemoveCommand> make_dummy_remove_command() {
+ return std::make_shared<api::RemoveCommand>(
+ makeDocumentBucket(document::BucketId(0)),
+ document::DocumentId("id:foo:testdoctype1:n=1:foo"),
+ api::Timestamp(0));
+ }
+
+ void assert_single_reply_present_with_return_code(api::ReturnCode::Result expected_result) {
+ ASSERT_THAT(_sender.replies(), SizeIs(1)); // Single remove reply
+ ASSERT_EQ(_sender.reply(0)->getType(), api::MessageType::REMOVE_REPLY);
+ auto& reply(static_cast<api::RemoveReply&>(*_sender.reply(0)));
+ ASSERT_EQ(reply.getResult().getResult(), expected_result);
+ _sender.replies().clear();
+ }
+
+ void assert_single_bounced_remove_reply_present() {
+ assert_single_reply_present_with_return_code(api::ReturnCode::STALE_TIMESTAMP);
+ }
+
+ void assert_single_ok_remove_reply_present() {
+ assert_single_reply_present_with_return_code(api::ReturnCode::OK);
+ }
};
TopLevelDistributorTest::TopLevelDistributorTest()
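Note on the helpers above: GoogleTest's fatal ASSERT_* macros only abort the function they run in, so a failure inside assert_single_reply_present_with_return_code() does not by itself abort a calling test body. The tests added further down therefore wrap each call in ASSERT_NO_FATAL_FAILURE; a minimal usage sketch mirroring those tests:

    handle_top_level_message(make_dummy_remove_command());
    tick_distributor_and_stripes_n_times(1); // process the queued message
    ASSERT_NO_FATAL_FAILURE(assert_single_ok_remove_reply_present());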
@@ -141,8 +171,7 @@ TEST_F(TopLevelDistributorTest, external_operation_is_routed_to_expected_stripe)
}
TEST_F(TopLevelDistributorTest, recovery_mode_on_cluster_state_change_is_triggered_across_all_stripes) {
- setup_distributor(Redundancy(1), NodeCount(2),
- "storage:1 .0.s:d distributor:1");
+ setup_distributor(Redundancy(1), NodeCount(2), "storage:1 .0.s:d distributor:1");
enable_distributor_cluster_state("storage:1 distributor:1");
EXPECT_TRUE(all_distributor_stripes_are_in_recovery_mode());
@@ -432,4 +461,112 @@ TEST_F(TopLevelDistributorTest, non_bootstrap_host_info_send_request_delays_send
EXPECT_EQ(2, explicit_node_state_reply_send_invocations());
}
+TEST_F(TopLevelDistributorTest, host_info_reporter_config_is_propagated_to_reporter) {
+ setup_distributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1");
+
+ // Default is enabled=true.
+ EXPECT_TRUE(distributor_host_info_reporter().isReportingEnabled());
+
+ auto cfg = current_distributor_config();
+ cfg.enableHostInfoReporting = false;
+ reconfigure(cfg);
+
+ EXPECT_FALSE(distributor_host_info_reporter().isReportingEnabled());
+}
+
+namespace {
+
+void assert_invalid_stats_for_all_spaces(
+ const BucketSpacesStatsProvider::PerNodeBucketSpacesStats& stats,
+ uint16_t node_index)
+{
+ auto stats_iter = stats.find(node_index);
+ ASSERT_TRUE(stats_iter != stats.cend());
+ ASSERT_EQ(2, stats_iter->second.size());
+ auto space_iter = stats_iter->second.find(document::FixedBucketSpaces::default_space_name());
+ ASSERT_TRUE(space_iter != stats_iter->second.cend());
+ ASSERT_FALSE(space_iter->second.valid());
+ space_iter = stats_iter->second.find(document::FixedBucketSpaces::global_space_name());
+ ASSERT_TRUE(space_iter != stats_iter->second.cend());
+ ASSERT_FALSE(space_iter->second.valid());
+}
+
+}
+
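For orientation, assert_invalid_stats_for_all_spaces() above walks a two-level map: content node index -> bucket space name -> per-space stats carrying a valid() flag. A rough sketch of that shape, assuming (not verified against the actual Vespa typedefs) standard map containers:

    // Assumed shape, for illustration only:
    // node index -> (bucket space name -> BucketSpaceStats)
    using BucketSpacesStats = std::unordered_map<std::string, BucketSpaceStats>;
    using PerNodeBucketSpacesStats = std::unordered_map<uint16_t, BucketSpacesStats>;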
+TEST_F(TopLevelDistributorTest, entering_recovery_mode_resets_bucket_space_stats_across_all_stripes) {
+ // Set up a cluster state + DB contents which implies merge maintenance ops
+ setup_distributor(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2");
+ add_nodes_to_stripe_bucket_db(document::BucketId(16, 1), "0=1/1/1/t/a");
+ add_nodes_to_stripe_bucket_db(document::BucketId(16, 2), "0=1/1/1/t/a");
+ add_nodes_to_stripe_bucket_db(document::BucketId(16, 3), "0=2/2/2/t/a");
+
+ tick_distributor_and_stripes_n_times(5); // Make sure all stripes have had ample time to update their stats
+
+ enable_distributor_cluster_state("version:2 distributor:1 storage:3 .1.s:d");
+ EXPECT_TRUE(all_distributor_stripes_are_in_recovery_mode());
+ // Bucket space stats should now be invalid per space per node, pending stats
+ // from state version 2. Exposing stats from version 1 risks reporting stale
+ // information back to the cluster controller.
+ const auto stats = distributor_bucket_spaces_stats();
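+ // "storage:3 .1.s:d" leaves content nodes 0 and 2 up (node 1 is down), so we expect stats for exactly those two nodes.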
+ ASSERT_EQ(2, stats.size());
+
+ assert_invalid_stats_for_all_spaces(stats, 0);
+ assert_invalid_stats_for_all_spaces(stats, 2);
+}
+
+TEST_F(TopLevelDistributorTest, leaving_recovery_mode_immediately_sends_getnodestate_replies) {
+ setup_distributor(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2");
+ fake_clock().setAbsoluteTimeInSeconds(1000);
+ // Should not send explicit replies during init stage
+ ASSERT_EQ(0, explicit_node_state_reply_send_invocations());
+ // Add a couple of buckets so we have something to iterate over. 2 buckets
+ // map to the same stripe so we'll need 2 ticks to complete a full scan.
+ ASSERT_EQ(stripe_of_bucket(document::BucketId(16, 1)),
+ stripe_of_bucket(document::BucketId(16, 5)));
+
+ add_nodes_to_stripe_bucket_db(document::BucketId(16, 1), "0=1/1/1/t/a");
+ add_nodes_to_stripe_bucket_db(document::BucketId(16, 2), "0=1/1/1/t/a");
+ add_nodes_to_stripe_bucket_db(document::BucketId(16, 5), "0=1/1/1/t/a");
+
+ enable_distributor_cluster_state("version:2 distributor:1 storage:3 .1.s:d");
+ EXPECT_TRUE(all_distributor_stripes_are_in_recovery_mode());
+ EXPECT_EQ(0, explicit_node_state_reply_send_invocations());
+ tick_distributor_and_stripes_n_times(1); // DB round not yet complete
+ EXPECT_EQ(0, explicit_node_state_reply_send_invocations());
+ tick_distributor_and_stripes_n_times(4); // DB round complete on all stripes
+ EXPECT_EQ(1, explicit_node_state_reply_send_invocations());
+ EXPECT_FALSE(all_distributor_stripes_are_in_recovery_mode());
+ // Now out of recovery mode, subsequent round completions should not send replies
+ tick_distributor_and_stripes_n_times(10);
+ EXPECT_EQ(1, explicit_node_state_reply_send_invocations());
+}
+
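The bucket keys 1 and 5 above are deliberately chosen to map to the same stripe, which is what forces the extra tick before the scan completes. A sketch of how a colliding pair could be found when writing similar tests, using the same stripe_of_bucket() helper from the fixture:

    // Find a bucket that lands on the same stripe as (16, 1) (sketch):
    document::BucketId first(16, 1);
    uint32_t colliding = 0;
    for (uint32_t i = 2; (colliding == 0) && (i < 1024); ++i) {
        if (stripe_of_bucket(document::BucketId(16, i)) == stripe_of_bucket(first)) {
            colliding = i; // (16, i) shares a stripe with (16, 1)
        }
    }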
+// TODO refactor this to set proper highest timestamp as part of bucket info
+// reply once we have the "highest timestamp across all owned buckets" feature
+// in place.
+TEST_F(TopLevelDistributorTest, configured_safe_time_point_rejection_works_end_to_end) {
+ setup_distributor(Redundancy(2), NodeCount(2), "storage:1 distributor:2");
+ fake_clock().setAbsoluteTimeInSeconds(1000);
+
+ auto cfg = current_distributor_config();
+ cfg.maxClusterClockSkewSec = 10;
+ reconfigure(cfg);
+
+ // State with changed bucket ownership; should enforce safe mutation time points
+ enable_distributor_cluster_state("storage:1 distributor:1", true);
+
+ handle_top_level_message(make_dummy_remove_command());
+ tick_distributor_and_stripes_n_times(1); // Process queued message
+ ASSERT_NO_FATAL_FAILURE(assert_single_bounced_remove_reply_present());
+
+ // Increment time to first whole second of clock + 10 seconds of skew.
+ // Should now not get any feed rejections.
+ fake_clock().setAbsoluteTimeInSeconds(1011);
+
+ handle_top_level_message(make_dummy_remove_command());
+ tick_distributor_and_stripes_n_times(1); // Process queued message
+ // We don't have any buckets in our DB so we'll get an OK remove reply back (nothing to remove!)
+ ASSERT_NO_FATAL_FAILURE(assert_single_ok_remove_reply_present());
+}
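The clock arithmetic in this test: the ownership-changing state is enabled at t = 1000 s with maxClusterClockSkewSec = 10, so mutating operations are bounced with STALE_TIMESTAMP while inside the skew window ending at 1000 + 10 = 1010 s; 1011 s is the first whole second safely past it, at which point the remove is processed normally (and returns OK since the bucket DB is empty).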
+
}