diff options
Diffstat (limited to 'storage/src/tests/distributor')
5 files changed, 1893 insertions, 56 deletions
diff --git a/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp b/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp index 6cca6df9f80..b871bf5841e 100644 --- a/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp +++ b/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp @@ -1212,6 +1212,7 @@ TEST_F(LegacyBucketDBUpdaterTest, notify_change_with_pending_state_queues_bucket } } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, merge_reply) { enableDistributorClusterState("distributor:1 storage:3"); @@ -1254,6 +1255,7 @@ TEST_F(LegacyBucketDBUpdaterTest, merge_reply) { dumpBucket(document::BucketId(16, 1234))); }; +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, merge_reply_node_down) { enableDistributorClusterState("distributor:1 storage:3"); std::vector<api::MergeBucketCommand::Node> nodes; @@ -1296,6 +1298,7 @@ TEST_F(LegacyBucketDBUpdaterTest, merge_reply_node_down) { dumpBucket(document::BucketId(16, 1234))); }; +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, merge_reply_node_down_after_request_sent) { enableDistributorClusterState("distributor:1 storage:3"); std::vector<api::MergeBucketCommand::Node> nodes; @@ -1338,7 +1341,7 @@ TEST_F(LegacyBucketDBUpdaterTest, merge_reply_node_down_after_request_sent) { dumpBucket(document::BucketId(16, 1234))); }; - +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, flush) { enableDistributorClusterState("distributor:1 storage:3"); _sender.clear(); @@ -1417,6 +1420,7 @@ LegacyBucketDBUpdaterTest::getSentNodesDistributionChanged( return ost.str(); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_send_messages) { EXPECT_EQ(getNodeList({0, 1, 2}), getSentNodes("cluster:d", @@ -1514,6 +1518,7 @@ TEST_F(LegacyBucketDBUpdaterTest, 
pending_cluster_state_send_messages) { "distributor:3 storage:3 .1.s:m")); }; +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_receive) { DistributorMessageSenderStub sender; @@ -1552,6 +1557,7 @@ TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_receive) { EXPECT_EQ(3, (int)pendingTransition.results().size()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_with_group_down) { std::string config(getDistConfig6Nodes4Groups()); config += "distributor_auto_ownership_transfer_on_whole_group_down true\n"; @@ -1571,6 +1577,7 @@ TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_with_group_down) { "distributor:6 .2.s:d storage:6")); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_with_group_down_and_no_handover) { std::string config(getDistConfig6Nodes4Groups()); config += "distributor_auto_ownership_transfer_on_whole_group_down false\n"; @@ -1582,6 +1589,8 @@ TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_with_group_down_and_no_h "distributor:6 .2.s:d .3.s:d storage:6")); } +namespace { + void parseInputData(const std::string& data, uint64_t timestamp, @@ -1656,6 +1665,8 @@ struct BucketDumper : public BucketDatabase::EntryProcessor } }; +} + std::string LegacyBucketDBUpdaterTest::mergeBucketLists( const lib::ClusterState& oldState, @@ -1724,6 +1735,7 @@ LegacyBucketDBUpdaterTest::mergeBucketLists(const std::string& existingData, includeBucketInfo); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_merge) { // Simple initializing case - ask all nodes for info EXPECT_EQ( @@ -1763,6 +1775,7 @@ TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_merge) { mergeBucketLists("", "0:5/0/0/0|1:5/2/3/4", true)); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, 
pending_cluster_state_merge_replica_changed) { // Node went from initializing to up and non-invalid bucket changed. EXPECT_EQ( @@ -1775,6 +1788,7 @@ TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_merge_replica_changed) { true)); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, no_db_resurrection_for_bucket_not_owned_in_current_state) { document::BucketId bucket(16, 3); lib::ClusterState stateBefore("distributor:1 storage:1"); @@ -1804,6 +1818,7 @@ TEST_F(LegacyBucketDBUpdaterTest, no_db_resurrection_for_bucket_not_owned_in_cur EXPECT_EQ(std::string("NONEXISTING"), dumpBucket(bucket)); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, no_db_resurrection_for_bucket_not_owned_in_pending_state) { document::BucketId bucket(16, 3); lib::ClusterState stateBefore("distributor:1 storage:1"); @@ -1831,6 +1846,7 @@ TEST_F(LegacyBucketDBUpdaterTest, no_db_resurrection_for_bucket_not_owned_in_pen EXPECT_EQ(std::string("NONEXISTING"), dumpBucket(bucket)); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest /* * If we get a distribution config change, it's important that cluster states that * arrive after this--but _before_ the pending cluster state has finished--must trigger @@ -1880,6 +1896,7 @@ TEST_F(LegacyBucketDBUpdaterTest, cluster_state_always_sends_full_fetch_when_dis EXPECT_EQ(size_t(0), _sender.commands().size()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, changed_distribution_config_triggers_recovery_mode) { ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), messageCount(6), 20)); _sender.clear(); @@ -1929,6 +1946,7 @@ std::unique_ptr<BucketDatabase::EntryProcessor> func_processor(Func&& f) { } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, changed_distribution_config_does_not_elide_bucket_db_pruning) { 
setDistribution(getDistConfig3Nodes1Group()); @@ -1948,6 +1966,7 @@ TEST_F(LegacyBucketDBUpdaterTest, changed_distribution_config_does_not_elide_buc })); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, newly_added_buckets_have_current_time_as_gc_timestamp) { getClock().setAbsoluteTimeInSeconds(101234); lib::ClusterState stateBefore("distributor:1 storage:1"); @@ -1963,6 +1982,7 @@ TEST_F(LegacyBucketDBUpdaterTest, newly_added_buckets_have_current_time_as_gc_ti EXPECT_EQ(uint32_t(101234), e->getLastGarbageCollectionTime()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, newer_mutations_not_overwritten_by_earlier_bucket_fetch) { { lib::ClusterState stateBefore("distributor:1 storage:1 .0.s:i"); @@ -2051,6 +2071,7 @@ LegacyBucketDBUpdaterTest::getSentNodesWithPreemption( using nodeVec = std::vector<uint16_t>; +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest /* * If we don't carry over the set of nodes that we need to fetch from, * a naive comparison between the active state and the new state will @@ -2067,6 +2088,7 @@ TEST_F(LegacyBucketDBUpdaterTest, preempted_distributor_change_carries_node_set_ "version:3 distributor:6 storage:6")); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, preempted_storage_change_carries_node_set_over_to_next_state_fetch) { EXPECT_EQ( expandNodeVec({2, 3}), @@ -2077,6 +2099,7 @@ TEST_F(LegacyBucketDBUpdaterTest, preempted_storage_change_carries_node_set_over "version:3 distributor:6 storage:6")); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, preempted_storage_node_down_must_be_re_fetched) { EXPECT_EQ( expandNodeVec({2}), @@ -2087,6 +2110,7 @@ TEST_F(LegacyBucketDBUpdaterTest, preempted_storage_node_down_must_be_re_fetched "version:3 distributor:6 storage:6")); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, 
do_not_send_to_preempted_node_now_in_down_state) { EXPECT_EQ( nodeVec{}, @@ -2097,6 +2121,7 @@ TEST_F(LegacyBucketDBUpdaterTest, do_not_send_to_preempted_node_now_in_down_stat "version:3 distributor:6 storage:6 .2.s:d")); // 2 down again. } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, doNotSendToPreemptedNodeNotPartOfNewState) { // Even though 100 nodes are preempted, not all of these should be part // of the request afterwards when only 6 are part of the state. @@ -2109,6 +2134,7 @@ TEST_F(LegacyBucketDBUpdaterTest, doNotSendToPreemptedNodeNotPartOfNewState) { "version:3 distributor:6 storage:6")); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, outdated_node_set_cleared_after_successful_state_completion) { lib::ClusterState stateBefore( "version:1 distributor:6 storage:6 .1.t:1234"); @@ -2123,6 +2149,7 @@ TEST_F(LegacyBucketDBUpdaterTest, outdated_node_set_cleared_after_successful_sta EXPECT_EQ(size_t(0), _sender.commands().size()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest (despite being disabled) // XXX test currently disabled since distribution config currently isn't used // at all in order to deduce the set of nodes to send to. This might not matter // in practice since it is assumed that the cluster state matching the new @@ -2144,6 +2171,7 @@ TEST_F(LegacyBucketDBUpdaterTest, DISABLED_cluster_config_downsize_only_sends_to EXPECT_EQ((nodeVec{0, 1, 2}), getSendSet()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest /** * Test scenario where a cluster is downsized by removing a subset of the nodes * from the distribution configuration. 
The system must be able to deal with @@ -2188,6 +2216,7 @@ TEST_F(LegacyBucketDBUpdaterTest, node_missing_from_config_is_treated_as_needing EXPECT_EQ(expandNodeVec({0, 1}), getSendSet()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, changed_distributor_set_implies_ownership_transfer) { auto fixture = createPendingStateFixtureForStateChange( "distributor:2 storage:2", "distributor:1 storage:2"); @@ -2198,6 +2227,7 @@ TEST_F(LegacyBucketDBUpdaterTest, changed_distributor_set_implies_ownership_tran EXPECT_TRUE(fixture->state->hasBucketOwnershipTransfer()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, unchanged_distributor_set_implies_no_ownership_transfer) { auto fixture = createPendingStateFixtureForStateChange( "distributor:2 storage:2", "distributor:2 storage:1"); @@ -2208,18 +2238,21 @@ TEST_F(LegacyBucketDBUpdaterTest, unchanged_distributor_set_implies_no_ownership EXPECT_FALSE(fixture->state->hasBucketOwnershipTransfer()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, changed_distribution_config_implies_ownership_transfer) { auto fixture = createPendingStateFixtureForDistributionChange( "distributor:2 storage:2"); EXPECT_TRUE(fixture->state->hasBucketOwnershipTransfer()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, transition_time_tracked_for_single_state_change) { ASSERT_NO_FATAL_FAILURE(completeStateTransitionInSeconds("distributor:2 storage:2", 5, messageCount(2))); EXPECT_EQ(uint64_t(5000), lastTransitionTimeInMillis()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, transition_time_reset_across_non_preempting_state_changes) { ASSERT_NO_FATAL_FAILURE(completeStateTransitionInSeconds("distributor:2 storage:2", 5, messageCount(2))); ASSERT_NO_FATAL_FAILURE(completeStateTransitionInSeconds("distributor:2 storage:3", 3, 
messageCount(1))); @@ -2227,6 +2260,7 @@ TEST_F(LegacyBucketDBUpdaterTest, transition_time_reset_across_non_preempting_st EXPECT_EQ(uint64_t(3000), lastTransitionTimeInMillis()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, transition_time_tracked_for_distribution_config_change) { lib::ClusterState state("distributor:2 storage:2"); ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(state, messageCount(2), 1)); @@ -2239,6 +2273,7 @@ TEST_F(LegacyBucketDBUpdaterTest, transition_time_tracked_for_distribution_confi EXPECT_EQ(uint64_t(4000), lastTransitionTimeInMillis()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, transition_time_tracked_across_preempted_transitions) { _sender.clear(); lib::ClusterState state("distributor:2 storage:2"); @@ -2252,6 +2287,7 @@ TEST_F(LegacyBucketDBUpdaterTest, transition_time_tracked_across_preempted_trans EXPECT_EQ(uint64_t(8000), lastTransitionTimeInMillis()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest /* * Brief reminder on test DSL for checking bucket merge operations: * @@ -2275,31 +2311,37 @@ TEST_F(LegacyBucketDBUpdaterTest, batch_update_of_existing_diverging_replicas_do "0:5/1/2/3|1:5/7/8/9", true)); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, batch_add_of_new_diverging_replicas_does_not_mark_any_as_trusted) { EXPECT_EQ(std::string("5:1/7/8/9/u,0/1/2/3/u|"), mergeBucketLists("", "0:5/1/2/3|1:5/7/8/9", true)); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, batch_add_with_single_resulting_replica_implicitly_marks_as_trusted) { EXPECT_EQ(std::string("5:0/1/2/3/t|"), mergeBucketLists("", "0:5/1/2/3", true)); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, identity_update_of_single_replica_does_not_clear_trusted) { EXPECT_EQ(std::string("5:0/1/2/3/t|"), mergeBucketLists("0:5/1/2/3", 
"0:5/1/2/3", true)); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, identity_update_of_diverging_untrusted_replicas_does_not_mark_any_as_trusted) { EXPECT_EQ(std::string("5:1/7/8/9/u,0/1/2/3/u|"), mergeBucketLists("0:5/1/2/3|1:5/7/8/9", "0:5/1/2/3|1:5/7/8/9", true)); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, adding_diverging_replica_to_existing_trusted_does_not_remove_trusted) { EXPECT_EQ(std::string("5:1/2/3/4/u,0/1/2/3/t|"), mergeBucketLists("0:5/1/2/3", "0:5/1/2/3|1:5/2/3/4", true)); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted) { // This differs from batch_update_of_existing_diverging_replicas_does_not_mark_any_as_trusted // in that _all_ content nodes are considered outdated when distributor changes take place, @@ -2315,6 +2357,7 @@ TEST_F(LegacyBucketDBUpdaterTest, batch_update_from_distributor_change_does_not_ "0:5/1/2/3|1:5/7/8/9", true)); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest // TODO remove on Vespa 8 - this is a workaround for https://github.com/vespa-engine/vespa/issues/8475 TEST_F(LegacyBucketDBUpdaterTest, global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection) { std::string distConfig(getDistConfig6Nodes2Groups()); @@ -2384,6 +2427,7 @@ void for_each_bucket(const DistributorBucketSpaceRepo& repo, Func&& f) { } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, non_owned_buckets_moved_to_read_only_db_on_ownership_change) { getBucketDBUpdater().set_stale_reads_enabled(true); @@ -2425,6 +2469,7 @@ TEST_F(LegacyBucketDBUpdaterTest, non_owned_buckets_moved_to_read_only_db_on_own }); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, buckets_no_longer_available_are_not_moved_to_read_only_database) { constexpr uint32_t 
n_buckets = 10; // No ownership change, just node down. Test redundancy is 2, so removing 2 nodes will @@ -2436,6 +2481,7 @@ TEST_F(LegacyBucketDBUpdaterTest, buckets_no_longer_available_are_not_moved_to_r EXPECT_EQ(size_t(0), read_only_global_db().size()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, non_owned_buckets_purged_when_read_only_support_is_config_disabled) { getBucketDBUpdater().set_stale_reads_enabled(false); @@ -2481,6 +2527,7 @@ void LegacyBucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transiti _sender.clear(); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, deferred_activated_state_does_not_enable_state_until_activation_received) { getBucketDBUpdater().set_stale_reads_enabled(true); constexpr uint32_t n_buckets = 10; @@ -2501,6 +2548,7 @@ TEST_F(LegacyBucketDBUpdaterTest, deferred_activated_state_does_not_enable_state EXPECT_EQ(uint64_t(n_buckets), mutable_global_db().size()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, read_only_db_cleared_once_pending_state_is_activated) { getBucketDBUpdater().set_stale_reads_enabled(true); constexpr uint32_t n_buckets = 10; @@ -2513,6 +2561,7 @@ TEST_F(LegacyBucketDBUpdaterTest, read_only_db_cleared_once_pending_state_is_act EXPECT_EQ(uint64_t(0), read_only_global_db().size()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, read_only_db_is_populated_even_when_self_is_marked_down) { getBucketDBUpdater().set_stale_reads_enabled(true); constexpr uint32_t n_buckets = 10; @@ -2527,6 +2576,7 @@ TEST_F(LegacyBucketDBUpdaterTest, read_only_db_is_populated_even_when_self_is_ma EXPECT_EQ(uint64_t(n_buckets), read_only_global_db().size()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, activate_cluster_state_request_with_mismatching_version_returns_actual_version) { 
getBucketDBUpdater().set_stale_reads_enabled(true); constexpr uint32_t n_buckets = 10; @@ -2541,6 +2591,7 @@ TEST_F(LegacyBucketDBUpdaterTest, activate_cluster_state_request_with_mismatchin ASSERT_NO_FATAL_FAILURE(assert_has_activate_cluster_state_reply_with_actual_version(5)); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, activate_cluster_state_request_without_pending_transition_passes_message_through) { getBucketDBUpdater().set_stale_reads_enabled(true); constexpr uint32_t n_buckets = 10; @@ -2557,6 +2608,7 @@ TEST_F(LegacyBucketDBUpdaterTest, activate_cluster_state_request_without_pending EXPECT_EQ(size_t(0), _sender.replies().size()); } +// TODO STRIPE disabled benchmark tests are NOT migrated to new test suite TEST_F(LegacyBucketDBUpdaterTest, DISABLED_benchmark_bulk_loading_into_empty_db) { // Need to trigger an initial edge to complete first bucket scan ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:2 storage:1"), @@ -2675,6 +2727,7 @@ TEST_F(LegacyBucketDBUpdaterTest, DISABLED_benchmark_all_buckets_removed_during_ fprintf(stderr, "Took %g seconds to scan and remove %u buckets\n", timer.min_time(), n_buckets); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_getter_is_non_null_only_when_state_is_pending) { auto initial_baseline = std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:d"); auto initial_default = std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:m"); @@ -2700,7 +2753,7 @@ TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_getter_is_non_null_only_ EXPECT_TRUE(state == nullptr); } -struct BucketDBUpdaterSnapshotTest : LegacyBucketDBUpdaterTest { +struct LegacyBucketDBUpdaterSnapshotTest : LegacyBucketDBUpdaterTest { lib::ClusterState empty_state; std::shared_ptr<lib::ClusterState> initial_baseline; std::shared_ptr<lib::ClusterState> initial_default; @@ -2708,7 +2761,7 @@ 
struct BucketDBUpdaterSnapshotTest : LegacyBucketDBUpdaterTest { Bucket default_bucket; Bucket global_bucket; - BucketDBUpdaterSnapshotTest() + LegacyBucketDBUpdaterSnapshotTest() : LegacyBucketDBUpdaterTest(), empty_state(), initial_baseline(std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:d")), @@ -2719,7 +2772,7 @@ struct BucketDBUpdaterSnapshotTest : LegacyBucketDBUpdaterTest { global_bucket(FixedBucketSpaces::global_space(), BucketId(16, 1234)) { } - ~BucketDBUpdaterSnapshotTest() override; + ~LegacyBucketDBUpdaterSnapshotTest() override; void SetUp() override { LegacyBucketDBUpdaterTest::SetUp(); @@ -2746,19 +2799,22 @@ struct BucketDBUpdaterSnapshotTest : LegacyBucketDBUpdaterTest { } }; -BucketDBUpdaterSnapshotTest::~BucketDBUpdaterSnapshotTest() = default; +LegacyBucketDBUpdaterSnapshotTest::~LegacyBucketDBUpdaterSnapshotTest() = default; -TEST_F(BucketDBUpdaterSnapshotTest, default_space_snapshot_prior_to_activated_state_is_non_routable) { +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest +TEST_F(LegacyBucketDBUpdaterSnapshotTest, default_space_snapshot_prior_to_activated_state_is_non_routable) { auto rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket); EXPECT_FALSE(rs.is_routable()); } -TEST_F(BucketDBUpdaterSnapshotTest, global_space_snapshot_prior_to_activated_state_is_non_routable) { +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest +TEST_F(LegacyBucketDBUpdaterSnapshotTest, global_space_snapshot_prior_to_activated_state_is_non_routable) { auto rs = getBucketDBUpdater().read_snapshot_for_bucket(global_bucket); EXPECT_FALSE(rs.is_routable()); } -TEST_F(BucketDBUpdaterSnapshotTest, read_snapshot_returns_appropriate_cluster_states) { +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest +TEST_F(LegacyBucketDBUpdaterSnapshotTest, read_snapshot_returns_appropriate_cluster_states) { set_cluster_state_bundle(initial_bundle); // State currently pending, empty initial state is active @@ -2788,7 +2844,8 @@ 
TEST_F(BucketDBUpdaterSnapshotTest, read_snapshot_returns_appropriate_cluster_st EXPECT_FALSE(global_rs.context().has_pending_state_transition()); } -TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_no_pending_state_transition_returns_mutable_db_guard) { +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest +TEST_F(LegacyBucketDBUpdaterSnapshotTest, snapshot_with_no_pending_state_transition_returns_mutable_db_guard) { constexpr uint32_t n_buckets = 10; ASSERT_NO_FATAL_FAILURE( trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, @@ -2800,7 +2857,8 @@ TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_no_pending_state_transition_re n_buckets); } -TEST_F(BucketDBUpdaterSnapshotTest, snapshot_returns_unroutable_for_non_owned_bucket_in_current_state) { +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest +TEST_F(LegacyBucketDBUpdaterSnapshotTest, snapshot_returns_unroutable_for_non_owned_bucket_in_current_state) { ASSERT_NO_FATAL_FAILURE( trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, "version:2 distributor:2 .0.s:d storage:4", 0, 0)); @@ -2810,7 +2868,8 @@ TEST_F(BucketDBUpdaterSnapshotTest, snapshot_returns_unroutable_for_non_owned_bu EXPECT_FALSE(def_rs.is_routable()); } -TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_pending_state_returns_read_only_guard_for_bucket_only_owned_in_current_state) { +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest +TEST_F(LegacyBucketDBUpdaterSnapshotTest, snapshot_with_pending_state_returns_read_only_guard_for_bucket_only_owned_in_current_state) { constexpr uint32_t n_buckets = 10; ASSERT_NO_FATAL_FAILURE( trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, @@ -2821,7 +2880,8 @@ TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_pending_state_returns_read_onl n_buckets); } -TEST_F(BucketDBUpdaterSnapshotTest, 
snapshot_is_unroutable_if_stale_reads_disabled_and_bucket_not_owned_in_pending_state) { +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest +TEST_F(LegacyBucketDBUpdaterSnapshotTest, snapshot_is_unroutable_if_stale_reads_disabled_and_bucket_not_owned_in_pending_state) { getBucketDBUpdater().set_stale_reads_enabled(false); constexpr uint32_t n_buckets = 10; ASSERT_NO_FATAL_FAILURE( diff --git a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp index 70e5afaed43..01f7d5a4f0a 100644 --- a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp +++ b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp @@ -11,13 +11,13 @@ #include <vespa/document/test/make_document_bucket.h> #include <vespa/document/test/make_bucket_space.h> #include <vespa/document/bucket/fixed_bucket_spaces.h> +#include <vespa/metrics/updatehook.h> #include <vespa/storage/distributor/simpleclusterinformation.h> #include <vespa/storage/distributor/top_level_distributor.h> #include <vespa/storage/distributor/distributor_stripe.h> #include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/vespalib/gtest/gtest.h> #include <vespa/vespalib/text/stringtokenizer.h> -#include <vespa/vespalib/util/benchmark_timer.h> #include <sstream> #include <iomanip> @@ -38,6 +38,8 @@ class TopLevelBucketDBUpdaterTest : public Test, public TopLevelDistributorTestUtil { public: + using OutdatedNodesMap = dbtransition::OutdatedNodesMap; + TopLevelBucketDBUpdaterTest(); ~TopLevelBucketDBUpdaterTest() override; @@ -117,7 +119,19 @@ public: invalid_bucket_count)); } - std::string verifyBucket(document::BucketId id, const lib::ClusterState& state) { + void send_fake_reply_for_single_bucket_request( + const api::RequestBucketInfoCommand& rbi) + { + ASSERT_EQ(size_t(1), rbi.getBuckets().size()); + const document::BucketId& bucket(rbi.getBuckets()[0]); + + auto reply = 
std::make_shared<api::RequestBucketInfoReply>(rbi); + reply->getBucketInfo().push_back( + api::RequestBucketInfoReply::Entry(bucket, api::BucketInfo(20, 10, 12, 50, 60, true, true))); + stripe_of_bucket(bucket).bucket_db_updater().onRequestBucketInfoReply(reply); + } + + std::string verify_bucket(document::BucketId id, const lib::ClusterState& state) { BucketDatabase::Entry entry = get_bucket(id); if (!entry.valid()) { return vespalib::make_string("%s doesn't exist in DB", id.toString().c_str()); @@ -182,6 +196,12 @@ public: sort_sent_messages_by_index(_sender, size_before_state); } + void set_cluster_state_bundle(const lib::ClusterStateBundle& state) { + const size_t size_before_state = _sender.commands().size(); + bucket_db_updater().onSetSystemState(std::make_shared<api::SetSystemStateCommand>(state)); + sort_sent_messages_by_index(_sender, size_before_state); + } + void set_cluster_state(const vespalib::string& state_str) { set_cluster_state(lib::ClusterState(state_str)); } @@ -191,6 +211,14 @@ public: std::make_shared<api::ActivateClusterStateVersionCommand>(version)); } + void assert_has_activate_cluster_state_reply_with_actual_version(uint32_t version) { + ASSERT_EQ(size_t(1), _sender.replies().size()); + auto* response = dynamic_cast<api::ActivateClusterStateVersionReply*>(_sender.replies().back().get()); + ASSERT_TRUE(response != nullptr); + ASSERT_EQ(version, response->actualVersion()); + _sender.clear(); + } + void complete_bucket_info_gathering(const lib::ClusterState& state, size_t expected_msgs, uint32_t bucket_count = 1, @@ -290,6 +318,176 @@ public: ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(state, expected_msgs, n_buckets)); } + void complete_state_transition_in_seconds(const std::string& stateStr, + uint32_t seconds, + uint32_t expectedMsgs) + { + _sender.clear(); + lib::ClusterState state(stateStr); + set_cluster_state(state); + fake_clock().addSecondsToTime(seconds); + ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(state, 
expectedMsgs)); + } + + uint64_t last_transition_time_in_millis() { + { + // Force stripe metrics to be aggregated into total. + std::mutex l; + distributor_metric_update_hook().updateMetrics(metrics::MetricLockGuard(l)); + } + return uint64_t(total_distributor_metrics().stateTransitionTime.getLast()); + } + + ClusterInformation::CSP create_cluster_info(const std::string& clusterStateString) { + lib::ClusterState baseline_cluster_state(clusterStateString); + lib::ClusterStateBundle cluster_state_bundle(baseline_cluster_state); + auto cluster_info = std::make_shared<SimpleClusterInformation>( + _distributor->node_identity().node_index(), + cluster_state_bundle, + "ui"); + enable_distributor_cluster_state(clusterStateString); + return cluster_info; + } + + struct PendingClusterStateFixture { + DistributorMessageSenderStub sender; + framework::defaultimplementation::FakeClock clock; + std::unique_ptr<PendingClusterState> state; + + PendingClusterStateFixture( + TopLevelBucketDBUpdaterTest& owner, + const std::string& old_cluster_state, + const std::string& new_cluster_state) + { + auto cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(new_cluster_state)); + auto cluster_info = owner.create_cluster_info(old_cluster_state); + OutdatedNodesMap outdated_nodes_map; + state = PendingClusterState::createForClusterStateChange( + clock, cluster_info, sender, + owner.top_level_bucket_space_repo(), + cmd, outdated_nodes_map, api::Timestamp(1)); + } + + PendingClusterStateFixture( + TopLevelBucketDBUpdaterTest& owner, + const std::string& old_cluster_state) + { + auto cluster_info = owner.create_cluster_info(old_cluster_state); + state = PendingClusterState::createForDistributionChange( + clock, cluster_info, sender, owner.top_level_bucket_space_repo(), api::Timestamp(1)); + } + }; + + std::unique_ptr<PendingClusterStateFixture> create_pending_state_fixture_for_state_change( + const std::string& oldClusterState, + const std::string& newClusterState) + { + 
return std::make_unique<PendingClusterStateFixture>(*this, oldClusterState, newClusterState); + } + + std::unique_ptr<PendingClusterStateFixture> create_pending_state_fixture_for_distribution_change( + const std::string& oldClusterState) + { + return std::make_unique<PendingClusterStateFixture>(*this, oldClusterState); + } + + std::string get_sent_nodes(const std::string& old_cluster_state, + const std::string& new_cluster_state); + + std::string get_sent_nodes_distribution_changed(const std::string& old_cluster_state); + + std::string get_node_list(const std::vector<uint16_t>& nodes, size_t count); + std::string get_node_list(const std::vector<uint16_t>& nodes); + + std::string merge_bucket_lists(const lib::ClusterState& old_state, + const std::string& existing_data, + const lib::ClusterState& new_state, + const std::string& new_data, + bool include_bucket_info = false); + + std::string merge_bucket_lists(const std::string& existingData, + const std::string& newData, + bool includeBucketInfo = false); + + std::vector<uint16_t> get_send_set() const; + + std::vector<uint16_t> get_sent_nodes_with_preemption( + const std::string& old_cluster_state, + uint32_t expected_old_state_messages, + const std::string& preempted_cluster_state, + const std::string& new_cluster_state); + + std::vector<uint16_t> expand_node_vec(const std::vector<uint16_t>& nodes); + + void trigger_completed_but_not_yet_activated_transition( + vespalib::stringref initial_state_str, + uint32_t initial_buckets, + uint32_t initial_expected_msgs, + vespalib::stringref pending_state_str, + uint32_t pending_buckets, + uint32_t pending_expected_msgs); + + const DistributorBucketSpaceRepo& mutable_repo(DistributorStripe& s) const noexcept { + return s.getBucketSpaceRepo(); + } + // Note: not calling this "immutable_repo" since it may actually be modified by the pending + // cluster state component (just not by operations), so it would not have the expected semantics. 
+ const DistributorBucketSpaceRepo& read_only_repo(DistributorStripe& s) const noexcept { + return s.getReadOnlyBucketSpaceRepo(); + } + + const BucketDatabase& mutable_default_db(DistributorStripe& s) const noexcept { + return mutable_repo(s).get(FixedBucketSpaces::default_space()).getBucketDatabase(); + } + const BucketDatabase& mutable_global_db(DistributorStripe& s) const noexcept { + return mutable_repo(s).get(FixedBucketSpaces::global_space()).getBucketDatabase(); + } + const BucketDatabase& read_only_default_db(DistributorStripe& s) const noexcept { + return read_only_repo(s).get(FixedBucketSpaces::default_space()).getBucketDatabase(); + } + const BucketDatabase& read_only_global_db(DistributorStripe& s) const noexcept { + return read_only_repo(s).get(FixedBucketSpaces::global_space()).getBucketDatabase(); + } + + void set_stale_reads_enabled(bool enabled) { + for (auto* s : distributor_stripes()) { + s->bucket_db_updater().set_stale_reads_enabled(enabled); + } + bucket_db_updater().set_stale_reads_enabled(enabled); + } + + size_t mutable_default_dbs_size() const { + size_t total = 0; + for (auto* s : distributor_stripes()) { + total += mutable_default_db(*s).size(); + } + return total; + } + + size_t mutable_global_dbs_size() const { + size_t total = 0; + for (auto* s : distributor_stripes()) { + total += mutable_global_db(*s).size(); + } + return total; + } + + size_t read_only_default_dbs_size() const { + size_t total = 0; + for (auto* s : distributor_stripes()) { + total += read_only_default_db(*s).size(); + } + return total; + } + + size_t read_only_global_dbs_size() const { + size_t total = 0; + for (auto* s : distributor_stripes()) { + total += read_only_global_db(*s).size(); + } + return total; + } + }; TopLevelBucketDBUpdaterTest::TopLevelBucketDBUpdaterTest() @@ -347,6 +545,21 @@ std::string dist_config_6_nodes_across_4_groups() { "group[3].nodes[1].index 5\n"); } +std::string dist_config_3_nodes_in_1_group() { + return ("redundancy 2\n" + 
"group[2]\n" + "group[0].name \"invalid\"\n" + "group[0].index \"invalid\"\n" + "group[0].partitions 1|*\n" + "group[0].nodes[0]\n" + "group[1].name rack0\n" + "group[1].index 0\n" + "group[1].nodes[3]\n" + "group[1].nodes[0].index 0\n" + "group[1].nodes[1].index 1\n" + "group[1].nodes[2].index 2\n"); +} + std::string make_string_list(std::string s, uint32_t count) { @@ -368,6 +581,54 @@ make_request_bucket_info_strings(uint32_t count) } + +std::string +TopLevelBucketDBUpdaterTest::get_node_list(const std::vector<uint16_t>& nodes, size_t count) +{ + std::ostringstream ost; + bool first = true; + for (const auto node : nodes) { + for (uint32_t i = 0; i < count; ++i) { + if (!first) { + ost << ","; + } + ost << node; + first = false; + } + } + return ost.str(); +} + +std::string +TopLevelBucketDBUpdaterTest::get_node_list(const std::vector<uint16_t>& nodes) +{ + return get_node_list(nodes, _bucket_spaces.size()); +} + +void +TopLevelBucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition( + vespalib::stringref initial_state_str, + uint32_t initial_buckets, + uint32_t initial_expected_msgs, + vespalib::stringref pending_state_str, + uint32_t pending_buckets, + uint32_t pending_expected_msgs) +{ + lib::ClusterState initial_state(initial_state_str); + set_cluster_state(initial_state); + ASSERT_EQ(message_count(initial_expected_msgs), _sender.commands().size()); + ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering( + initial_state, message_count(initial_expected_msgs), initial_buckets)); + _sender.clear(); + + lib::ClusterState pending_state(pending_state_str); // Ownership change + set_cluster_state_bundle(lib::ClusterStateBundle(pending_state, {}, true)); + ASSERT_EQ(message_count(pending_expected_msgs), _sender.commands().size()); + ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering( + pending_state, message_count(pending_expected_msgs), pending_buckets)); + _sender.clear(); +} + TEST_F(TopLevelBucketDBUpdaterTest, normal_usage) { 
set_cluster_state(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3")); @@ -525,13 +786,13 @@ TEST_F(TopLevelBucketDBUpdaterTest, failed_request_bucket_info) { } for (int i=0; i<10; i++) { - EXPECT_EQ(std::string(""), - verifyBucket(document::BucketId(16, i), - lib::ClusterState("distributor:1 storage:1"))); + EXPECT_EQ("", + verify_bucket(document::BucketId(16, i), + lib::ClusterState("distributor:1 storage:1"))); } // Set system state should now be passed on - EXPECT_EQ(std::string("Set system state"), _sender_down.getCommands()); + EXPECT_EQ("Set system state", _sender_down.getCommands()); } TEST_F(TopLevelBucketDBUpdaterTest, down_while_init) { @@ -588,9 +849,9 @@ TEST_F(TopLevelBucketDBUpdaterTest, node_down_copies_get_in_sync) { set_cluster_state("distributor:1 storage:3 .1.s:d"); - EXPECT_EQ(std::string("BucketId(0x4000000000000001) : " - "node(idx=0,crc=0x3,docs=3/3,bytes=3/3,trusted=true,active=false,ready=false), " - "node(idx=2,crc=0x3,docs=3/3,bytes=3/3,trusted=true,active=false,ready=false)"), + EXPECT_EQ("BucketId(0x4000000000000001) : " + "node(idx=0,crc=0x3,docs=3/3,bytes=3/3,trusted=true,active=false,ready=false), " + "node(idx=2,crc=0x3,docs=3/3,bytes=3/3,trusted=true,active=false,ready=false)", dump_bucket(bid)); } @@ -651,11 +912,11 @@ TEST_F(TopLevelBucketDBUpdaterTest, bit_change) { } } - EXPECT_EQ(std::string("BucketId(0x4000000000000001) : " - "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), + EXPECT_EQ("BucketId(0x4000000000000001) : " + "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)", dump_bucket(bucketlist[0])); - EXPECT_EQ(std::string("BucketId(0x4000000000000002) : " - "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), + EXPECT_EQ("BucketId(0x4000000000000002) : " + "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)", dump_bucket(bucketlist[1])); { @@ -688,17 +949,17 @@ TEST_F(TopLevelBucketDBUpdaterTest, 
bit_change) { } } - EXPECT_EQ(std::string("BucketId(0x4000000000000000) : " - "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), + EXPECT_EQ("BucketId(0x4000000000000000) : " + "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)", dump_bucket(document::BucketId(16, 0))); - EXPECT_EQ(std::string("BucketId(0x4000000000000001) : " - "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), + EXPECT_EQ("BucketId(0x4000000000000001) : " + "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)", dump_bucket(document::BucketId(16, 1))); - EXPECT_EQ(std::string("BucketId(0x4000000000000002) : " - "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), + EXPECT_EQ("BucketId(0x4000000000000002) : " + "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)", dump_bucket(document::BucketId(16, 2))); - EXPECT_EQ(std::string("BucketId(0x4000000000000004) : " - "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), + EXPECT_EQ("BucketId(0x4000000000000004) : " + "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)", dump_bucket(document::BucketId(16, 4))); _sender.clear(); @@ -822,9 +1083,9 @@ TEST_F(TopLevelBucketDBUpdaterTest, notify_bucket_change) { } // No database update until request bucket info replies have been received. 
- EXPECT_EQ(std::string("BucketId(0x4000000000000001) : " - "node(idx=0,crc=0x4d2,docs=1234/1234,bytes=1234/1234," - "trusted=false,active=false,ready=false)"), + EXPECT_EQ("BucketId(0x4000000000000001) : " + "node(idx=0,crc=0x4d2,docs=1234/1234,bytes=1234/1234," + "trusted=false,active=false,ready=false)", dump_bucket(document::BucketId(16, 1))); EXPECT_EQ(std::string("NONEXISTING"), dump_bucket(document::BucketId(16, 2))); @@ -845,11 +1106,11 @@ TEST_F(TopLevelBucketDBUpdaterTest, notify_bucket_change) { stripe_of_bucket(bucket_id).bucket_db_updater().onRequestBucketInfoReply(reply); } - EXPECT_EQ(std::string("BucketId(0x4000000000000001) : " - "node(idx=0,crc=0x11d7,docs=200/400,bytes=2000/4000,trusted=true,active=true,ready=true)"), + EXPECT_EQ("BucketId(0x4000000000000001) : " + "node(idx=0,crc=0x11d7,docs=200/400,bytes=2000/4000,trusted=true,active=true,ready=true)", dump_bucket(document::BucketId(16, 1))); - EXPECT_EQ(std::string("BucketId(0x4000000000000002) : " - "node(idx=0,crc=0x2327,docs=300/500,bytes=3000/5000,trusted=true,active=false,ready=false)"), + EXPECT_EQ("BucketId(0x4000000000000002) : " + "node(idx=0,crc=0x2327,docs=300/500,bytes=3000/5000,trusted=true,active=false,ready=false)", dump_bucket(document::BucketId(16, 2))); } @@ -947,4 +1208,1458 @@ TEST_F(TopLevelBucketDBUpdaterTest, notify_change_with_pending_state_queues_buck } } +TEST_F(TopLevelBucketDBUpdaterTest, merge_reply) { + enable_distributor_cluster_state("distributor:1 storage:3"); + + document::BucketId bucket_id(16, 1234); + add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234"); + + std::vector<api::MergeBucketCommand::Node> nodes; + nodes.push_back(api::MergeBucketCommand::Node(0)); + nodes.push_back(api::MergeBucketCommand::Node(1)); + nodes.push_back(api::MergeBucketCommand::Node(2)); + + api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0); + auto reply = std::make_shared<api::MergeBucketReply>(cmd); + + _sender.clear(); + 
stripe_of_bucket(bucket_id).bucket_db_updater().onMergeBucketReply(reply); + + ASSERT_EQ(size_t(3), _sender.commands().size()); + + for (uint32_t i = 0; i < 3; i++) { + auto req = std::dynamic_pointer_cast<api::RequestBucketInfoCommand>(_sender.command(i)); + + ASSERT_TRUE(req.get() != nullptr); + ASSERT_EQ(size_t(1), req->getBuckets().size()); + EXPECT_EQ(bucket_id, req->getBuckets()[0]); + + auto reqreply = std::make_shared<api::RequestBucketInfoReply>(*req); + reqreply->getBucketInfo().push_back( + api::RequestBucketInfoReply::Entry(bucket_id, + api::BucketInfo(10 * (i + 1), 100 * (i +1), 1000 * (i+1)))); + + stripe_of_bucket(bucket_id).bucket_db_updater().onRequestBucketInfoReply(reqreply); + } + + EXPECT_EQ("BucketId(0x40000000000004d2) : " + "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), " + "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false), " + "node(idx=2,crc=0x1e,docs=300/300,bytes=3000/3000,trusted=false,active=false,ready=false)", + dump_bucket(bucket_id)); +}; + +TEST_F(TopLevelBucketDBUpdaterTest, merge_reply_node_down) { + enable_distributor_cluster_state("distributor:1 storage:3"); + std::vector<api::MergeBucketCommand::Node> nodes; + + document::BucketId bucket_id(16, 1234); + add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234"); + + for (uint32_t i = 0; i < 3; ++i) { + nodes.push_back(api::MergeBucketCommand::Node(i)); + } + + api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0); + auto reply = std::make_shared<api::MergeBucketReply>(cmd); + + set_cluster_state(lib::ClusterState("distributor:1 storage:2")); + + _sender.clear(); + stripe_of_bucket(bucket_id).bucket_db_updater().onMergeBucketReply(reply); + + ASSERT_EQ(size_t(2), _sender.commands().size()); + + for (uint32_t i = 0; i < 2; i++) { + auto req = std::dynamic_pointer_cast<api::RequestBucketInfoCommand>(_sender.command(i)); + + ASSERT_TRUE(req.get() != nullptr); + 
ASSERT_EQ(size_t(1), req->getBuckets().size()); + EXPECT_EQ(bucket_id, req->getBuckets()[0]); + + auto reqreply = std::make_shared<api::RequestBucketInfoReply>(*req); + reqreply->getBucketInfo().push_back( + api::RequestBucketInfoReply::Entry( + bucket_id, + api::BucketInfo(10 * (i + 1), 100 * (i +1), 1000 * (i+1)))); + stripe_of_bucket(bucket_id).bucket_db_updater().onRequestBucketInfoReply(reqreply); + } + + EXPECT_EQ("BucketId(0x40000000000004d2) : " + "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), " + "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false)", + dump_bucket(bucket_id)); +}; + +TEST_F(TopLevelBucketDBUpdaterTest, merge_reply_node_down_after_request_sent) { + enable_distributor_cluster_state("distributor:1 storage:3"); + std::vector<api::MergeBucketCommand::Node> nodes; + + document::BucketId bucket_id(16, 1234); + add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234"); + + for (uint32_t i = 0; i < 3; ++i) { + nodes.push_back(api::MergeBucketCommand::Node(i)); + } + + api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0); + auto reply = std::make_shared<api::MergeBucketReply>(cmd); + + _sender.clear(); + stripe_of_bucket(bucket_id).bucket_db_updater().onMergeBucketReply(reply); + + ASSERT_EQ(size_t(3), _sender.commands().size()); + + set_cluster_state(lib::ClusterState("distributor:1 storage:2")); + + for (uint32_t i = 0; i < 3; i++) { + auto req = std::dynamic_pointer_cast<api::RequestBucketInfoCommand>(_sender.command(i)); + + ASSERT_TRUE(req.get() != nullptr); + ASSERT_EQ(size_t(1), req->getBuckets().size()); + EXPECT_EQ(bucket_id, req->getBuckets()[0]); + + auto reqreply = std::make_shared<api::RequestBucketInfoReply>(*req); + reqreply->getBucketInfo().push_back( + api::RequestBucketInfoReply::Entry( + bucket_id, + api::BucketInfo(10 * (i + 1), 100 * (i +1), 1000 * (i+1)))); + 
stripe_of_bucket(bucket_id).bucket_db_updater().onRequestBucketInfoReply(reqreply); + } + + EXPECT_EQ("BucketId(0x40000000000004d2) : " + "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), " + "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false)", + dump_bucket(bucket_id)); +}; + +TEST_F(TopLevelBucketDBUpdaterTest, flush) { + enable_distributor_cluster_state("distributor:1 storage:3"); + _sender.clear(); + + document::BucketId bucket_id(16, 1234); + add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234"); + + std::vector<api::MergeBucketCommand::Node> nodes; + for (uint32_t i = 0; i < 3; ++i) { + nodes.push_back(api::MergeBucketCommand::Node(i)); + } + + api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0); + auto reply = std::make_shared<api::MergeBucketReply>(cmd); + + _sender.clear(); + stripe_of_bucket(bucket_id).bucket_db_updater().onMergeBucketReply(reply); + + ASSERT_EQ(size_t(3), _sender.commands().size()); + ASSERT_EQ(size_t(0), _sender_down.replies().size()); + + stripe_of_bucket(bucket_id).bucket_db_updater().flush(); + // Flushing should drop all merge bucket replies + EXPECT_EQ(size_t(0), _sender_down.commands().size()); +} + +std::string +TopLevelBucketDBUpdaterTest::get_sent_nodes(const std::string& old_cluster_state, + const std::string& new_cluster_state) +{ + auto fixture = create_pending_state_fixture_for_state_change(old_cluster_state, new_cluster_state); + sort_sent_messages_by_index(fixture->sender); + + std::ostringstream ost; + for (uint32_t i = 0; i < fixture->sender.commands().size(); i++) { + auto& req = dynamic_cast<RequestBucketInfoCommand&>(*fixture->sender.command(i)); + + if (i > 0) { + ost << ","; + } + + ost << req.getAddress()->getIndex(); + } + + return ost.str(); +} + +std::string +TopLevelBucketDBUpdaterTest::get_sent_nodes_distribution_changed(const std::string& old_cluster_state) +{ + DistributorMessageSenderStub sender; + + 
framework::defaultimplementation::FakeClock clock; + auto cluster_info = create_cluster_info(old_cluster_state); + std::unique_ptr<PendingClusterState> state( + PendingClusterState::createForDistributionChange( + clock, cluster_info, sender, top_level_bucket_space_repo(), api::Timestamp(1))); + + sort_sent_messages_by_index(sender); + + std::ostringstream ost; + for (uint32_t i = 0; i < sender.commands().size(); i++) { + auto& req = dynamic_cast<RequestBucketInfoCommand&>(*sender.command(i)); + + if (i > 0) { + ost << ","; + } + + ost << req.getAddress()->getIndex(); + } + + return ost.str(); +} + +TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_send_messages) { + EXPECT_EQ(get_node_list({0, 1, 2}), + get_sent_nodes("cluster:d", + "distributor:1 storage:3")); + + EXPECT_EQ(get_node_list({0, 1}), + get_sent_nodes("cluster:d", + "distributor:1 storage:3 .2.s:m")); + + EXPECT_EQ(get_node_list({2}), + get_sent_nodes("distributor:1 storage:2", + "distributor:1 storage:3")); + + EXPECT_EQ(get_node_list({2, 3, 4, 5}), + get_sent_nodes("distributor:1 storage:2", + "distributor:1 storage:6")); + + EXPECT_EQ(get_node_list({0, 1, 2}), + get_sent_nodes("distributor:4 storage:3", + "distributor:3 storage:3")); + + EXPECT_EQ(get_node_list({0, 1, 2, 3}), + get_sent_nodes("distributor:4 storage:3", + "distributor:4 .2.s:d storage:4")); + + EXPECT_EQ("", + get_sent_nodes("distributor:4 storage:3", + "distributor:4 .0.s:d storage:4")); + + EXPECT_EQ("", + get_sent_nodes("distributor:3 storage:3", + "distributor:4 storage:3")); + + EXPECT_EQ(get_node_list({2}), + get_sent_nodes("distributor:3 storage:3 .2.s:i", + "distributor:3 storage:3")); + + EXPECT_EQ(get_node_list({1}), + get_sent_nodes("distributor:3 storage:3 .1.s:d", + "distributor:3 storage:3")); + + EXPECT_EQ(get_node_list({1, 2, 4}), + get_sent_nodes("distributor:3 storage:4 .1.s:d .2.s:i", + "distributor:3 storage:5")); + + EXPECT_EQ("", + get_sent_nodes("distributor:1 storage:3", + "cluster:d")); + + 
EXPECT_EQ("", + get_sent_nodes("distributor:1 storage:3", + "distributor:1 storage:3")); + + EXPECT_EQ("", + get_sent_nodes("distributor:1 storage:3", + "cluster:d distributor:1 storage:6")); + + EXPECT_EQ("", + get_sent_nodes("distributor:3 storage:3", + "distributor:3 .2.s:m storage:3")); + + EXPECT_EQ(get_node_list({0, 1, 2}), + get_sent_nodes("distributor:3 .2.s:m storage:3", + "distributor:3 .2.s:d storage:3")); + + EXPECT_EQ("", + get_sent_nodes("distributor:3 .2.s:m storage:3", + "distributor:3 storage:3")); + + EXPECT_EQ(get_node_list({0, 1, 2}), + get_sent_nodes_distribution_changed("distributor:3 storage:3")); + + EXPECT_EQ(get_node_list({0, 1}), + get_sent_nodes("distributor:10 storage:2", + "distributor:10 .1.s:d storage:2")); + + EXPECT_EQ("", + get_sent_nodes("distributor:2 storage:2", + "distributor:3 .2.s:i storage:2")); + + EXPECT_EQ(get_node_list({0, 1, 2}), + get_sent_nodes("distributor:3 storage:3", + "distributor:3 .2.s:s storage:3")); + + EXPECT_EQ("", + get_sent_nodes("distributor:3 .2.s:s storage:3", + "distributor:3 .2.s:d storage:3")); + + EXPECT_EQ(get_node_list({1}), + get_sent_nodes("distributor:3 storage:3 .1.s:m", + "distributor:3 storage:3")); + + EXPECT_EQ("", + get_sent_nodes("distributor:3 storage:3", + "distributor:3 storage:3 .1.s:m")); +}; + +TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_receive) { + DistributorMessageSenderStub sender; + + auto cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState("distributor:1 storage:3")); + + framework::defaultimplementation::FakeClock clock; + auto cluster_info = create_cluster_info("cluster:d"); + OutdatedNodesMap outdated_nodes_map; + std::unique_ptr<PendingClusterState> state( + PendingClusterState::createForClusterStateChange( + clock, cluster_info, sender, top_level_bucket_space_repo(), + cmd, outdated_nodes_map, api::Timestamp(1))); + + ASSERT_EQ(message_count(3), sender.commands().size()); + + sort_sent_messages_by_index(sender); + + std::ostringstream 
ost; + for (uint32_t i = 0; i < sender.commands().size(); i++) { + auto* req = dynamic_cast<RequestBucketInfoCommand*>(sender.command(i).get()); + ASSERT_TRUE(req != nullptr); + + auto rep = std::make_shared<RequestBucketInfoReply>(*req); + + rep->getBucketInfo().push_back( + RequestBucketInfoReply::Entry( + document::BucketId(16, i), + api::BucketInfo(i, i, i, i, i))); + + ASSERT_TRUE(state->onRequestBucketInfoReply(rep)); + ASSERT_EQ((i == (sender.commands().size() - 1)), state->done()); + } + + auto& pending_transition = state->getPendingBucketSpaceDbTransition(makeBucketSpace()); + EXPECT_EQ(3u, pending_transition.results().size()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_with_group_down) { + std::string config = dist_config_6_nodes_across_4_groups(); + config += "distributor_auto_ownership_transfer_on_whole_group_down true\n"; + set_distribution(config); + + // Group config has nodes {0, 1}, {2, 3}, {4, 5} + // We're node index 0. + + // Entire group 1 goes down. Must refetch from all nodes. + EXPECT_EQ(get_node_list({0, 1, 2, 3, 4, 5}), + get_sent_nodes("distributor:6 storage:6", + "distributor:6 .2.s:d .3.s:d storage:6")); + + // But don't fetch if not the entire group is down. + EXPECT_EQ("", + get_sent_nodes("distributor:6 storage:6", + "distributor:6 .2.s:d storage:6")); +} + +TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_with_group_down_and_no_handover) { + std::string config = dist_config_6_nodes_across_4_groups(); + config += "distributor_auto_ownership_transfer_on_whole_group_down false\n"; + set_distribution(config); + + // Group is down, but config says to not do anything about it. 
+ EXPECT_EQ(get_node_list({0, 1, 2, 3, 4, 5}, _bucket_spaces.size() - 1), + get_sent_nodes("distributor:6 storage:6", + "distributor:6 .2.s:d .3.s:d storage:6")); +} + + +namespace { + +void +parse_input_data(const std::string& data, + uint64_t timestamp, + PendingClusterState& state, + bool include_bucket_info) +{ + vespalib::StringTokenizer tokenizer(data, "|"); + for (uint32_t i = 0; i < tokenizer.size(); i++) { + vespalib::StringTokenizer tok2(tokenizer[i], ":"); + + uint16_t node = atoi(tok2[0].data()); + + state.setNodeReplied(node); + auto& pending_transition = state.getPendingBucketSpaceDbTransition(makeBucketSpace()); + + vespalib::StringTokenizer tok3(tok2[1], ","); + for (uint32_t j = 0; j < tok3.size(); j++) { + if (include_bucket_info) { + vespalib::StringTokenizer tok4(tok3[j], "/"); + + pending_transition.addNodeInfo( + document::BucketId(16, atoi(tok4[0].data())), + BucketCopy( + timestamp, + node, + api::BucketInfo( + atoi(tok4[1].data()), + atoi(tok4[2].data()), + atoi(tok4[3].data()), + atoi(tok4[2].data()), + atoi(tok4[3].data())))); + } else { + pending_transition.addNodeInfo( + document::BucketId(16, atoi(tok3[j].data())), + BucketCopy(timestamp, + node, + api::BucketInfo(3, 3, 3, 3, 3))); + } + } + } +} + +struct BucketDumper : public BucketDatabase::EntryProcessor +{ + std::ostringstream ost; + bool _include_bucket_info; + + explicit BucketDumper(bool include_bucket_info) + : _include_bucket_info(include_bucket_info) + { + } + + bool process(const BucketDatabase::ConstEntryRef& e) override { + document::BucketId bucket_id(e.getBucketId()); + + ost << uint32_t(bucket_id.getRawId()) << ":"; + for (uint32_t i = 0; i < e->getNodeCount(); ++i) { + if (i > 0) { + ost << ","; + } + const BucketCopy& copy(e->getNodeRef(i)); + ost << copy.getNode(); + if (_include_bucket_info) { + ost << "/" << copy.getChecksum() + << "/" << copy.getDocumentCount() + << "/" << copy.getTotalDocumentSize() + << "/" << (copy.trusted() ? 
"t" : "u"); + } + } + ost << "|"; + return true; + } +}; + +} + +std::string +TopLevelBucketDBUpdaterTest::merge_bucket_lists( + const lib::ClusterState& old_state, + const std::string& existing_data, + const lib::ClusterState& new_state, + const std::string& new_data, + bool include_bucket_info) +{ + framework::defaultimplementation::FakeClock clock; + framework::MilliSecTimer timer(clock); + + DistributorMessageSenderStub sender; + OutdatedNodesMap outdated_nodes_map; + + { + auto cmd = std::make_shared<api::SetSystemStateCommand>(old_state); + api::Timestamp before_time(1); + auto cluster_info = create_cluster_info("cluster:d"); + + auto state = PendingClusterState::createForClusterStateChange( + clock, cluster_info, sender, top_level_bucket_space_repo(), + cmd, outdated_nodes_map, before_time); + + parse_input_data(existing_data, before_time, *state, include_bucket_info); + auto guard = acquire_stripe_guard(); + state->merge_into_bucket_databases(*guard); + } + + BucketDumper dumper_tmp(true); + for (auto* s : distributor_stripes()) { + auto& db = s->getBucketSpaceRepo().get(document::FixedBucketSpaces::default_space()).getBucketDatabase(); + db.forEach(dumper_tmp); + } + + { + auto cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(new_state)); + api::Timestamp after_time(2); + auto cluster_info = create_cluster_info(old_state.toString()); + + auto state = PendingClusterState::createForClusterStateChange( + clock, cluster_info, sender, top_level_bucket_space_repo(), + cmd, outdated_nodes_map, after_time); + + parse_input_data(new_data, after_time, *state, include_bucket_info); + auto guard = acquire_stripe_guard(); + state->merge_into_bucket_databases(*guard); + } + + BucketDumper dumper(include_bucket_info); + for (auto* s : distributor_stripes()) { + auto& db = s->getBucketSpaceRepo().get(document::FixedBucketSpaces::default_space()).getBucketDatabase(); + db.forEach(dumper); + db.clear(); + } + return dumper.ost.str(); +} + +std::string 
+TopLevelBucketDBUpdaterTest::merge_bucket_lists(const std::string& existing_data, + const std::string& new_data, + bool include_bucket_info) +{ + return merge_bucket_lists( + lib::ClusterState("distributor:1 storage:3"), + existing_data, + lib::ClusterState("distributor:1 storage:3"), + new_data, + include_bucket_info); +} + +TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_merge) { + // Result is on the form: [bucket w/o count bits]:[node indexes]|.. + // Input is on the form: [node]:[bucket w/o count bits]|... + + // Simple initializing case - ask all nodes for info + EXPECT_EQ("4:0,1|2:0,1|6:1,2|1:0,2|5:2,0|3:2,1|", + merge_bucket_lists( + "", + "0:1,2,4,5|1:2,3,4,6|2:1,3,5,6")); + + // New node came up + EXPECT_EQ("4:0,1|2:0,1|6:1,2,3|1:0,2,3|5:2,0,3|3:2,1,3|", + merge_bucket_lists( + "0:1,2,4,5|1:2,3,4,6|2:1,3,5,6", + "3:1,3,5,6")); + + // Node came up with some buckets removed and some added + // Buckets that were removed should not be removed as the node + // didn't lose a disk. + EXPECT_EQ("8:0|4:0,1|2:0,1|6:1,0,2|1:0,2|5:2,0|3:2,1|", + merge_bucket_lists( + "0:1,2,4,5|1:2,3,4,6|2:1,3,5,6", + "0:1,2,6,8")); + + // Bucket info format is "bucketid/checksum/count/size" + // Node went from initializing to up and invalid bucket went to empty. + EXPECT_EQ("2:0/0/0/0/t|", + merge_bucket_lists( + "0:2/0/0/1", + "0:2/0/0/0", + true)); + + EXPECT_EQ("5:1/2/3/4/u,0/0/0/0/u|", + merge_bucket_lists("", "0:5/0/0/0|1:5/2/3/4", true)); +} + +TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_merge_replica_changed) { + // Node went from initializing to up and non-invalid bucket changed. 
+ EXPECT_EQ("2:0/2/3/4/t|3:0/2/4/6/t|", + merge_bucket_lists( + lib::ClusterState("distributor:1 storage:1 .0.s:i"), + "0:2/1/2/3,3/2/4/6", + lib::ClusterState("distributor:1 storage:1"), + "0:2/2/3/4,3/2/4/6", + true)); +} + +TEST_F(TopLevelBucketDBUpdaterTest, no_db_resurrection_for_bucket_not_owned_in_current_state) { + document::BucketId bucket(16, 3); + lib::ClusterState state_before("distributor:1 storage:1"); + { + uint32_t expected_msgs = _bucket_spaces.size(), dummy_buckets_to_return = 1; + ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state_before, expected_msgs, dummy_buckets_to_return)); + } + _sender.clear(); + + stripe_of_bucket(bucket).bucket_db_updater().recheckBucketInfo(0, makeDocumentBucket(bucket)); + + ASSERT_EQ(size_t(1), _sender.commands().size()); + auto rbi = std::dynamic_pointer_cast<RequestBucketInfoCommand>(_sender.command(0)); + + lib::ClusterState state_after("distributor:3 storage:3"); + + { + uint32_t expected_msgs = message_count(2), dummy_buckets_to_return = 1; + ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state_after, expected_msgs, dummy_buckets_to_return)); + } + EXPECT_FALSE(distributor_bucket_space(bucket).get_bucket_ownership_flags(bucket).owned_in_current_state()); + + ASSERT_NO_FATAL_FAILURE(send_fake_reply_for_single_bucket_request(*rbi)); + + EXPECT_EQ("NONEXISTING", dump_bucket(bucket)); +} + +TEST_F(TopLevelBucketDBUpdaterTest, no_db_resurrection_for_bucket_not_owned_in_pending_state) { + document::BucketId bucket(16, 3); + lib::ClusterState state_before("distributor:1 storage:1"); + { + uint32_t expected_msgs = _bucket_spaces.size(), dummy_buckets_to_return = 1; + ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state_before, expected_msgs, dummy_buckets_to_return)); + } + _sender.clear(); + + stripe_of_bucket(bucket).bucket_db_updater().recheckBucketInfo(0, makeDocumentBucket(bucket)); + + ASSERT_EQ(size_t(1), _sender.commands().size()); + auto rbi = 
std::dynamic_pointer_cast<RequestBucketInfoCommand>(_sender.command(0)); + + lib::ClusterState state_after("distributor:3 storage:3"); + // Set, but _don't_ enable cluster state. We want it to be pending. + set_cluster_state(state_after); + EXPECT_TRUE(distributor_bucket_space(bucket).get_bucket_ownership_flags(bucket).owned_in_current_state()); + EXPECT_FALSE(distributor_bucket_space(bucket).get_bucket_ownership_flags(bucket).owned_in_pending_state()); + + ASSERT_NO_FATAL_FAILURE(send_fake_reply_for_single_bucket_request(*rbi)); + + EXPECT_EQ("NONEXISTING", dump_bucket(bucket)); +} + +/* + * If we get a distribution config change, it's important that cluster states that + * arrive after this--but _before_ the pending cluster state has finished--must trigger + * a full bucket info fetch no matter what the cluster state change was! Otherwise, we + * will with a high likelihood end up not getting the complete view of the buckets in + * the cluster. + */ +TEST_F(TopLevelBucketDBUpdaterTest, cluster_state_always_sends_full_fetch_when_distribution_change_pending) { + lib::ClusterState state_before("distributor:6 storage:6"); + { + uint32_t expected_msgs = message_count(6), dummy_buckets_to_return = 1; + ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state_before, expected_msgs, dummy_buckets_to_return)); + } + _sender.clear(); + std::string distConfig(dist_config_6_nodes_across_2_groups()); + set_distribution(distConfig); + + sort_sent_messages_by_index(_sender); + ASSERT_EQ(message_count(6), _sender.commands().size()); + // Suddenly, a wild cluster state change appears! Even though this state + // does not in itself imply any bucket changes, it will still overwrite the + // pending cluster state and thus its state of pending bucket info requests. + set_cluster_state("distributor:6 .2.t:12345 storage:6"); + + ASSERT_EQ(message_count(12), _sender.commands().size()); + + // Send replies for the first message_count(6) commands (outdated requests).
+ int num_buckets = 10; + for (uint32_t i = 0; i < message_count(6); ++i) { + ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(lib::ClusterState("distributor:6 storage:6"), + *_sender.command(i), num_buckets)); + } + // No change from these. + ASSERT_NO_FATAL_FAILURE(assert_correct_buckets(1, "distributor:6 storage:6")); + + // Send for current pending. + for (uint32_t i = 0; i < message_count(6); ++i) { + ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(lib::ClusterState("distributor:6 .2.t:12345 storage:6"), + *_sender.command(i + message_count(6)), + num_buckets)); + } + ASSERT_NO_FATAL_FAILURE(assert_correct_buckets(num_buckets, "distributor:6 storage:6")); + _sender.clear(); + + // No more pending global fetch; this should be a no-op state. + set_cluster_state("distributor:6 .3.t:12345 storage:6"); + EXPECT_EQ(size_t(0), _sender.commands().size()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, changed_distribution_config_triggers_recovery_mode) { + uint32_t num_buckets = 20; + ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(lib::ClusterState("distributor:6 storage:6"), + message_count(6), num_buckets)); + _sender.clear(); + EXPECT_TRUE(all_distributor_stripes_are_in_recovery_mode()); + complete_recovery_mode_on_all_stripes(); + EXPECT_FALSE(all_distributor_stripes_are_in_recovery_mode()); + + set_distribution(dist_config_6_nodes_across_4_groups()); + sort_sent_messages_by_index(_sender); + // No replies received yet, still no recovery mode. + EXPECT_FALSE(all_distributor_stripes_are_in_recovery_mode()); + + ASSERT_EQ(message_count(6), _sender.commands().size()); + num_buckets = 10; + for (uint32_t i = 0; i < message_count(6); ++i) { + ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(lib::ClusterState("distributor:6 storage:6"), + *_sender.command(i), num_buckets)); + } + + // Pending cluster state (i.e. distribution) has been enabled, which should + // cause recovery mode to be entered. 
+ EXPECT_TRUE(all_distributor_stripes_are_in_recovery_mode()); + complete_recovery_mode_on_all_stripes(); + EXPECT_FALSE(all_distributor_stripes_are_in_recovery_mode()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, changed_distribution_config_does_not_elide_bucket_db_pruning) { + set_distribution(dist_config_3_nodes_in_1_group()); + + constexpr uint32_t n_buckets = 100; + ASSERT_NO_FATAL_FAILURE( + set_and_enable_cluster_state(lib::ClusterState("distributor:6 storage:6"), message_count(6), n_buckets)); + _sender.clear(); + + // Config implies a different node set than the current cluster state, so it's crucial that + // DB pruning is _not_ elided. Yes, this is inherently racing with cluster state changes and + // should be changed to be atomic and controlled by the cluster controller instead of config. + // But this is where we currently are. + set_distribution(dist_config_6_nodes_across_2_groups()); + for (auto* s : distributor_stripes()) { + const auto& db = s->getBucketSpaceRepo().get(document::FixedBucketSpaces::default_space()).getBucketDatabase(); + db.acquire_read_guard()->for_each([&]([[maybe_unused]] uint64_t key, const auto& e) { + auto id = e.getBucketId(); + EXPECT_TRUE(distributor_bucket_space(id).get_bucket_ownership_flags(id).owned_in_pending_state()); + }); + } +} + +TEST_F(TopLevelBucketDBUpdaterTest, newly_added_buckets_have_current_time_as_gc_timestamp) { + fake_clock().setAbsoluteTimeInSeconds(101234); + lib::ClusterState state_before("distributor:1 storage:1"); + { + uint32_t expected_msgs = _bucket_spaces.size(), dummy_buckets_to_return = 1; + ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state_before, expected_msgs, dummy_buckets_to_return)); + } + // set_and_enable_cluster_state adds n buckets with id (16, i) + document::BucketId bucket(16, 0); + BucketDatabase::Entry e = get_bucket(bucket); + ASSERT_TRUE(e.valid()); + EXPECT_EQ(uint32_t(101234), e->getLastGarbageCollectionTime()); +} + +TEST_F(TopLevelBucketDBUpdaterTest,
newer_mutations_not_overwritten_by_earlier_bucket_fetch) { + { + lib::ClusterState state_before("distributor:1 storage:1 .0.s:i"); + uint32_t expected_msgs = _bucket_spaces.size(), dummy_buckets_to_return = 0; + // This step is required to make the distributor ready for accepting + // the below explicit database insertion towards node 0. + ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state_before, expected_msgs, dummy_buckets_to_return)); + } + _sender.clear(); + fake_clock().setAbsoluteTimeInSeconds(1000); + lib::ClusterState state("distributor:1 storage:1"); + set_cluster_state(state); + ASSERT_EQ(_bucket_spaces.size(), _sender.commands().size()); + + // Before replying with the bucket info, simulate the arrival of a mutation + // reply that alters the state of the bucket with information that will be + // more recent than what is returned by the bucket info. This information + // must not be lost when the bucket info is later merged into the database. + document::BucketId bucket(16, 1); + constexpr uint64_t insertion_timestamp = 1001ULL * 1000000; + api::BucketInfo wanted_info(5, 6, 7); + stripe_of_bucket(bucket).bucket_db_updater().operation_context().update_bucket_database( + makeDocumentBucket(bucket), + BucketCopy(insertion_timestamp, 0, wanted_info), + DatabaseUpdate::CREATE_IF_NONEXISTING); + + fake_clock().setAbsoluteTimeInSeconds(1002); + constexpr uint32_t buckets_returned = 10; // Buckets (16, 0) ... (16, 9) + // Return bucket information which on the timeline might originate from + // anywhere between [1000, 1002]. Our assumption is that any mutations + // taking place after t=1000 must have their replies received and processed + // by this distributor and timestamped strictly higher than t=1000 (modulo + // clock skew, of course, but that is outside the scope of this).
A mutation + // happening before t=1000 but receiving a reply at t>1000 does not affect + // correctness, as this should contain the same bucket info as that + // contained in the full bucket reply and the DB update is thus idempotent. + for (uint32_t i = 0; i < _bucket_spaces.size(); ++i) { + ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(state, *_sender.command(i), buckets_returned)); + } + + BucketDatabase::Entry e = get_bucket(bucket); + ASSERT_EQ(uint32_t(1), e->getNodeCount()); + EXPECT_EQ(wanted_info, e->getNodeRef(0).getBucketInfo()); +} + +std::vector<uint16_t> +TopLevelBucketDBUpdaterTest::get_send_set() const +{ + std::vector<uint16_t> nodes; + std::transform(_sender.commands().begin(), + _sender.commands().end(), + std::back_inserter(nodes), + [](auto& cmd) + { + auto& req(dynamic_cast<const api::RequestBucketInfoCommand&>(*cmd)); + return req.getAddress()->getIndex(); + }); + return nodes; +} + +std::vector<uint16_t> +TopLevelBucketDBUpdaterTest::get_sent_nodes_with_preemption( + const std::string& old_cluster_state, + uint32_t expected_old_state_messages, + const std::string& preempted_cluster_state, + const std::string& new_cluster_state) +{ + uint32_t dummy_buckets_to_return = 10; + // FIXME cannot chain assertion checks in non-void function + set_and_enable_cluster_state(lib::ClusterState(old_cluster_state), + expected_old_state_messages, + dummy_buckets_to_return); + + _sender.clear(); + + set_cluster_state(preempted_cluster_state); + _sender.clear(); + // Do not allow the pending state to become the active state; trigger a + // new transition without ACKing the info requests first. This will + // overwrite the pending state entirely. 
+ set_cluster_state(lib::ClusterState(new_cluster_state)); + return get_send_set(); +} + +std::vector<uint16_t> +TopLevelBucketDBUpdaterTest::expand_node_vec(const std::vector<uint16_t>& nodes) +{ + std::vector<uint16_t> res; + size_t count = _bucket_spaces.size(); + for (const auto &node : nodes) { + for (uint32_t i = 0; i < count; ++i) { + res.push_back(node); + } + } + return res; +} + +/* + * If we don't carry over the set of nodes that we need to fetch from, + * a naive comparison between the active state and the new state will + * make it appear to the distributor that nothing has changed, as any + * database modifications caused by intermediate states will not be + * accounted for (basically the ABA problem in a distributed setting). + */ +TEST_F(TopLevelBucketDBUpdaterTest, preempted_distributor_change_carries_node_set_over_to_next_state_fetch) { + EXPECT_EQ(expand_node_vec({0, 1, 2, 3, 4, 5}), + get_sent_nodes_with_preemption("version:1 distributor:6 storage:6", + message_count(6), + "version:2 distributor:6 .5.s:d storage:6", + "version:3 distributor:6 storage:6")); +} + +TEST_F(TopLevelBucketDBUpdaterTest, preempted_storage_change_carries_node_set_over_to_next_state_fetch) { + EXPECT_EQ(expand_node_vec({2, 3}), + get_sent_nodes_with_preemption( + "version:1 distributor:6 storage:6 .2.s:d", + message_count(5), + "version:2 distributor:6 storage:6 .2.s:d .3.s:d", + "version:3 distributor:6 storage:6")); +} + +TEST_F(TopLevelBucketDBUpdaterTest, preempted_storage_node_down_must_be_re_fetched) { + EXPECT_EQ(expand_node_vec({2}), + get_sent_nodes_with_preemption( + "version:1 distributor:6 storage:6", + message_count(6), + "version:2 distributor:6 storage:6 .2.s:d", + "version:3 distributor:6 storage:6")); +} + +using NodeVec = std::vector<uint16_t>; + +TEST_F(TopLevelBucketDBUpdaterTest, do_not_send_to_preempted_node_now_in_down_state) { + EXPECT_EQ(NodeVec{}, + get_sent_nodes_with_preemption( + "version:1 distributor:6 storage:6 .2.s:d", + message_count(5), 
+ "version:2 distributor:6 storage:6", // Sends to 2. + "version:3 distributor:6 storage:6 .2.s:d")); // 2 down again. +} + +TEST_F(TopLevelBucketDBUpdaterTest, do_not_send_to_preempted_node_not_part_of_new_state) { + // Even though 100 nodes are preempted, not all of these should be part + // of the request afterwards when only 6 are part of the state. + EXPECT_EQ(expand_node_vec({0, 1, 2, 3, 4, 5}), + get_sent_nodes_with_preemption( + "version:1 distributor:6 storage:100", + message_count(100), + "version:2 distributor:5 .4.s:d storage:100", + "version:3 distributor:6 storage:6")); +} + +TEST_F(TopLevelBucketDBUpdaterTest, outdated_node_set_cleared_after_successful_state_completion) { + lib::ClusterState state_before("version:1 distributor:6 storage:6 .1.t:1234"); + uint32_t expected_msgs = message_count(6), dummy_buckets_to_return = 10; + ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state_before, expected_msgs, dummy_buckets_to_return)); + _sender.clear(); + // New cluster state that should not by itself trigger any new fetches, + // unless outdated node set is somehow not cleared after an enabled + // (completed) cluster state has been set. + set_cluster_state("version:3 distributor:6 storage:6"); + EXPECT_EQ(size_t(0), _sender.commands().size()); +} + +// XXX test currently disabled since distribution config currently isn't used +// at all in order to deduce the set of nodes to send to. This might not matter +// in practice since it is assumed that the cluster state matching the new +// distribution config will follow very shortly after the config has been +// applied to the node. The new cluster state will then send out requests to +// the correct node set. 
+TEST_F(TopLevelBucketDBUpdaterTest, DISABLED_cluster_config_downsize_only_sends_to_available_nodes) { + uint32_t expected_msgs = 6, dummy_buckets_to_return = 20; + ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(lib::ClusterState("distributor:6 storage:6"), + expected_msgs, dummy_buckets_to_return)); + _sender.clear(); + + // Intentionally trigger a racing config change which arrives before the + // new cluster state representing it. + set_distribution(dist_config_3_nodes_in_1_group()); + sort_sent_messages_by_index(_sender); + + EXPECT_EQ((NodeVec{0, 1, 2}), get_send_set()); +} + +/** + * Test scenario where a cluster is downsized by removing a subset of the nodes + * from the distribution configuration. The system must be able to deal with + * a scenario where the set of nodes between two cluster states across a config + * change may differ. + * + * See VESPA-790 for details. + */ +TEST_F(TopLevelBucketDBUpdaterTest, node_missing_from_config_is_treated_as_needing_ownership_transfer) { + uint32_t expected_msgs = message_count(3), dummy_buckets_to_return = 1; + ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(lib::ClusterState("distributor:3 storage:3"), + expected_msgs, dummy_buckets_to_return)); + _sender.clear(); + + // Cluster goes from {0, 1, 2} -> {0, 1}. This leaves us with a config + // that does not contain node 2 while the _active_ cluster state still + // contains this node. + const char* downsize_cfg = + "redundancy 2\n" + "distributor_auto_ownership_transfer_on_whole_group_down true\n" + "group[2]\n" + "group[0].name \"invalid\"\n" + "group[0].index \"invalid\"\n" + "group[0].partitions 1|*\n" + "group[0].nodes[0]\n" + "group[1].name rack0\n" + "group[1].index 0\n" + "group[1].nodes[2]\n" + "group[1].nodes[0].index 0\n" + "group[1].nodes[1].index 1\n"; + + set_distribution(downsize_cfg); + sort_sent_messages_by_index(_sender); + _sender.clear(); + + // Attempt to apply state with {0, 1} set. 
This will compare the new state + // with the previous state, which still has node 2. + expected_msgs = message_count(2); + ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(lib::ClusterState("distributor:2 storage:2"), + expected_msgs, dummy_buckets_to_return)); + + EXPECT_EQ(expand_node_vec({0, 1}), get_send_set()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, changed_distributor_set_implies_ownership_transfer) { + auto fixture = create_pending_state_fixture_for_state_change( + "distributor:2 storage:2", "distributor:1 storage:2"); + EXPECT_TRUE(fixture->state->hasBucketOwnershipTransfer()); + + fixture = create_pending_state_fixture_for_state_change( + "distributor:2 storage:2", "distributor:2 .1.s:d storage:2"); + EXPECT_TRUE(fixture->state->hasBucketOwnershipTransfer()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, unchanged_distributor_set_implies_no_ownership_transfer) { + auto fixture = create_pending_state_fixture_for_state_change( + "distributor:2 storage:2", "distributor:2 storage:1"); + EXPECT_FALSE(fixture->state->hasBucketOwnershipTransfer()); + + fixture = create_pending_state_fixture_for_state_change( + "distributor:2 storage:2", "distributor:2 storage:2 .1.s:d"); + EXPECT_FALSE(fixture->state->hasBucketOwnershipTransfer()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, changed_distribution_config_implies_ownership_transfer) { + auto fixture = create_pending_state_fixture_for_distribution_change("distributor:2 storage:2"); + EXPECT_TRUE(fixture->state->hasBucketOwnershipTransfer()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, transition_time_tracked_for_single_state_change) { + ASSERT_NO_FATAL_FAILURE(complete_state_transition_in_seconds("distributor:2 storage:2", 5, message_count(2))); + + EXPECT_EQ(uint64_t(5000), last_transition_time_in_millis()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, transition_time_reset_across_non_preempting_state_changes) { + ASSERT_NO_FATAL_FAILURE(complete_state_transition_in_seconds("distributor:2 storage:2", 5, 
message_count(2))); + ASSERT_NO_FATAL_FAILURE(complete_state_transition_in_seconds("distributor:2 storage:3", 3, message_count(1))); + + EXPECT_EQ(uint64_t(3000), last_transition_time_in_millis()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, transition_time_tracked_for_distribution_config_change) { + lib::ClusterState state("distributor:2 storage:2"); + ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state, message_count(2), 1)); + + _sender.clear(); + set_distribution(dist_config_3_nodes_in_1_group()); + fake_clock().addSecondsToTime(4); + ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(state, message_count(2))); + EXPECT_EQ(uint64_t(4000), last_transition_time_in_millis()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, transition_time_tracked_across_preempted_transitions) { + _sender.clear(); + set_cluster_state("version:1 distributor:2 storage:2"); + fake_clock().addSecondsToTime(5); + // Pre-empted with new state here, which will push out the old pending + // state and replace it with a new one. We should still count the time + // used processing the old state. + ASSERT_NO_FATAL_FAILURE(complete_state_transition_in_seconds("version:2 distributor:2 storage:3", 3, message_count(3))); + + EXPECT_EQ(uint64_t(8000), last_transition_time_in_millis()); +} + +/* + * Brief reminder on test DSL for checking bucket merge operations: + * + * merge_bucket_lists() takes as input strings of the format + * <node>:<raw bucket id>/<checksum>/<num docs>/<doc size>|<node>: + * and returns a string describing the bucket DB post-merge with the format + * <raw bucket id>:<node>/<checksum>/<num docs>/<doc size>,<node>:....|<raw bucket id>:.... + * + * Yes, the order of node<->bucket id is reversed between the two, perhaps to make sure you're awake. 
+ */ + +TEST_F(TopLevelBucketDBUpdaterTest, batch_update_of_existing_diverging_replicas_does_not_mark_any_as_trusted) { + // Replacing bucket information for content node 0 should not mark existing + // untrusted replica as trusted as a side effect. + EXPECT_EQ("5:1/7/8/9/u,0/1/2/3/u|", + merge_bucket_lists( + lib::ClusterState("distributor:1 storage:3 .0.s:i"), + "0:5/0/0/0|1:5/7/8/9", + lib::ClusterState("distributor:1 storage:3 .0.s:u"), + "0:5/1/2/3|1:5/7/8/9", true)); +} + +TEST_F(TopLevelBucketDBUpdaterTest, batch_add_of_new_diverging_replicas_does_not_mark_any_as_trusted) { + EXPECT_EQ("5:1/7/8/9/u,0/1/2/3/u|", + merge_bucket_lists("", "0:5/1/2/3|1:5/7/8/9", true)); +} + +TEST_F(TopLevelBucketDBUpdaterTest, batch_add_with_single_resulting_replica_implicitly_marks_as_trusted) { + EXPECT_EQ("5:0/1/2/3/t|", + merge_bucket_lists("", "0:5/1/2/3", true)); +} + +TEST_F(TopLevelBucketDBUpdaterTest, identity_update_of_single_replica_does_not_clear_trusted) { + EXPECT_EQ("5:0/1/2/3/t|", + merge_bucket_lists("0:5/1/2/3", "0:5/1/2/3", true)); +} + +TEST_F(TopLevelBucketDBUpdaterTest, identity_update_of_diverging_untrusted_replicas_does_not_mark_any_as_trusted) { + EXPECT_EQ("5:1/7/8/9/u,0/1/2/3/u|", + merge_bucket_lists("0:5/1/2/3|1:5/7/8/9", "0:5/1/2/3|1:5/7/8/9", true)); +} + +TEST_F(TopLevelBucketDBUpdaterTest, adding_diverging_replica_to_existing_trusted_does_not_remove_trusted) { + EXPECT_EQ("5:1/2/3/4/u,0/1/2/3/t|", + merge_bucket_lists("0:5/1/2/3", "0:5/1/2/3|1:5/2/3/4", true)); +} + +TEST_F(TopLevelBucketDBUpdaterTest, batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted) { + // This differs from batch_update_of_existing_diverging_replicas_does_not_mark_any_as_trusted + // in that _all_ content nodes are considered outdated when distributor changes take place, + // and therefore a slightly different code path is taken. 
In particular, bucket info for + // outdated nodes gets removed before possibly being re-added (if present in the bucket info + // response). + EXPECT_EQ("5:1/7/8/9/u,0/1/2/3/u|", + merge_bucket_lists( + lib::ClusterState("distributor:2 storage:3"), + "0:5/1/2/3|1:5/7/8/9", + lib::ClusterState("distributor:1 storage:3"), + "0:5/1/2/3|1:5/7/8/9", true)); +} + +// TODO remove on Vespa 8 - this is a workaround for https://github.com/vespa-engine/vespa/issues/8475 +TEST_F(TopLevelBucketDBUpdaterTest, global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection) { + set_distribution(dist_config_6_nodes_across_2_groups()); + + const vespalib::string current_hash = "(0d*|*(0;0;1;2)(1;3;4;5))"; + const vespalib::string legacy_hash = "(0d3|3|*(0;0;1;2)(1;3;4;5))"; + + set_cluster_state("distributor:6 storage:6"); + ASSERT_EQ(message_count(6), _sender.commands().size()); + + api::RequestBucketInfoCommand* global_req = nullptr; + for (auto& cmd : _sender.commands()) { + auto& req_cmd = dynamic_cast<api::RequestBucketInfoCommand&>(*cmd); + if (req_cmd.getBucketSpace() == document::FixedBucketSpaces::global_space()) { + global_req = &req_cmd; + break; + } + } + ASSERT_TRUE(global_req != nullptr); + ASSERT_EQ(current_hash, global_req->getDistributionHash()); + + auto reply = std::make_shared<api::RequestBucketInfoReply>(*global_req); + reply->setResult(api::ReturnCode::REJECTED); + bucket_db_updater().onRequestBucketInfoReply(reply); + + fake_clock().addSecondsToTime(10); + bucket_db_updater().resend_delayed_messages(); + + // Should now be a resent request with legacy distribution hash + ASSERT_EQ(message_count(6) + 1, _sender.commands().size()); + auto& legacy_req = dynamic_cast<api::RequestBucketInfoCommand&>(*_sender.commands().back()); + ASSERT_EQ(legacy_hash, legacy_req.getDistributionHash()); + + // Now if we reject it _again_ we should cycle back to the current hash + // in case it wasn't a hash-based rejection after all. And the circle of life continues. 
+ reply = std::make_shared<api::RequestBucketInfoReply>(legacy_req); + reply->setResult(api::ReturnCode::REJECTED); + bucket_db_updater().onRequestBucketInfoReply(reply); + + fake_clock().addSecondsToTime(10); + bucket_db_updater().resend_delayed_messages(); + + ASSERT_EQ(message_count(6) + 2, _sender.commands().size()); + auto& new_current_req = dynamic_cast<api::RequestBucketInfoCommand&>(*_sender.commands().back()); + ASSERT_EQ(current_hash, new_current_req.getDistributionHash()); +} + +namespace { + +template <typename Func> +void for_each_bucket(const BucketDatabase& db, const document::BucketSpace& space, Func&& f) { + BucketId last(0); + auto e = db.getNext(last); + while (e.valid()) { + f(space, e); + e = db.getNext(e.getBucketId()); + } +} + +template <typename Func> +void for_each_bucket(const DistributorBucketSpaceRepo& repo, Func&& f) { + for (const auto& space : repo) { + for_each_bucket(space.second->getBucketDatabase(), space.first, f); + } +} + +} + +TEST_F(TopLevelBucketDBUpdaterTest, non_owned_buckets_moved_to_read_only_db_on_ownership_change) { + set_stale_reads_enabled(true); + + lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition + set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity + + ASSERT_EQ(message_count(4), _sender.commands().size()); + constexpr uint32_t n_buckets = 10; + ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(initial_state, message_count(4), n_buckets)); + _sender.clear(); + + EXPECT_EQ(n_buckets, mutable_default_dbs_size()); + EXPECT_EQ(n_buckets, mutable_global_dbs_size()); + EXPECT_EQ(0u, read_only_default_dbs_size()); + EXPECT_EQ(0u, read_only_global_dbs_size()); + + lib::ClusterState pending_state("distributor:2 storage:4"); + + std::unordered_set<Bucket, Bucket::hash> buckets_not_owned_in_pending_state; + for (auto* s : distributor_stripes()) { + for_each_bucket(mutable_repo(*s), [&](const auto& space, 
const auto& entry) { + if (!distributor_bucket_space(entry.getBucketId()).owns_bucket_in_state(pending_state, entry.getBucketId())) { + buckets_not_owned_in_pending_state.insert(Bucket(space, entry.getBucketId())); + } + }); + } + EXPECT_FALSE(buckets_not_owned_in_pending_state.empty()); + + set_cluster_state_bundle(lib::ClusterStateBundle(pending_state, {}, true)); // Now requires activation + + const auto buckets_not_owned_per_space = (buckets_not_owned_in_pending_state.size() / 2); // 2 spaces + const auto expected_mutable_buckets = n_buckets - buckets_not_owned_per_space; + EXPECT_EQ(expected_mutable_buckets, mutable_default_dbs_size()); + EXPECT_EQ(expected_mutable_buckets, mutable_global_dbs_size()); + EXPECT_EQ(buckets_not_owned_per_space, read_only_default_dbs_size()); + EXPECT_EQ(buckets_not_owned_per_space, read_only_global_dbs_size()); + + for (auto* s : distributor_stripes()) { + for_each_bucket(read_only_repo(*s), [&](const auto& space, const auto& entry) { + EXPECT_TRUE(buckets_not_owned_in_pending_state.find(Bucket(space, entry.getBucketId())) + != buckets_not_owned_in_pending_state.end()); + }); + } +} + +TEST_F(TopLevelBucketDBUpdaterTest, buckets_no_longer_available_are_not_moved_to_read_only_database) { + constexpr uint32_t n_buckets = 10; + // No ownership change, just node down. Test redundancy is 2, so removing 2 nodes will + // cause some buckets to be entirely unavailable. 
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, + "version:2 distributor:1 storage:4 .0.s:d .1.s:m", n_buckets, 0); + + EXPECT_EQ(0u, read_only_default_dbs_size()); + EXPECT_EQ(0u, read_only_global_dbs_size()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, non_owned_buckets_purged_when_read_only_support_is_config_disabled) { + set_stale_reads_enabled(false); + + lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition + set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity + + ASSERT_EQ(message_count(4), _sender.commands().size()); + constexpr uint32_t n_buckets = 10; + ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(initial_state, message_count(4), n_buckets)); + _sender.clear(); + + // Nothing in read-only DB after first bulk load of buckets. + EXPECT_EQ(0u, read_only_default_dbs_size()); + EXPECT_EQ(0u, read_only_global_dbs_size()); + + set_cluster_state("distributor:2 storage:4"); + // No buckets should be moved into read only db after ownership changes. 
+ EXPECT_EQ(0u, read_only_default_dbs_size()); + EXPECT_EQ(0u, read_only_global_dbs_size()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, deferred_activated_state_does_not_enable_state_until_activation_received) { + set_stale_reads_enabled(true); + constexpr uint32_t n_buckets = 10; + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, + "version:2 distributor:1 storage:4", n_buckets, 4)); + + // Version should not be switched over yet + EXPECT_EQ(1u, current_cluster_state_bundle().getVersion()); + + EXPECT_EQ(0u, mutable_default_dbs_size()); + EXPECT_EQ(0u, mutable_global_dbs_size()); + + EXPECT_FALSE(activate_cluster_state_version(2)); + + EXPECT_EQ(2u, current_cluster_state_bundle().getVersion()); + EXPECT_EQ(n_buckets, mutable_default_dbs_size()); + EXPECT_EQ(n_buckets, mutable_global_dbs_size()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, read_only_db_cleared_once_pending_state_is_activated) { + set_stale_reads_enabled(true); + constexpr uint32_t n_buckets = 10; + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, + "version:2 distributor:2 storage:4", n_buckets, 0)); + EXPECT_FALSE(activate_cluster_state_version(2)); + + EXPECT_EQ(0u, read_only_default_dbs_size()); + EXPECT_EQ(0u, read_only_global_dbs_size()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, read_only_db_is_populated_even_when_self_is_marked_down) { + set_stale_reads_enabled(true); + constexpr uint32_t n_buckets = 10; + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, + "version:2 distributor:1 .0.s:d storage:4", n_buckets, 0)); + + // State not yet activated, so read-only DBs have got all the buckets we used to have. 
+ EXPECT_EQ(0u, mutable_default_dbs_size()); + EXPECT_EQ(0u, mutable_global_dbs_size()); + EXPECT_EQ(n_buckets, read_only_default_dbs_size()); + EXPECT_EQ(n_buckets, read_only_global_dbs_size()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, activate_cluster_state_request_with_mismatching_version_returns_actual_version) { + set_stale_reads_enabled(true); + constexpr uint32_t n_buckets = 10; + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:4 distributor:1 storage:4", n_buckets, 4, + "version:5 distributor:2 storage:4", n_buckets, 0)); + + EXPECT_TRUE(activate_cluster_state_version(4)); // Too old version + ASSERT_NO_FATAL_FAILURE(assert_has_activate_cluster_state_reply_with_actual_version(5)); + + EXPECT_TRUE(activate_cluster_state_version(6)); // More recent version than what has been observed + ASSERT_NO_FATAL_FAILURE(assert_has_activate_cluster_state_reply_with_actual_version(5)); +} + +TEST_F(TopLevelBucketDBUpdaterTest, activate_cluster_state_request_without_pending_transition_passes_message_through) { + set_stale_reads_enabled(true); + constexpr uint32_t n_buckets = 10; + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, + "version:2 distributor:1 storage:4", n_buckets, 4)); + // Activate version 2; no pending cluster state after this. + EXPECT_FALSE(activate_cluster_state_version(2)); + + // No pending cluster state for version 3, just passed through to be implicitly bounced by state manager. + // Note: state manager is not modelled in this test, so we just check that the message handler returns + // false (meaning "didn't take message ownership") and there's no auto-generated reply. 
+ EXPECT_FALSE(activate_cluster_state_version(3)); + EXPECT_EQ(size_t(0), _sender.replies().size()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_getter_is_non_null_only_when_state_is_pending) { + auto initial_baseline = std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:d"); + auto initial_default = std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:m"); + + lib::ClusterStateBundle initial_bundle(*initial_baseline, {{FixedBucketSpaces::default_space(), initial_default}, + {FixedBucketSpaces::global_space(), initial_baseline}}); + set_cluster_state_bundle(initial_bundle); + + for (auto* s : distributor_stripes()) { + auto* state = s->bucket_db_updater().pendingClusterStateOrNull(FixedBucketSpaces::default_space()); + ASSERT_TRUE(state != nullptr); + EXPECT_EQ(*initial_default, *state); + + state = s->bucket_db_updater().pendingClusterStateOrNull(FixedBucketSpaces::global_space()); + ASSERT_TRUE(state != nullptr); + EXPECT_EQ(*initial_baseline, *state); + } + + ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(*initial_baseline, message_count(1), 0)); + + for (auto* s : distributor_stripes()) { + auto* state = s->bucket_db_updater().pendingClusterStateOrNull(FixedBucketSpaces::default_space()); + EXPECT_TRUE(state == nullptr); + + state = s->bucket_db_updater().pendingClusterStateOrNull(FixedBucketSpaces::global_space()); + EXPECT_TRUE(state == nullptr); + } +} + +struct BucketDBUpdaterSnapshotTest : TopLevelBucketDBUpdaterTest { + lib::ClusterState empty_state; + std::shared_ptr<lib::ClusterState> initial_baseline; + std::shared_ptr<lib::ClusterState> initial_default; + lib::ClusterStateBundle initial_bundle; + Bucket default_bucket; + Bucket global_bucket; + + BucketDBUpdaterSnapshotTest() + : TopLevelBucketDBUpdaterTest(), + empty_state(), + initial_baseline(std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:d")), + initial_default(std::make_shared<lib::ClusterState>("distributor:1 storage:2 
.0.s:m")), + initial_bundle(*initial_baseline, {{FixedBucketSpaces::default_space(), initial_default}, + {FixedBucketSpaces::global_space(), initial_baseline}}), + default_bucket(FixedBucketSpaces::default_space(), BucketId(16, 1234)), + global_bucket(FixedBucketSpaces::global_space(), BucketId(16, 1234)) + { + } + ~BucketDBUpdaterSnapshotTest() override; + + void SetUp() override { + TopLevelBucketDBUpdaterTest::SetUp(); + set_stale_reads_enabled(true); + }; + + // Assumes that the distributor owns all buckets, so it may choose any arbitrary bucket in the bucket space + uint32_t buckets_in_snapshot_matching_current_db(bool check_mutable_repo, BucketSpace bucket_space) { + uint32_t found_buckets = 0; + for (auto* s : distributor_stripes()) { + auto rs = s->bucket_db_updater().read_snapshot_for_bucket(Bucket(bucket_space, BucketId(16, 1234))); + if (!rs.is_routable()) { + return 0; + } + auto guard = rs.steal_read_guard(); + auto& repo = check_mutable_repo ? mutable_repo(*s) : read_only_repo(*s); + for_each_bucket(repo, [&](const auto& space, const auto& entry) { + if (space == bucket_space) { + auto entries = guard->find_parents_and_self(entry.getBucketId()); + if (entries.size() == 1) { + ++found_buckets; + } + } + }); + } + return found_buckets; + } +}; + +BucketDBUpdaterSnapshotTest::~BucketDBUpdaterSnapshotTest() = default; + +TEST_F(BucketDBUpdaterSnapshotTest, default_space_snapshot_prior_to_activated_state_is_non_routable) { + auto rs = stripe_of_bucket(default_bucket).bucket_db_updater().read_snapshot_for_bucket(default_bucket); + EXPECT_FALSE(rs.is_routable()); +} + +TEST_F(BucketDBUpdaterSnapshotTest, global_space_snapshot_prior_to_activated_state_is_non_routable) { + auto rs = stripe_of_bucket(global_bucket).bucket_db_updater().read_snapshot_for_bucket(global_bucket); + EXPECT_FALSE(rs.is_routable()); +} + +TEST_F(BucketDBUpdaterSnapshotTest, read_snapshot_returns_appropriate_cluster_states) { + set_cluster_state_bundle(initial_bundle); + // State 
currently pending, empty initial state is active + + auto def_rs = stripe_of_bucket(default_bucket).bucket_db_updater().read_snapshot_for_bucket(default_bucket); + EXPECT_EQ(def_rs.context().active_cluster_state()->toString(), empty_state.toString()); + EXPECT_EQ(def_rs.context().default_active_cluster_state()->toString(), empty_state.toString()); + ASSERT_TRUE(def_rs.context().has_pending_state_transition()); + EXPECT_EQ(def_rs.context().pending_cluster_state()->toString(), initial_default->toString()); + + auto global_rs = stripe_of_bucket(global_bucket).bucket_db_updater().read_snapshot_for_bucket(global_bucket); + EXPECT_EQ(global_rs.context().active_cluster_state()->toString(), empty_state.toString()); + EXPECT_EQ(global_rs.context().default_active_cluster_state()->toString(), empty_state.toString()); + ASSERT_TRUE(global_rs.context().has_pending_state_transition()); + EXPECT_EQ(global_rs.context().pending_cluster_state()->toString(), initial_baseline->toString()); + + ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(*initial_baseline, message_count(1), 0)); + // State now activated, no pending + + def_rs = stripe_of_bucket(default_bucket).bucket_db_updater().read_snapshot_for_bucket(default_bucket); + EXPECT_EQ(def_rs.context().active_cluster_state()->toString(), initial_default->toString()); + EXPECT_EQ(def_rs.context().default_active_cluster_state()->toString(), initial_default->toString()); + EXPECT_FALSE(def_rs.context().has_pending_state_transition()); + + global_rs = stripe_of_bucket(global_bucket).bucket_db_updater().read_snapshot_for_bucket(global_bucket); + EXPECT_EQ(global_rs.context().active_cluster_state()->toString(), initial_baseline->toString()); + EXPECT_EQ(global_rs.context().default_active_cluster_state()->toString(), initial_default->toString()); + EXPECT_FALSE(global_rs.context().has_pending_state_transition()); +} + +TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_no_pending_state_transition_returns_mutable_db_guard) { + constexpr 
uint32_t n_buckets = 10; + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, + "version:2 distributor:1 storage:4", n_buckets, 4)); + EXPECT_FALSE(activate_cluster_state_version(2)); + EXPECT_EQ(buckets_in_snapshot_matching_current_db(true, FixedBucketSpaces::default_space()), n_buckets); + EXPECT_EQ(buckets_in_snapshot_matching_current_db(true, FixedBucketSpaces::global_space()), n_buckets); +} + +TEST_F(BucketDBUpdaterSnapshotTest, snapshot_returns_unroutable_for_non_owned_bucket_in_current_state) { + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, + "version:2 distributor:2 .0.s:d storage:4", 0, 0)); + EXPECT_FALSE(activate_cluster_state_version(2)); + // We're down in state 2 and therefore do not own any buckets + auto def_rs = stripe_of_bucket(default_bucket).bucket_db_updater().read_snapshot_for_bucket(default_bucket); + EXPECT_FALSE(def_rs.is_routable()); +} + +TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_pending_state_returns_read_only_guard_for_bucket_only_owned_in_current_state) { + constexpr uint32_t n_buckets = 10; + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, + "version:2 distributor:2 .0.s:d storage:4", 0, 0)); + EXPECT_EQ(buckets_in_snapshot_matching_current_db(false, FixedBucketSpaces::default_space()), n_buckets); + EXPECT_EQ(buckets_in_snapshot_matching_current_db(false, FixedBucketSpaces::global_space()), n_buckets); +} + +TEST_F(BucketDBUpdaterSnapshotTest, snapshot_is_unroutable_if_stale_reads_disabled_and_bucket_not_owned_in_pending_state) { + set_stale_reads_enabled(false); + constexpr uint32_t n_buckets = 10; + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, + "version:2 distributor:2 .0.s:d storage:4", 0, 0)); + auto def_rs = 
stripe_of_bucket(default_bucket).bucket_db_updater().read_snapshot_for_bucket(default_bucket); + EXPECT_FALSE(def_rs.is_routable()); +} + } diff --git a/storage/src/tests/distributor/top_level_distributor_test.cpp b/storage/src/tests/distributor/top_level_distributor_test.cpp index d8df36e53b2..8fae1c6d738 100644 --- a/storage/src/tests/distributor/top_level_distributor_test.cpp +++ b/storage/src/tests/distributor/top_level_distributor_test.cpp @@ -70,18 +70,6 @@ struct TopLevelDistributorTest : Test, TopLevelDistributorTestUtil { return posted_msgs.str(); } - void tick_distributor_and_stripes_n_times(uint32_t n) { - for (uint32_t i = 0; i < n; ++i) { - tick(false); - } - } - - void tick_top_level_distributor_n_times(uint32_t n) { - for (uint32_t i = 0; i < n; ++i) { - tick(true); - } - } - StatusReporterDelegate& distributor_status_delegate() { return _distributor->_distributorStatusDelegate; } @@ -98,10 +86,6 @@ struct TopLevelDistributorTest : Test, TopLevelDistributorTestUtil { return _distributor->_status_to_do; } - TopLevelDistributor::MetricUpdateHook distributor_metric_update_hook() { - return _distributor->_metricUpdateHook; - } - BucketSpacesStatsProvider::PerNodeBucketSpacesStats distributor_bucket_spaces_stats() { return _distributor->getBucketSpacesStats(); } diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.cpp b/storage/src/tests/distributor/top_level_distributor_test_util.cpp index c4173f5e8ff..b6e9beb38ae 100644 --- a/storage/src/tests/distributor/top_level_distributor_test_util.cpp +++ b/storage/src/tests/distributor/top_level_distributor_test_util.cpp @@ -265,6 +265,25 @@ TopLevelDistributorTestUtil::get_bucket(const document::BucketId& bId) const return stripe_bucket_database(stripe_index_of_bucket(bId)).get(bId); } +DistributorBucketSpaceRepo& +TopLevelDistributorTestUtil::top_level_bucket_space_repo() noexcept +{ + return _distributor->_component.bucket_space_repo(); +} + +const DistributorBucketSpaceRepo& 
+TopLevelDistributorTestUtil::top_level_bucket_space_repo() const noexcept +{ + return _distributor->_component.bucket_space_repo(); +} + +std::unique_ptr<StripeAccessGuard> +TopLevelDistributorTestUtil::acquire_stripe_guard() +{ + // Note: this won't actually interact with any threads, as the pool is running in single-threaded test mode. + return _distributor->_stripe_accessor->rendezvous_and_hold_all(); +} + TopLevelBucketDBUpdater& TopLevelDistributorTestUtil::bucket_db_updater() { return *_distributor->_bucket_db_updater; @@ -356,6 +375,11 @@ TopLevelDistributorTestUtil::reconfigure(const DistributorConfig& cfg) tick(); // Config is propagated upon next top-level tick } +framework::MetricUpdateHook& +TopLevelDistributorTestUtil::distributor_metric_update_hook() { + return _distributor->_metricUpdateHook; +} + BucketDatabase& TopLevelDistributorTestUtil::stripe_bucket_database(uint16_t stripe_idx) { assert(stripe_idx < _distributor->_stripes.size()); @@ -430,4 +454,41 @@ TopLevelDistributorTestUtil::trigger_distribution_change(std::shared_ptr<lib::Di _distributor->enableNextDistribution(); } +const lib::ClusterStateBundle& +TopLevelDistributorTestUtil::current_cluster_state_bundle() const +{ + // We assume that all stripes have the same cluster state internally, so just use the first. + assert(_distributor->_stripes[0]); + const auto& bundle = _distributor->_stripes[0]->getClusterStateBundle(); + // ... but sanity-check just to make sure... 
+ for (size_t i = 1; i < _num_distributor_stripes; ++i) { + assert(_distributor->_stripes[i]->getClusterStateBundle() == bundle); + } + return bundle; +} + +void +TopLevelDistributorTestUtil::tick_distributor_and_stripes_n_times(uint32_t n) +{ + for (uint32_t i = 0; i < n; ++i) { + tick(false); + } +} + +void +TopLevelDistributorTestUtil::tick_top_level_distributor_n_times(uint32_t n) +{ + for (uint32_t i = 0; i < n; ++i) { + tick(true); + } +} + +void +TopLevelDistributorTestUtil::complete_recovery_mode_on_all_stripes() +{ + for (auto* s : distributor_stripes()) { + s->scanAllBuckets(); + } +} + } diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.h b/storage/src/tests/distributor/top_level_distributor_test_util.h index 9048160b652..8832f8ada6e 100644 --- a/storage/src/tests/distributor/top_level_distributor_test_util.h +++ b/storage/src/tests/distributor/top_level_distributor_test_util.h @@ -19,12 +19,14 @@ namespace distributor { class TopLevelDistributor; class DistributorBucketSpace; +class DistributorBucketSpaceRepo; class DistributorMetricSet; class DistributorNodeContext; class DistributorStripe; class DistributorStripeComponent; class DistributorStripeOperationContext; class DistributorStripePool; +class StripeAccessGuard; class IdealStateMetricSet; class Operation; class TopLevelBucketDBUpdater; @@ -58,6 +60,12 @@ public: // As the above, but always inserts into default bucket space void add_nodes_to_stripe_bucket_db(const document::BucketId& id, const std::string& nodeStr); + // TODO STRIPE replace with BucketSpaceStateMap once legacy is gone + DistributorBucketSpaceRepo& top_level_bucket_space_repo() noexcept; + const DistributorBucketSpaceRepo& top_level_bucket_space_repo() const noexcept; + + std::unique_ptr<StripeAccessGuard> acquire_stripe_guard(); + TopLevelBucketDBUpdater& bucket_db_updater(); const IdealStateMetricSet& total_ideal_state_metrics() const; const DistributorMetricSet& total_distributor_metrics() const; @@ 
-77,12 +85,19 @@ public: return _node->getClock(); } + framework::MetricUpdateHook& distributor_metric_update_hook(); + BucketDatabase& stripe_bucket_database(uint16_t stripe_idx); // Implicit default space only BucketDatabase& stripe_bucket_database(uint16_t stripe_idx, document::BucketSpace space); const BucketDatabase& stripe_bucket_database(uint16_t stripe_idx) const; // Implicit default space only const BucketDatabase& stripe_bucket_database(uint16_t stripe_idx, document::BucketSpace space) const; [[nodiscard]] bool all_distributor_stripes_are_in_recovery_mode() const; + void tick_distributor_and_stripes_n_times(uint32_t n); + void tick_top_level_distributor_n_times(uint32_t n); + + void complete_recovery_mode_on_all_stripes(); + void setup_distributor(int redundancy, int node_count, const std::string& systemState, @@ -122,6 +137,8 @@ public: void trigger_distribution_change(std::shared_ptr<lib::Distribution> distr); + const lib::ClusterStateBundle& current_cluster_state_bundle() const; + static std::vector<document::BucketSpace> bucket_spaces(); protected: |