From c24f52e097143e8643f59945e75cc0db8f2962d1 Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Mon, 13 Sep 2021 13:46:27 +0000 Subject: Port final batch of BucketDBUpdater tests from legacy to top-level code paths --- .../distributor/legacy_bucket_db_updater_test.cpp | 54 +- .../top_level_bucket_db_updater_test.cpp | 637 ++++++++++++++++++++- .../distributor/top_level_distributor_test.cpp | 4 - .../top_level_distributor_test_util.cpp | 18 + .../distributor/top_level_distributor_test_util.h | 4 + 5 files changed, 698 insertions(+), 19 deletions(-) (limited to 'storage') diff --git a/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp b/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp index fc2ad82f3a2..b871bf5841e 100644 --- a/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp +++ b/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp @@ -2216,6 +2216,7 @@ TEST_F(LegacyBucketDBUpdaterTest, node_missing_from_config_is_treated_as_needing EXPECT_EQ(expandNodeVec({0, 1}), getSendSet()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, changed_distributor_set_implies_ownership_transfer) { auto fixture = createPendingStateFixtureForStateChange( "distributor:2 storage:2", "distributor:1 storage:2"); @@ -2226,6 +2227,7 @@ TEST_F(LegacyBucketDBUpdaterTest, changed_distributor_set_implies_ownership_tran EXPECT_TRUE(fixture->state->hasBucketOwnershipTransfer()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, unchanged_distributor_set_implies_no_ownership_transfer) { auto fixture = createPendingStateFixtureForStateChange( "distributor:2 storage:2", "distributor:2 storage:1"); @@ -2236,18 +2238,21 @@ TEST_F(LegacyBucketDBUpdaterTest, unchanged_distributor_set_implies_no_ownership EXPECT_FALSE(fixture->state->hasBucketOwnershipTransfer()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, changed_distribution_config_implies_ownership_transfer) { auto fixture = createPendingStateFixtureForDistributionChange( "distributor:2 storage:2"); EXPECT_TRUE(fixture->state->hasBucketOwnershipTransfer()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, transition_time_tracked_for_single_state_change) { ASSERT_NO_FATAL_FAILURE(completeStateTransitionInSeconds("distributor:2 storage:2", 5, messageCount(2))); EXPECT_EQ(uint64_t(5000), lastTransitionTimeInMillis()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, transition_time_reset_across_non_preempting_state_changes) { ASSERT_NO_FATAL_FAILURE(completeStateTransitionInSeconds("distributor:2 storage:2", 5, messageCount(2))); ASSERT_NO_FATAL_FAILURE(completeStateTransitionInSeconds("distributor:2 storage:3", 3, messageCount(1))); @@ -2255,6 +2260,7 @@ TEST_F(LegacyBucketDBUpdaterTest, transition_time_reset_across_non_preempting_st EXPECT_EQ(uint64_t(3000), lastTransitionTimeInMillis()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, transition_time_tracked_for_distribution_config_change) { lib::ClusterState state("distributor:2 storage:2"); ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(state, messageCount(2), 1)); @@ -2267,6 +2273,7 @@ TEST_F(LegacyBucketDBUpdaterTest, transition_time_tracked_for_distribution_confi EXPECT_EQ(uint64_t(4000), lastTransitionTimeInMillis()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, transition_time_tracked_across_preempted_transitions) { _sender.clear(); lib::ClusterState state("distributor:2 storage:2"); @@ -2280,6 +2287,7 @@ TEST_F(LegacyBucketDBUpdaterTest, transition_time_tracked_across_preempted_trans EXPECT_EQ(uint64_t(8000), lastTransitionTimeInMillis()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest /* * Brief reminder on test DSL for checking bucket merge operations: * @@ -2303,31 +2311,37 @@ TEST_F(LegacyBucketDBUpdaterTest, batch_update_of_existing_diverging_replicas_do "0:5/1/2/3|1:5/7/8/9", true)); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, batch_add_of_new_diverging_replicas_does_not_mark_any_as_trusted) { EXPECT_EQ(std::string("5:1/7/8/9/u,0/1/2/3/u|"), mergeBucketLists("", "0:5/1/2/3|1:5/7/8/9", true)); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, batch_add_with_single_resulting_replica_implicitly_marks_as_trusted) { EXPECT_EQ(std::string("5:0/1/2/3/t|"), mergeBucketLists("", "0:5/1/2/3", true)); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, identity_update_of_single_replica_does_not_clear_trusted) { EXPECT_EQ(std::string("5:0/1/2/3/t|"), mergeBucketLists("0:5/1/2/3", "0:5/1/2/3", true)); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, identity_update_of_diverging_untrusted_replicas_does_not_mark_any_as_trusted) { EXPECT_EQ(std::string("5:1/7/8/9/u,0/1/2/3/u|"), mergeBucketLists("0:5/1/2/3|1:5/7/8/9", "0:5/1/2/3|1:5/7/8/9", true)); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, adding_diverging_replica_to_existing_trusted_does_not_remove_trusted) { EXPECT_EQ(std::string("5:1/2/3/4/u,0/1/2/3/t|"), mergeBucketLists("0:5/1/2/3", "0:5/1/2/3|1:5/2/3/4", true)); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted) { // This differs from batch_update_of_existing_diverging_replicas_does_not_mark_any_as_trusted // in that _all_ content nodes are considered outdated when distributor changes take place, @@ -2343,6 +2357,7 @@ TEST_F(LegacyBucketDBUpdaterTest, batch_update_from_distributor_change_does_not_ "0:5/1/2/3|1:5/7/8/9", true)); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest // TODO remove on Vespa 8 - this is a workaround for https://github.com/vespa-engine/vespa/issues/8475 TEST_F(LegacyBucketDBUpdaterTest, global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection) { std::string distConfig(getDistConfig6Nodes2Groups()); @@ -2412,6 +2427,7 @@ void for_each_bucket(const DistributorBucketSpaceRepo& repo, Func&& f) { } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, non_owned_buckets_moved_to_read_only_db_on_ownership_change) { getBucketDBUpdater().set_stale_reads_enabled(true); @@ -2453,6 +2469,7 @@ TEST_F(LegacyBucketDBUpdaterTest, non_owned_buckets_moved_to_read_only_db_on_own }); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, buckets_no_longer_available_are_not_moved_to_read_only_database) { constexpr uint32_t n_buckets = 10; // No ownership change, just node down. Test redundancy is 2, so removing 2 nodes will @@ -2464,6 +2481,7 @@ TEST_F(LegacyBucketDBUpdaterTest, buckets_no_longer_available_are_not_moved_to_r EXPECT_EQ(size_t(0), read_only_global_db().size()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, non_owned_buckets_purged_when_read_only_support_is_config_disabled) { getBucketDBUpdater().set_stale_reads_enabled(false); @@ -2509,6 +2527,7 @@ void LegacyBucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transiti _sender.clear(); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, deferred_activated_state_does_not_enable_state_until_activation_received) { getBucketDBUpdater().set_stale_reads_enabled(true); constexpr uint32_t n_buckets = 10; @@ -2529,6 +2548,7 @@ TEST_F(LegacyBucketDBUpdaterTest, deferred_activated_state_does_not_enable_state EXPECT_EQ(uint64_t(n_buckets), mutable_global_db().size()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, read_only_db_cleared_once_pending_state_is_activated) { getBucketDBUpdater().set_stale_reads_enabled(true); constexpr uint32_t n_buckets = 10; @@ -2541,6 +2561,7 @@ TEST_F(LegacyBucketDBUpdaterTest, read_only_db_cleared_once_pending_state_is_act EXPECT_EQ(uint64_t(0), read_only_global_db().size()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, read_only_db_is_populated_even_when_self_is_marked_down) { getBucketDBUpdater().set_stale_reads_enabled(true); constexpr uint32_t n_buckets = 10; @@ -2555,6 +2576,7 @@ TEST_F(LegacyBucketDBUpdaterTest, read_only_db_is_populated_even_when_self_is_ma EXPECT_EQ(uint64_t(n_buckets), read_only_global_db().size()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, activate_cluster_state_request_with_mismatching_version_returns_actual_version) { getBucketDBUpdater().set_stale_reads_enabled(true); constexpr uint32_t n_buckets = 10; @@ -2569,6 +2591,7 @@ TEST_F(LegacyBucketDBUpdaterTest, activate_cluster_state_request_with_mismatchin ASSERT_NO_FATAL_FAILURE(assert_has_activate_cluster_state_reply_with_actual_version(5)); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, activate_cluster_state_request_without_pending_transition_passes_message_through) { getBucketDBUpdater().set_stale_reads_enabled(true); constexpr uint32_t n_buckets = 10; @@ -2585,6 +2608,7 @@ TEST_F(LegacyBucketDBUpdaterTest, activate_cluster_state_request_without_pending EXPECT_EQ(size_t(0), _sender.replies().size()); } +// TODO STRIPE disabled benchmark tests are NOT migrated to new test suite TEST_F(LegacyBucketDBUpdaterTest, DISABLED_benchmark_bulk_loading_into_empty_db) { // Need to trigger an initial edge to complete first bucket scan ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:2 storage:1"), @@ -2703,6 +2727,7 @@ TEST_F(LegacyBucketDBUpdaterTest, DISABLED_benchmark_all_buckets_removed_during_ fprintf(stderr, "Took %g seconds to scan and remove %u buckets\n", timer.min_time(), n_buckets); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_getter_is_non_null_only_when_state_is_pending) { auto initial_baseline = std::make_shared("distributor:1 storage:2 .0.s:d"); auto initial_default = std::make_shared("distributor:1 storage:2 .0.s:m"); @@ -2728,7 +2753,7 @@ TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_getter_is_non_null_only_ EXPECT_TRUE(state == nullptr); } -struct BucketDBUpdaterSnapshotTest : LegacyBucketDBUpdaterTest { +struct LegacyBucketDBUpdaterSnapshotTest : LegacyBucketDBUpdaterTest { lib::ClusterState empty_state; std::shared_ptr initial_baseline; std::shared_ptr initial_default; @@ -2736,7 +2761,7 @@ struct BucketDBUpdaterSnapshotTest : LegacyBucketDBUpdaterTest { Bucket default_bucket; Bucket global_bucket; - BucketDBUpdaterSnapshotTest() + LegacyBucketDBUpdaterSnapshotTest() : LegacyBucketDBUpdaterTest(), empty_state(), initial_baseline(std::make_shared("distributor:1 storage:2 .0.s:d")), @@ -2747,7 +2772,7 @@ struct BucketDBUpdaterSnapshotTest : LegacyBucketDBUpdaterTest { global_bucket(FixedBucketSpaces::global_space(), BucketId(16, 1234)) { } - ~BucketDBUpdaterSnapshotTest() override; + ~LegacyBucketDBUpdaterSnapshotTest() override; void SetUp() override { LegacyBucketDBUpdaterTest::SetUp(); @@ -2774,19 +2799,22 @@ struct BucketDBUpdaterSnapshotTest : LegacyBucketDBUpdaterTest { } }; -BucketDBUpdaterSnapshotTest::~BucketDBUpdaterSnapshotTest() = default; +LegacyBucketDBUpdaterSnapshotTest::~LegacyBucketDBUpdaterSnapshotTest() = default; -TEST_F(BucketDBUpdaterSnapshotTest, default_space_snapshot_prior_to_activated_state_is_non_routable) { +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest +TEST_F(LegacyBucketDBUpdaterSnapshotTest, default_space_snapshot_prior_to_activated_state_is_non_routable) { auto rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket); EXPECT_FALSE(rs.is_routable()); } -TEST_F(BucketDBUpdaterSnapshotTest, global_space_snapshot_prior_to_activated_state_is_non_routable) { +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest +TEST_F(LegacyBucketDBUpdaterSnapshotTest, global_space_snapshot_prior_to_activated_state_is_non_routable) { auto rs = getBucketDBUpdater().read_snapshot_for_bucket(global_bucket); EXPECT_FALSE(rs.is_routable()); } -TEST_F(BucketDBUpdaterSnapshotTest, read_snapshot_returns_appropriate_cluster_states) { +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest +TEST_F(LegacyBucketDBUpdaterSnapshotTest, read_snapshot_returns_appropriate_cluster_states) { set_cluster_state_bundle(initial_bundle); // State currently pending, empty initial state is active @@ -2816,7 +2844,8 @@ TEST_F(BucketDBUpdaterSnapshotTest, read_snapshot_returns_appropriate_cluster_st EXPECT_FALSE(global_rs.context().has_pending_state_transition()); } -TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_no_pending_state_transition_returns_mutable_db_guard) { +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest +TEST_F(LegacyBucketDBUpdaterSnapshotTest, snapshot_with_no_pending_state_transition_returns_mutable_db_guard) { constexpr uint32_t n_buckets = 10; ASSERT_NO_FATAL_FAILURE( trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, @@ -2828,7 +2857,8 @@ TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_no_pending_state_transition_re n_buckets); } -TEST_F(BucketDBUpdaterSnapshotTest, snapshot_returns_unroutable_for_non_owned_bucket_in_current_state) { +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest +TEST_F(LegacyBucketDBUpdaterSnapshotTest, snapshot_returns_unroutable_for_non_owned_bucket_in_current_state) { ASSERT_NO_FATAL_FAILURE( trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, "version:2 distributor:2 .0.s:d storage:4", 0, 0)); @@ -2838,7 +2868,8 @@ TEST_F(BucketDBUpdaterSnapshotTest, snapshot_returns_unroutable_for_non_owned_bu EXPECT_FALSE(def_rs.is_routable()); } -TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_pending_state_returns_read_only_guard_for_bucket_only_owned_in_current_state) { +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest +TEST_F(LegacyBucketDBUpdaterSnapshotTest, snapshot_with_pending_state_returns_read_only_guard_for_bucket_only_owned_in_current_state) { constexpr uint32_t n_buckets = 10; ASSERT_NO_FATAL_FAILURE( trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, @@ -2849,7 +2880,8 @@ TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_pending_state_returns_read_onl n_buckets); } -TEST_F(BucketDBUpdaterSnapshotTest, snapshot_is_unroutable_if_stale_reads_disabled_and_bucket_not_owned_in_pending_state) { +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest +TEST_F(LegacyBucketDBUpdaterSnapshotTest, snapshot_is_unroutable_if_stale_reads_disabled_and_bucket_not_owned_in_pending_state) { getBucketDBUpdater().set_stale_reads_enabled(false); constexpr uint32_t n_buckets = 10; ASSERT_NO_FATAL_FAILURE( diff --git a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp index 1a0ba8352b7..01f7d5a4f0a 100644 --- a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp +++ b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp @@ -11,13 +11,13 @@ #include #include #include +#include #include #include #include #include #include #include -#include #include #include @@ -131,7 +131,7 @@ public: stripe_of_bucket(bucket).bucket_db_updater().onRequestBucketInfoReply(reply); } - std::string verifyBucket(document::BucketId id, const lib::ClusterState& state) { + std::string verify_bucket(document::BucketId id, const lib::ClusterState& state) { BucketDatabase::Entry entry = get_bucket(id); if (!entry.valid()) { return vespalib::make_string("%s doesn't exist in DB", id.toString().c_str()); @@ -196,6 +196,12 @@ public: sort_sent_messages_by_index(_sender, size_before_state); } + void set_cluster_state_bundle(const lib::ClusterStateBundle& state) { + const size_t size_before_state = _sender.commands().size(); + bucket_db_updater().onSetSystemState(std::make_shared(state)); + sort_sent_messages_by_index(_sender, size_before_state); + } + void set_cluster_state(const vespalib::string& state_str) { set_cluster_state(lib::ClusterState(state_str)); } @@ -205,6 +211,14 @@ public: std::make_shared(version)); } + void assert_has_activate_cluster_state_reply_with_actual_version(uint32_t version) { + ASSERT_EQ(size_t(1), _sender.replies().size()); + auto* response = dynamic_cast(_sender.replies().back().get()); + ASSERT_TRUE(response != nullptr); + ASSERT_EQ(version, response->actualVersion()); + _sender.clear(); + } + void complete_bucket_info_gathering(const lib::ClusterState& state, size_t expected_msgs, uint32_t bucket_count = 1, @@ -304,6 +318,26 @@ public: ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(state, expected_msgs, n_buckets)); } + void complete_state_transition_in_seconds(const std::string& stateStr, + uint32_t seconds, + uint32_t expectedMsgs) + { + _sender.clear(); + lib::ClusterState state(stateStr); + set_cluster_state(state); + fake_clock().addSecondsToTime(seconds); + ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(state, expectedMsgs)); + } + + uint64_t last_transition_time_in_millis() { + { + // Force stripe metrics to be aggregated into total. + std::mutex l; + distributor_metric_update_hook().updateMetrics(metrics::MetricLockGuard(l)); + } + return uint64_t(total_distributor_metrics().stateTransitionTime.getLast()); + } + ClusterInformation::CSP create_cluster_info(const std::string& clusterStateString) { lib::ClusterState baseline_cluster_state(clusterStateString); lib::ClusterStateBundle cluster_state_bundle(baseline_cluster_state); @@ -385,6 +419,75 @@ public: std::vector expand_node_vec(const std::vector& nodes); + void trigger_completed_but_not_yet_activated_transition( + vespalib::stringref initial_state_str, + uint32_t initial_buckets, + uint32_t initial_expected_msgs, + vespalib::stringref pending_state_str, + uint32_t pending_buckets, + uint32_t pending_expected_msgs); + + const DistributorBucketSpaceRepo& mutable_repo(DistributorStripe& s) const noexcept { + return s.getBucketSpaceRepo(); + } + // Note: not calling this "immutable_repo" since it may actually be modified by the pending + // cluster state component (just not by operations), so it would not have the expected semantics. + const DistributorBucketSpaceRepo& read_only_repo(DistributorStripe& s) const noexcept { + return s.getReadOnlyBucketSpaceRepo(); + } + + const BucketDatabase& mutable_default_db(DistributorStripe& s) const noexcept { + return mutable_repo(s).get(FixedBucketSpaces::default_space()).getBucketDatabase(); + } + const BucketDatabase& mutable_global_db(DistributorStripe& s) const noexcept { + return mutable_repo(s).get(FixedBucketSpaces::global_space()).getBucketDatabase(); + } + const BucketDatabase& read_only_default_db(DistributorStripe& s) const noexcept { + return read_only_repo(s).get(FixedBucketSpaces::default_space()).getBucketDatabase(); + } + const BucketDatabase& read_only_global_db(DistributorStripe& s) const noexcept { + return read_only_repo(s).get(FixedBucketSpaces::global_space()).getBucketDatabase(); + } + + void set_stale_reads_enabled(bool enabled) { + for (auto* s : distributor_stripes()) { + s->bucket_db_updater().set_stale_reads_enabled(enabled); + } + bucket_db_updater().set_stale_reads_enabled(enabled); + } + + size_t mutable_default_dbs_size() const { + size_t total = 0; + for (auto* s : distributor_stripes()) { + total += mutable_default_db(*s).size(); + } + return total; + } + + size_t mutable_global_dbs_size() const { + size_t total = 0; + for (auto* s : distributor_stripes()) { + total += mutable_global_db(*s).size(); + } + return total; + } + + size_t read_only_default_dbs_size() const { + size_t total = 0; + for (auto* s : distributor_stripes()) { + total += read_only_default_db(*s).size(); + } + return total; + } + + size_t read_only_global_dbs_size() const { + size_t total = 0; + for (auto* s : distributor_stripes()) { + total += read_only_global_db(*s).size(); + } + return total; + } + }; TopLevelBucketDBUpdaterTest::TopLevelBucketDBUpdaterTest() @@ -502,6 +605,30 @@ TopLevelBucketDBUpdaterTest::get_node_list(const std::vector& nodes) return get_node_list(nodes, _bucket_spaces.size()); } +void +TopLevelBucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition( + vespalib::stringref initial_state_str, + uint32_t initial_buckets, + uint32_t initial_expected_msgs, + vespalib::stringref pending_state_str, + uint32_t pending_buckets, + uint32_t pending_expected_msgs) +{ + lib::ClusterState initial_state(initial_state_str); + set_cluster_state(initial_state); + ASSERT_EQ(message_count(initial_expected_msgs), _sender.commands().size()); + ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering( + initial_state, message_count(initial_expected_msgs), initial_buckets)); + _sender.clear(); + + lib::ClusterState pending_state(pending_state_str); // Ownership change + set_cluster_state_bundle(lib::ClusterStateBundle(pending_state, {}, true)); + ASSERT_EQ(message_count(pending_expected_msgs), _sender.commands().size()); + ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering( + pending_state, message_count(pending_expected_msgs), pending_buckets)); + _sender.clear(); +} + TEST_F(TopLevelBucketDBUpdaterTest, normal_usage) { set_cluster_state(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3")); @@ -660,8 +787,8 @@ TEST_F(TopLevelBucketDBUpdaterTest, failed_request_bucket_info) { for (int i=0; i<10; i++) { EXPECT_EQ("", - verifyBucket(document::BucketId(16, i), - lib::ClusterState("distributor:1 storage:1"))); + verify_bucket(document::BucketId(16, i), + lib::ClusterState("distributor:1 storage:1"))); } // Set system state should now be passed on @@ -2033,4 +2160,506 @@ TEST_F(TopLevelBucketDBUpdaterTest, node_missing_from_config_is_treated_as_needi EXPECT_EQ(expand_node_vec({0, 1}), get_send_set()); } +TEST_F(TopLevelBucketDBUpdaterTest, changed_distributor_set_implies_ownership_transfer) { + auto fixture = create_pending_state_fixture_for_state_change( + "distributor:2 storage:2", "distributor:1 storage:2"); + EXPECT_TRUE(fixture->state->hasBucketOwnershipTransfer()); + + fixture = create_pending_state_fixture_for_state_change( + "distributor:2 storage:2", "distributor:2 .1.s:d storage:2"); + EXPECT_TRUE(fixture->state->hasBucketOwnershipTransfer()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, unchanged_distributor_set_implies_no_ownership_transfer) { + auto fixture = create_pending_state_fixture_for_state_change( + "distributor:2 storage:2", "distributor:2 storage:1"); + EXPECT_FALSE(fixture->state->hasBucketOwnershipTransfer()); + + fixture = create_pending_state_fixture_for_state_change( + "distributor:2 storage:2", "distributor:2 storage:2 .1.s:d"); + EXPECT_FALSE(fixture->state->hasBucketOwnershipTransfer()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, changed_distribution_config_implies_ownership_transfer) { + auto fixture = create_pending_state_fixture_for_distribution_change("distributor:2 storage:2"); + EXPECT_TRUE(fixture->state->hasBucketOwnershipTransfer()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, transition_time_tracked_for_single_state_change) { + ASSERT_NO_FATAL_FAILURE(complete_state_transition_in_seconds("distributor:2 storage:2", 5, message_count(2))); + + EXPECT_EQ(uint64_t(5000), last_transition_time_in_millis()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, transition_time_reset_across_non_preempting_state_changes) { + ASSERT_NO_FATAL_FAILURE(complete_state_transition_in_seconds("distributor:2 storage:2", 5, message_count(2))); + ASSERT_NO_FATAL_FAILURE(complete_state_transition_in_seconds("distributor:2 storage:3", 3, message_count(1))); + + EXPECT_EQ(uint64_t(3000), last_transition_time_in_millis()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, transition_time_tracked_for_distribution_config_change) { + lib::ClusterState state("distributor:2 storage:2"); + ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state, message_count(2), 1)); + + _sender.clear(); + set_distribution(dist_config_3_nodes_in_1_group()); + fake_clock().addSecondsToTime(4); + ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(state, message_count(2))); + EXPECT_EQ(uint64_t(4000), last_transition_time_in_millis()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, transition_time_tracked_across_preempted_transitions) { + _sender.clear(); + set_cluster_state("version:1 distributor:2 storage:2"); + fake_clock().addSecondsToTime(5); + // Pre-empted with new state here, which will push out the old pending + // state and replace it with a new one. We should still count the time + // used processing the old state. + ASSERT_NO_FATAL_FAILURE(complete_state_transition_in_seconds("version:2 distributor:2 storage:3", 3, message_count(3))); + + EXPECT_EQ(uint64_t(8000), last_transition_time_in_millis()); +} + +/* + * Brief reminder on test DSL for checking bucket merge operations: + * + * merge_bucket_lists() takes as input strings of the format + * :///|: + * and returns a string describing the bucket DB post-merge with the format + * :///,:....|:.... + * + * Yes, the order of node<->bucket id is reversed between the two, perhaps to make sure you're awake. + */ + +TEST_F(TopLevelBucketDBUpdaterTest, batch_update_of_existing_diverging_replicas_does_not_mark_any_as_trusted) { + // Replacing bucket information for content node 0 should not mark existing + // untrusted replica as trusted as a side effect. + EXPECT_EQ("5:1/7/8/9/u,0/1/2/3/u|", + merge_bucket_lists( + lib::ClusterState("distributor:1 storage:3 .0.s:i"), + "0:5/0/0/0|1:5/7/8/9", + lib::ClusterState("distributor:1 storage:3 .0.s:u"), + "0:5/1/2/3|1:5/7/8/9", true)); +} + +TEST_F(TopLevelBucketDBUpdaterTest, batch_add_of_new_diverging_replicas_does_not_mark_any_as_trusted) { + EXPECT_EQ("5:1/7/8/9/u,0/1/2/3/u|", + merge_bucket_lists("", "0:5/1/2/3|1:5/7/8/9", true)); +} + +TEST_F(TopLevelBucketDBUpdaterTest, batch_add_with_single_resulting_replica_implicitly_marks_as_trusted) { + EXPECT_EQ("5:0/1/2/3/t|", + merge_bucket_lists("", "0:5/1/2/3", true)); +} + +TEST_F(TopLevelBucketDBUpdaterTest, identity_update_of_single_replica_does_not_clear_trusted) { + EXPECT_EQ("5:0/1/2/3/t|", + merge_bucket_lists("0:5/1/2/3", "0:5/1/2/3", true)); +} + +TEST_F(TopLevelBucketDBUpdaterTest, identity_update_of_diverging_untrusted_replicas_does_not_mark_any_as_trusted) { + EXPECT_EQ("5:1/7/8/9/u,0/1/2/3/u|", + merge_bucket_lists("0:5/1/2/3|1:5/7/8/9", "0:5/1/2/3|1:5/7/8/9", true)); +} + +TEST_F(TopLevelBucketDBUpdaterTest, adding_diverging_replica_to_existing_trusted_does_not_remove_trusted) { + EXPECT_EQ("5:1/2/3/4/u,0/1/2/3/t|", + merge_bucket_lists("0:5/1/2/3", "0:5/1/2/3|1:5/2/3/4", true)); +} + +TEST_F(TopLevelBucketDBUpdaterTest, batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted) { + // This differs from batch_update_of_existing_diverging_replicas_does_not_mark_any_as_trusted + // in that _all_ content nodes are considered outdated when distributor changes take place, + // and therefore a slightly different code path is taken. In particular, bucket info for + // outdated nodes gets removed before possibly being re-added (if present in the bucket info + // response). + EXPECT_EQ("5:1/7/8/9/u,0/1/2/3/u|", + merge_bucket_lists( + lib::ClusterState("distributor:2 storage:3"), + "0:5/1/2/3|1:5/7/8/9", + lib::ClusterState("distributor:1 storage:3"), + "0:5/1/2/3|1:5/7/8/9", true)); +} + +// TODO remove on Vespa 8 - this is a workaround for https://github.com/vespa-engine/vespa/issues/8475 +TEST_F(TopLevelBucketDBUpdaterTest, global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection) { + set_distribution(dist_config_6_nodes_across_2_groups()); + + const vespalib::string current_hash = "(0d*|*(0;0;1;2)(1;3;4;5))"; + const vespalib::string legacy_hash = "(0d3|3|*(0;0;1;2)(1;3;4;5))"; + + set_cluster_state("distributor:6 storage:6"); + ASSERT_EQ(message_count(6), _sender.commands().size()); + + api::RequestBucketInfoCommand* global_req = nullptr; + for (auto& cmd : _sender.commands()) { + auto& req_cmd = dynamic_cast(*cmd); + if (req_cmd.getBucketSpace() == document::FixedBucketSpaces::global_space()) { + global_req = &req_cmd; + break; + } + } + ASSERT_TRUE(global_req != nullptr); + ASSERT_EQ(current_hash, global_req->getDistributionHash()); + + auto reply = std::make_shared(*global_req); + reply->setResult(api::ReturnCode::REJECTED); + bucket_db_updater().onRequestBucketInfoReply(reply); + + fake_clock().addSecondsToTime(10); + bucket_db_updater().resend_delayed_messages(); + + // Should now be a resent request with legacy distribution hash + ASSERT_EQ(message_count(6) + 1, _sender.commands().size()); + auto& legacy_req = dynamic_cast(*_sender.commands().back()); + ASSERT_EQ(legacy_hash, legacy_req.getDistributionHash()); + + // Now if we reject it _again_ we should cycle back to the current hash + // in case it wasn't a hash-based rejection after all. And the circle of life continues. + reply = std::make_shared(legacy_req); + reply->setResult(api::ReturnCode::REJECTED); + bucket_db_updater().onRequestBucketInfoReply(reply); + + fake_clock().addSecondsToTime(10); + bucket_db_updater().resend_delayed_messages(); + + ASSERT_EQ(message_count(6) + 2, _sender.commands().size()); + auto& new_current_req = dynamic_cast(*_sender.commands().back()); + ASSERT_EQ(current_hash, new_current_req.getDistributionHash()); +} + +namespace { + +template +void for_each_bucket(const BucketDatabase& db, const document::BucketSpace& space, Func&& f) { + BucketId last(0); + auto e = db.getNext(last); + while (e.valid()) { + f(space, e); + e = db.getNext(e.getBucketId()); + } +} + +template +void for_each_bucket(const DistributorBucketSpaceRepo& repo, Func&& f) { + for (const auto& space : repo) { + for_each_bucket(space.second->getBucketDatabase(), space.first, f); + } +} + +} + +TEST_F(TopLevelBucketDBUpdaterTest, non_owned_buckets_moved_to_read_only_db_on_ownership_change) { + set_stale_reads_enabled(true); + + lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition + set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity + + ASSERT_EQ(message_count(4), _sender.commands().size()); + constexpr uint32_t n_buckets = 10; + ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(initial_state, message_count(4), n_buckets)); + _sender.clear(); + + EXPECT_EQ(n_buckets, mutable_default_dbs_size()); + EXPECT_EQ(n_buckets, mutable_global_dbs_size()); + EXPECT_EQ(0u, read_only_default_dbs_size()); + EXPECT_EQ(0u, read_only_global_dbs_size()); + + lib::ClusterState pending_state("distributor:2 storage:4"); + + std::unordered_set buckets_not_owned_in_pending_state; + for (auto* s : distributor_stripes()) { + for_each_bucket(mutable_repo(*s), [&](const auto& space, const auto& entry) { + if (!distributor_bucket_space(entry.getBucketId()).owns_bucket_in_state(pending_state, entry.getBucketId())) { + buckets_not_owned_in_pending_state.insert(Bucket(space, entry.getBucketId())); + } + }); + } + EXPECT_FALSE(buckets_not_owned_in_pending_state.empty()); + + set_cluster_state_bundle(lib::ClusterStateBundle(pending_state, {}, true)); // Now requires activation + + const auto buckets_not_owned_per_space = (buckets_not_owned_in_pending_state.size() / 2); // 2 spaces + const auto expected_mutable_buckets = n_buckets - buckets_not_owned_per_space; + EXPECT_EQ(expected_mutable_buckets, mutable_default_dbs_size()); + EXPECT_EQ(expected_mutable_buckets, mutable_global_dbs_size()); + EXPECT_EQ(buckets_not_owned_per_space, read_only_default_dbs_size()); + EXPECT_EQ(buckets_not_owned_per_space, read_only_global_dbs_size()); + + for (auto* s : distributor_stripes()) { + for_each_bucket(read_only_repo(*s), [&](const auto& space, const auto& entry) { + EXPECT_TRUE(buckets_not_owned_in_pending_state.find(Bucket(space, entry.getBucketId())) + != buckets_not_owned_in_pending_state.end()); + }); + } +} + +TEST_F(TopLevelBucketDBUpdaterTest, buckets_no_longer_available_are_not_moved_to_read_only_database) { + constexpr uint32_t n_buckets = 10; + // No ownership change, just node down. Test redundancy is 2, so removing 2 nodes will + // cause some buckets to be entirely unavailable. + trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, + "version:2 distributor:1 storage:4 .0.s:d .1.s:m", n_buckets, 0); + + EXPECT_EQ(0u, read_only_default_dbs_size()); + EXPECT_EQ(0u, read_only_global_dbs_size()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, non_owned_buckets_purged_when_read_only_support_is_config_disabled) { + set_stale_reads_enabled(false); + + lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition + set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity + + ASSERT_EQ(message_count(4), _sender.commands().size()); + constexpr uint32_t n_buckets = 10; + ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(initial_state, message_count(4), n_buckets)); + _sender.clear(); + + // Nothing in read-only DB after first bulk load of buckets. + EXPECT_EQ(0u, read_only_default_dbs_size()); + EXPECT_EQ(0u, read_only_global_dbs_size()); + + set_cluster_state("distributor:2 storage:4"); + // No buckets should be moved into read only db after ownership changes. + EXPECT_EQ(0u, read_only_default_dbs_size()); + EXPECT_EQ(0u, read_only_global_dbs_size()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, deferred_activated_state_does_not_enable_state_until_activation_received) { + set_stale_reads_enabled(true); + constexpr uint32_t n_buckets = 10; + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, + "version:2 distributor:1 storage:4", n_buckets, 4)); + + // Version should not be switched over yet + EXPECT_EQ(1u, current_cluster_state_bundle().getVersion()); + + EXPECT_EQ(0u, mutable_default_dbs_size()); + EXPECT_EQ(0u, mutable_global_dbs_size()); + + EXPECT_FALSE(activate_cluster_state_version(2)); + + EXPECT_EQ(2u, current_cluster_state_bundle().getVersion()); + EXPECT_EQ(n_buckets, mutable_default_dbs_size()); + EXPECT_EQ(n_buckets, mutable_global_dbs_size()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, read_only_db_cleared_once_pending_state_is_activated) { + set_stale_reads_enabled(true); + constexpr uint32_t n_buckets = 10; + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, + "version:2 distributor:2 storage:4", n_buckets, 0)); + EXPECT_FALSE(activate_cluster_state_version(2)); + + EXPECT_EQ(0u, read_only_default_dbs_size()); + EXPECT_EQ(0u, read_only_global_dbs_size()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, read_only_db_is_populated_even_when_self_is_marked_down) { + set_stale_reads_enabled(true); + constexpr uint32_t n_buckets = 10; + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, + "version:2 distributor:1 .0.s:d storage:4", n_buckets, 0)); + + // State not yet activated, so read-only DBs have got all the buckets we used to have. + EXPECT_EQ(0u, mutable_default_dbs_size()); + EXPECT_EQ(0u, mutable_global_dbs_size()); + EXPECT_EQ(n_buckets, read_only_default_dbs_size()); + EXPECT_EQ(n_buckets, read_only_global_dbs_size()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, activate_cluster_state_request_with_mismatching_version_returns_actual_version) { + set_stale_reads_enabled(true); + constexpr uint32_t n_buckets = 10; + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:4 distributor:1 storage:4", n_buckets, 4, + "version:5 distributor:2 storage:4", n_buckets, 0)); + + EXPECT_TRUE(activate_cluster_state_version(4)); // Too old version + ASSERT_NO_FATAL_FAILURE(assert_has_activate_cluster_state_reply_with_actual_version(5)); + + EXPECT_TRUE(activate_cluster_state_version(6)); // More recent version than what has been observed + ASSERT_NO_FATAL_FAILURE(assert_has_activate_cluster_state_reply_with_actual_version(5)); +} + +TEST_F(TopLevelBucketDBUpdaterTest, activate_cluster_state_request_without_pending_transition_passes_message_through) { + set_stale_reads_enabled(true); + constexpr uint32_t n_buckets = 10; + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, + "version:2 distributor:1 storage:4", n_buckets, 4)); + // Activate version 2; no pending cluster state after this. + EXPECT_FALSE(activate_cluster_state_version(2)); + + // No pending cluster state for version 3, just passed through to be implicitly bounced by state manager. + // Note: state manager is not modelled in this test, so we just check that the message handler returns + // false (meaning "didn't take message ownership") and there's no auto-generated reply. + EXPECT_FALSE(activate_cluster_state_version(3)); + EXPECT_EQ(size_t(0), _sender.replies().size()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_getter_is_non_null_only_when_state_is_pending) { + auto initial_baseline = std::make_shared("distributor:1 storage:2 .0.s:d"); + auto initial_default = std::make_shared("distributor:1 storage:2 .0.s:m"); + + lib::ClusterStateBundle initial_bundle(*initial_baseline, {{FixedBucketSpaces::default_space(), initial_default}, + {FixedBucketSpaces::global_space(), initial_baseline}}); + set_cluster_state_bundle(initial_bundle); + + for (auto* s : distributor_stripes()) { + auto* state = s->bucket_db_updater().pendingClusterStateOrNull(FixedBucketSpaces::default_space()); + ASSERT_TRUE(state != nullptr); + EXPECT_EQ(*initial_default, *state); + + state = s->bucket_db_updater().pendingClusterStateOrNull(FixedBucketSpaces::global_space()); + ASSERT_TRUE(state != nullptr); + EXPECT_EQ(*initial_baseline, *state); + } + + ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(*initial_baseline, message_count(1), 0)); + + for (auto* s : distributor_stripes()) { + auto* state = s->bucket_db_updater().pendingClusterStateOrNull(FixedBucketSpaces::default_space()); + EXPECT_TRUE(state == nullptr); + + state = s->bucket_db_updater().pendingClusterStateOrNull(FixedBucketSpaces::global_space()); + EXPECT_TRUE(state == nullptr); + } +} + +struct BucketDBUpdaterSnapshotTest : TopLevelBucketDBUpdaterTest { + lib::ClusterState empty_state; + std::shared_ptr initial_baseline; + std::shared_ptr initial_default; + lib::ClusterStateBundle initial_bundle; + Bucket default_bucket; + Bucket global_bucket; + + BucketDBUpdaterSnapshotTest() + : TopLevelBucketDBUpdaterTest(), + empty_state(), + initial_baseline(std::make_shared("distributor:1 storage:2 .0.s:d")), + initial_default(std::make_shared("distributor:1 storage:2 .0.s:m")), + initial_bundle(*initial_baseline, {{FixedBucketSpaces::default_space(), initial_default}, + {FixedBucketSpaces::global_space(), initial_baseline}}), + default_bucket(FixedBucketSpaces::default_space(), BucketId(16, 1234)), + global_bucket(FixedBucketSpaces::global_space(), BucketId(16, 1234)) + { + } + ~BucketDBUpdaterSnapshotTest() override; + + void SetUp() override { + TopLevelBucketDBUpdaterTest::SetUp(); + set_stale_reads_enabled(true); + }; + + // Assumes that the distributor owns all buckets, so it may choose any arbitrary bucket in the bucket space + uint32_t buckets_in_snapshot_matching_current_db(bool check_mutable_repo, BucketSpace bucket_space) { + uint32_t found_buckets = 0; + for (auto* s : distributor_stripes()) { + auto rs = s->bucket_db_updater().read_snapshot_for_bucket(Bucket(bucket_space, BucketId(16, 1234))); + if (!rs.is_routable()) { + return 0; + } + auto guard = rs.steal_read_guard(); + auto& repo = check_mutable_repo ? mutable_repo(*s) : read_only_repo(*s); + for_each_bucket(repo, [&](const auto& space, const auto& entry) { + if (space == bucket_space) { + auto entries = guard->find_parents_and_self(entry.getBucketId()); + if (entries.size() == 1) { + ++found_buckets; + } + } + }); + } + return found_buckets; + } +}; + +BucketDBUpdaterSnapshotTest::~BucketDBUpdaterSnapshotTest() = default; + +TEST_F(BucketDBUpdaterSnapshotTest, default_space_snapshot_prior_to_activated_state_is_non_routable) { + auto rs = stripe_of_bucket(default_bucket).bucket_db_updater().read_snapshot_for_bucket(default_bucket); + EXPECT_FALSE(rs.is_routable()); +} + +TEST_F(BucketDBUpdaterSnapshotTest, global_space_snapshot_prior_to_activated_state_is_non_routable) { + auto rs = stripe_of_bucket(global_bucket).bucket_db_updater().read_snapshot_for_bucket(global_bucket); + EXPECT_FALSE(rs.is_routable()); +} + +TEST_F(BucketDBUpdaterSnapshotTest, read_snapshot_returns_appropriate_cluster_states) { + set_cluster_state_bundle(initial_bundle); + // State currently pending, empty initial state is active + + auto def_rs = stripe_of_bucket(default_bucket).bucket_db_updater().read_snapshot_for_bucket(default_bucket); + EXPECT_EQ(def_rs.context().active_cluster_state()->toString(), empty_state.toString()); + EXPECT_EQ(def_rs.context().default_active_cluster_state()->toString(), empty_state.toString()); + ASSERT_TRUE(def_rs.context().has_pending_state_transition()); + EXPECT_EQ(def_rs.context().pending_cluster_state()->toString(), initial_default->toString()); + + auto global_rs = stripe_of_bucket(global_bucket).bucket_db_updater().read_snapshot_for_bucket(global_bucket); + EXPECT_EQ(global_rs.context().active_cluster_state()->toString(), empty_state.toString()); + EXPECT_EQ(global_rs.context().default_active_cluster_state()->toString(), empty_state.toString()); + ASSERT_TRUE(global_rs.context().has_pending_state_transition()); + EXPECT_EQ(global_rs.context().pending_cluster_state()->toString(), initial_baseline->toString()); + + ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(*initial_baseline, message_count(1), 0)); + // State now activated, no pending + + def_rs = stripe_of_bucket(default_bucket).bucket_db_updater().read_snapshot_for_bucket(default_bucket); + EXPECT_EQ(def_rs.context().active_cluster_state()->toString(), initial_default->toString()); + EXPECT_EQ(def_rs.context().default_active_cluster_state()->toString(), initial_default->toString()); + EXPECT_FALSE(def_rs.context().has_pending_state_transition()); + + global_rs = stripe_of_bucket(global_bucket).bucket_db_updater().read_snapshot_for_bucket(global_bucket); + EXPECT_EQ(global_rs.context().active_cluster_state()->toString(), initial_baseline->toString()); + EXPECT_EQ(global_rs.context().default_active_cluster_state()->toString(), initial_default->toString()); + EXPECT_FALSE(global_rs.context().has_pending_state_transition()); +} + +TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_no_pending_state_transition_returns_mutable_db_guard) { + constexpr uint32_t n_buckets = 10; + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, + "version:2 distributor:1 storage:4", n_buckets, 4)); + EXPECT_FALSE(activate_cluster_state_version(2)); + EXPECT_EQ(buckets_in_snapshot_matching_current_db(true, FixedBucketSpaces::default_space()), n_buckets); + EXPECT_EQ(buckets_in_snapshot_matching_current_db(true, FixedBucketSpaces::global_space()), n_buckets); +} + +TEST_F(BucketDBUpdaterSnapshotTest, snapshot_returns_unroutable_for_non_owned_bucket_in_current_state) { + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, + "version:2 distributor:2 .0.s:d storage:4", 0, 0)); + EXPECT_FALSE(activate_cluster_state_version(2)); + // We're down in state 2 and therefore do not own any buckets + auto def_rs = stripe_of_bucket(default_bucket).bucket_db_updater().read_snapshot_for_bucket(default_bucket); + EXPECT_FALSE(def_rs.is_routable()); +} + +TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_pending_state_returns_read_only_guard_for_bucket_only_owned_in_current_state) { + constexpr uint32_t n_buckets = 10; + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, + "version:2 distributor:2 .0.s:d storage:4", 0, 0)); + EXPECT_EQ(buckets_in_snapshot_matching_current_db(false, FixedBucketSpaces::default_space()), n_buckets); + EXPECT_EQ(buckets_in_snapshot_matching_current_db(false, FixedBucketSpaces::global_space()), n_buckets); +} + +TEST_F(BucketDBUpdaterSnapshotTest, snapshot_is_unroutable_if_stale_reads_disabled_and_bucket_not_owned_in_pending_state) { + set_stale_reads_enabled(false); + constexpr uint32_t n_buckets = 10; + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, + "version:2 distributor:2 .0.s:d storage:4", 0, 0)); + auto def_rs = stripe_of_bucket(default_bucket).bucket_db_updater().read_snapshot_for_bucket(default_bucket); + EXPECT_FALSE(def_rs.is_routable()); +} + } diff --git a/storage/src/tests/distributor/top_level_distributor_test.cpp b/storage/src/tests/distributor/top_level_distributor_test.cpp index 28831c8a661..8fae1c6d738 100644 --- a/storage/src/tests/distributor/top_level_distributor_test.cpp +++ b/storage/src/tests/distributor/top_level_distributor_test.cpp @@ -86,10 +86,6 @@ struct TopLevelDistributorTest : Test, TopLevelDistributorTestUtil { return _distributor->_status_to_do; } - TopLevelDistributor::MetricUpdateHook distributor_metric_update_hook() { - return _distributor->_metricUpdateHook; - } - BucketSpacesStatsProvider::PerNodeBucketSpacesStats distributor_bucket_spaces_stats() { return _distributor->getBucketSpacesStats(); } diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.cpp b/storage/src/tests/distributor/top_level_distributor_test_util.cpp index 0db345636ee..b6e9beb38ae 100644 --- a/storage/src/tests/distributor/top_level_distributor_test_util.cpp +++ b/storage/src/tests/distributor/top_level_distributor_test_util.cpp @@ -375,6 +375,11 @@ TopLevelDistributorTestUtil::reconfigure(const DistributorConfig& cfg) tick(); // Config is propagated upon next top-level tick } +framework::MetricUpdateHook& +TopLevelDistributorTestUtil::distributor_metric_update_hook() { + return _distributor->_metricUpdateHook; +} + BucketDatabase& TopLevelDistributorTestUtil::stripe_bucket_database(uint16_t stripe_idx) { assert(stripe_idx < _distributor->_stripes.size()); @@ -449,6 +454,19 @@ TopLevelDistributorTestUtil::trigger_distribution_change(std::shared_ptrenableNextDistribution(); } +const lib::ClusterStateBundle& +TopLevelDistributorTestUtil::current_cluster_state_bundle() const +{ + // We assume that all stripes have the same cluster state internally, so just use the first. + assert(_distributor->_stripes[0]); + const auto& bundle = _distributor->_stripes[0]->getClusterStateBundle(); + // ... but sanity-check just to make sure... + for (size_t i = 1; i < _num_distributor_stripes; ++i) { + assert(_distributor->_stripes[i]->getClusterStateBundle() == bundle); + } + return bundle; +} + void TopLevelDistributorTestUtil::tick_distributor_and_stripes_n_times(uint32_t n) { diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.h b/storage/src/tests/distributor/top_level_distributor_test_util.h index 1d9d1613920..8832f8ada6e 100644 --- a/storage/src/tests/distributor/top_level_distributor_test_util.h +++ b/storage/src/tests/distributor/top_level_distributor_test_util.h @@ -85,6 +85,8 @@ public: return _node->getClock(); } + framework::MetricUpdateHook& distributor_metric_update_hook(); + BucketDatabase& stripe_bucket_database(uint16_t stripe_idx); // Implicit default space only BucketDatabase& stripe_bucket_database(uint16_t stripe_idx, document::BucketSpace space); const BucketDatabase& stripe_bucket_database(uint16_t stripe_idx) const; // Implicit default space only @@ -135,6 +137,8 @@ public: void trigger_distribution_change(std::shared_ptr distr); + const lib::ClusterStateBundle& current_cluster_state_bundle() const; + static std::vector bucket_spaces(); protected: -- cgit v1.2.3