diff options
Diffstat (limited to 'storage/src/tests/distributor/top_level_distributor_test.cpp')
-rw-r--r-- | storage/src/tests/distributor/top_level_distributor_test.cpp | 141 |
1 files changed, 139 insertions, 2 deletions
diff --git a/storage/src/tests/distributor/top_level_distributor_test.cpp b/storage/src/tests/distributor/top_level_distributor_test.cpp index 4968e8e6be3..a8ef6d990f0 100644 --- a/storage/src/tests/distributor/top_level_distributor_test.cpp +++ b/storage/src/tests/distributor/top_level_distributor_test.cpp @@ -90,6 +90,10 @@ struct TopLevelDistributorTest : Test, TopLevelDistributorTestUtil { return _distributor->_threadPool; } + DistributorHostInfoReporter& distributor_host_info_reporter() { + return _distributor->_hostInfoReporter; + } + const std::vector<std::shared_ptr<DistributorStatus>>& distributor_status_todos() { return _distributor->_status_to_do; } @@ -98,6 +102,10 @@ struct TopLevelDistributorTest : Test, TopLevelDistributorTestUtil { return _distributor->_metricUpdateHook; } + BucketSpacesStatsProvider::PerNodeBucketSpacesStats distributor_bucket_spaces_stats() { + return _distributor->getBucketSpacesStats(); + } + uint64_t db_sample_interval_sec() const noexcept { // Sampling interval is equal across stripes, so just grab the first one and go with it. return std::chrono::duration_cast<std::chrono::seconds>( @@ -112,6 +120,28 @@ struct TopLevelDistributorTest : Test, TopLevelDistributorTestUtil { return _node->getNodeStateUpdater().explicit_node_state_reply_send_invocations(); } + std::shared_ptr<api::RemoveCommand> make_dummy_remove_command() { + return std::make_shared<api::RemoveCommand>( + makeDocumentBucket(document::BucketId(0)), + document::DocumentId("id:foo:testdoctype1:n=1:foo"), + api::Timestamp(0)); + } + + void assert_single_reply_present_with_return_code(api::ReturnCode::Result expected_result) { + ASSERT_THAT(_sender.replies(), SizeIs(1)); // Single remove reply + ASSERT_EQ(_sender.reply(0)->getType(), api::MessageType::REMOVE_REPLY); + auto& reply(static_cast<api::RemoveReply&>(*_sender.reply(0))); + ASSERT_EQ(reply.getResult().getResult(), expected_result); + _sender.replies().clear(); + } + + void assert_single_bounced_remove_reply_present() { + assert_single_reply_present_with_return_code(api::ReturnCode::STALE_TIMESTAMP); + } + + void assert_single_ok_remove_reply_present() { + assert_single_reply_present_with_return_code(api::ReturnCode::OK); + } }; TopLevelDistributorTest::TopLevelDistributorTest() @@ -141,8 +171,7 @@ TEST_F(TopLevelDistributorTest, external_operation_is_routed_to_expected_stripe) } TEST_F(TopLevelDistributorTest, recovery_mode_on_cluster_state_change_is_triggered_across_all_stripes) { - setup_distributor(Redundancy(1), NodeCount(2), - "storage:1 .0.s:d distributor:1"); + setup_distributor(Redundancy(1), NodeCount(2), "storage:1 .0.s:d distributor:1"); enable_distributor_cluster_state("storage:1 distributor:1"); EXPECT_TRUE(all_distributor_stripes_are_in_recovery_mode()); @@ -432,4 +461,112 @@ TEST_F(TopLevelDistributorTest, non_bootstrap_host_info_send_request_delays_send EXPECT_EQ(2, explicit_node_state_reply_send_invocations()); } +TEST_F(TopLevelDistributorTest, host_info_reporter_config_is_propagated_to_reporter) { + setup_distributor(Redundancy(2), NodeCount(2), "storage:2 distributor:1"); + + // Default is enabled=true. + EXPECT_TRUE(distributor_host_info_reporter().isReportingEnabled()); + + auto cfg = current_distributor_config(); + cfg.enableHostInfoReporting = false; + reconfigure(cfg); + + EXPECT_FALSE(distributor_host_info_reporter().isReportingEnabled()); +} + +namespace { + +void assert_invalid_stats_for_all_spaces( + const BucketSpacesStatsProvider::PerNodeBucketSpacesStats& stats, + uint16_t node_index) +{ + auto stats_iter = stats.find(node_index); + ASSERT_TRUE(stats_iter != stats.cend()); + ASSERT_EQ(2, stats_iter->second.size()); + auto space_iter = stats_iter->second.find(document::FixedBucketSpaces::default_space_name()); + ASSERT_TRUE(space_iter != stats_iter->second.cend()); + ASSERT_FALSE(space_iter->second.valid()); + space_iter = stats_iter->second.find(document::FixedBucketSpaces::global_space_name()); + ASSERT_TRUE(space_iter != stats_iter->second.cend()); + ASSERT_FALSE(space_iter->second.valid()); +} + +} + +TEST_F(TopLevelDistributorTest, entering_recovery_mode_resets_bucket_space_stats_across_all_stripes) { + // Set up a cluster state + DB contents which implies merge maintenance ops + setup_distributor(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2"); + add_nodes_to_stripe_bucket_db(document::BucketId(16, 1), "0=1/1/1/t/a"); + add_nodes_to_stripe_bucket_db(document::BucketId(16, 2), "0=1/1/1/t/a"); + add_nodes_to_stripe_bucket_db(document::BucketId(16, 3), "0=2/2/2/t/a"); + + tick_distributor_and_stripes_n_times(5); // Make sure all stripes have had ample time to update their stats + + enable_distributor_cluster_state("version:2 distributor:1 storage:3 .1.s:d"); + EXPECT_TRUE(all_distributor_stripes_are_in_recovery_mode()); + // Bucket space stats should now be invalid per space per node, pending stats + // from state version 2. Exposing stats from version 1 risks reporting stale + // information back to the cluster controller. + const auto stats = distributor_bucket_spaces_stats(); + ASSERT_EQ(2, stats.size()); + + assert_invalid_stats_for_all_spaces(stats, 0); + assert_invalid_stats_for_all_spaces(stats, 2); +} + +TEST_F(TopLevelDistributorTest, leaving_recovery_mode_immediately_sends_getnodestate_replies) { + setup_distributor(Redundancy(2), NodeCount(2), "version:1 distributor:1 storage:2"); + fake_clock().setAbsoluteTimeInSeconds(1000); + // Should not send explicit replies during init stage + ASSERT_EQ(0, explicit_node_state_reply_send_invocations()); + // Add a couple of buckets so we have something to iterate over. 2 buckets + // map to the same stripe so we'll need 2 ticks to complete a full scan. + ASSERT_EQ(stripe_of_bucket(document::BucketId(16, 1)), + stripe_of_bucket(document::BucketId(16, 5))); + + add_nodes_to_stripe_bucket_db(document::BucketId(16, 1), "0=1/1/1/t/a"); + add_nodes_to_stripe_bucket_db(document::BucketId(16, 2), "0=1/1/1/t/a"); + add_nodes_to_stripe_bucket_db(document::BucketId(16, 5), "0=1/1/1/t/a"); + + enable_distributor_cluster_state("version:2 distributor:1 storage:3 .1.s:d"); + EXPECT_TRUE(all_distributor_stripes_are_in_recovery_mode()); + EXPECT_EQ(0, explicit_node_state_reply_send_invocations()); + tick_distributor_and_stripes_n_times(1); // DB round not yet complete + EXPECT_EQ(0, explicit_node_state_reply_send_invocations()); + tick_distributor_and_stripes_n_times(4); // DB round complete on all stripes + EXPECT_EQ(1, explicit_node_state_reply_send_invocations()); + EXPECT_FALSE(all_distributor_stripes_are_in_recovery_mode()); + // Now out of recovery mode, subsequent round completions should not send replies + tick_distributor_and_stripes_n_times(10); + EXPECT_EQ(1, explicit_node_state_reply_send_invocations()); +} + +// TODO refactor this to set proper highest timestamp as part of bucket info +// reply once we have the "highest timestamp across all owned buckets" feature +// in place. +TEST_F(TopLevelDistributorTest, configured_safe_time_point_rejection_works_end_to_end) { + setup_distributor(Redundancy(2), NodeCount(2), "storage:1 distributor:2"); + fake_clock().setAbsoluteTimeInSeconds(1000); + + auto cfg = current_distributor_config(); + cfg.maxClusterClockSkewSec = 10; + reconfigure(cfg); + + // State with changed bucket ownership; should enforce safe mutation time points + enable_distributor_cluster_state("storage:1 distributor:1", true); + + handle_top_level_message(make_dummy_remove_command()); + tick_distributor_and_stripes_n_times(1); // Process queued message + ASSERT_NO_FATAL_FAILURE(assert_single_bounced_remove_reply_present()); + + // Increment time to first whole second of clock + 10 seconds of skew. + // Should now not get any feed rejections. + fake_clock().setAbsoluteTimeInSeconds(1011); + + handle_top_level_message(make_dummy_remove_command()); + tick_distributor_and_stripes_n_times(1); // Process queued message + // We don't have any buckets in our DB so we'll get an OK remove reply back (nothing to remove!) + ASSERT_NO_FATAL_FAILURE(assert_single_ok_remove_reply_present()); +} + } |