diff options
Diffstat (limited to 'storage/src/tests/distributor/bucketdbupdatertest.cpp')
-rw-r--r-- | storage/src/tests/distributor/bucketdbupdatertest.cpp | 155 |
1 files changed, 149 insertions, 6 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index 321b0cc3bba..30b5256cf13 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -2,11 +2,10 @@ #include <vespa/storageapi/message/persistence.h> #include <vespa/storage/distributor/bucketdbupdater.h> +#include <vespa/storage/distributor/cluster_distribution_context.h> #include <vespa/storage/distributor/distributormetricsset.h> #include <vespa/storage/distributor/pending_bucket_space_db_transition.h> #include <vespa/storage/distributor/outdated_nodes_map.h> -#include <vespa/vespalib/io/fileutil.h> -#include <vespa/storageframework/defaultimplementation/clock/realclock.h> #include <vespa/storage/storageutil/distributorstatecache.h> #include <tests/distributor/distributortestutil.h> #include <vespa/document/test/make_document_bucket.h> @@ -124,7 +123,7 @@ public: createLinks(); _bucketSpaces = getBucketSpaces(); // Disable deferred activation by default (at least for now) to avoid breaking the entire world. - getConfig().setAllowStaleReadsDuringClusterStateTransitions(false); + getBucketDBUpdater().set_stale_reads_enabled(false); }; void TearDown() override { @@ -2415,7 +2414,7 @@ void for_each_bucket(const DistributorBucketSpaceRepo& repo, Func&& f) { } TEST_F(BucketDBUpdaterTest, non_owned_buckets_moved_to_read_only_db_on_ownership_change) { - getConfig().setAllowStaleReadsDuringClusterStateTransitions(true); + getBucketDBUpdater().set_stale_reads_enabled(true); lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity @@ -2468,7 +2467,7 @@ TEST_F(BucketDBUpdaterTest, buckets_no_longer_available_are_not_moved_to_read_on } TEST_F(BucketDBUpdaterTest, non_owned_buckets_purged_when_read_only_support_is_config_disabled) { - getConfig().setAllowStaleReadsDuringClusterStateTransitions(false); + getBucketDBUpdater().set_stale_reads_enabled(false); lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity @@ -2497,7 +2496,6 @@ void BucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition( uint32_t pending_buckets, uint32_t pending_expected_msgs) { - getConfig().setAllowStaleReadsDuringClusterStateTransitions(true); lib::ClusterState initial_state(initial_state_str); setSystemState(initial_state); ASSERT_EQ(messageCount(initial_expected_msgs), _sender.commands().size()); @@ -2514,6 +2512,7 @@ void BucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition( } TEST_F(BucketDBUpdaterTest, deferred_activated_state_does_not_enable_state_until_activation_received) { + getBucketDBUpdater().set_stale_reads_enabled(true); constexpr uint32_t n_buckets = 10; ASSERT_NO_FATAL_FAILURE( trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, @@ -2533,6 +2532,7 @@ TEST_F(BucketDBUpdaterTest, deferred_activated_state_does_not_enable_state_until } TEST_F(BucketDBUpdaterTest, read_only_db_cleared_once_pending_state_is_activated) { + getBucketDBUpdater().set_stale_reads_enabled(true); constexpr uint32_t n_buckets = 10; ASSERT_NO_FATAL_FAILURE( trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, @@ -2544,6 +2544,7 @@ TEST_F(BucketDBUpdaterTest, read_only_db_cleared_once_pending_state_is_activated } TEST_F(BucketDBUpdaterTest, read_only_db_is_populated_even_when_self_is_marked_down) { + getBucketDBUpdater().set_stale_reads_enabled(true); constexpr uint32_t n_buckets = 10; ASSERT_NO_FATAL_FAILURE( trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, @@ -2557,6 +2558,7 @@ TEST_F(BucketDBUpdaterTest, read_only_db_is_populated_even_when_self_is_marked_d } TEST_F(BucketDBUpdaterTest, activate_cluster_state_request_with_mismatching_version_returns_actual_version) { + getBucketDBUpdater().set_stale_reads_enabled(true); constexpr uint32_t n_buckets = 10; ASSERT_NO_FATAL_FAILURE( trigger_completed_but_not_yet_activated_transition("version:4 distributor:1 storage:4", n_buckets, 4, @@ -2570,6 +2572,7 @@ TEST_F(BucketDBUpdaterTest, activate_cluster_state_request_with_mismatching_vers } TEST_F(BucketDBUpdaterTest, activate_cluster_state_request_without_pending_transition_passes_message_through) { + getBucketDBUpdater().set_stale_reads_enabled(true); constexpr uint32_t n_buckets = 10; ASSERT_NO_FATAL_FAILURE( trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, @@ -2727,4 +2730,144 @@ TEST_F(BucketDBUpdaterTest, pending_cluster_state_getter_is_non_null_only_when_s EXPECT_TRUE(state == nullptr); } +struct BucketDBUpdaterSnapshotTest : BucketDBUpdaterTest { + lib::ClusterState empty_state; + std::shared_ptr<lib::ClusterState> initial_baseline; + std::shared_ptr<lib::ClusterState> initial_default; + lib::ClusterStateBundle initial_bundle; + Bucket default_bucket; + Bucket global_bucket; + + BucketDBUpdaterSnapshotTest() + : BucketDBUpdaterTest(), + empty_state(), + initial_baseline(std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:d")), + initial_default(std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:m")), + initial_bundle(*initial_baseline, {{FixedBucketSpaces::default_space(), initial_default}, + {FixedBucketSpaces::global_space(), initial_baseline}}), + default_bucket(FixedBucketSpaces::default_space(), BucketId(16, 1234)), + global_bucket(FixedBucketSpaces::global_space(), BucketId(16, 1234)) + { + } + ~BucketDBUpdaterSnapshotTest() override; + + void SetUp() override { + BucketDBUpdaterTest::SetUp(); + getBucketDBUpdater().set_stale_reads_enabled(true); + }; + + // Assumes that the distributor owns all buckets, so it may choose any arbitrary bucket in the bucket space + uint32_t buckets_in_snapshot_matching_current_db(DistributorBucketSpaceRepo& repo, BucketSpace bucket_space) { + auto def_rs = getBucketDBUpdater().read_snapshot_for_bucket(Bucket(bucket_space, BucketId(16, 1234))); + if (!def_rs.is_routable()) { + return 0; + } + auto guard = def_rs.steal_read_guard(); + uint32_t found_buckets = 0; + for_each_bucket(repo, [&](const auto& space, const auto& entry) { + if (space == bucket_space) { + std::vector<BucketDatabase::Entry> entries; + guard->find_parents_and_self(entry.getBucketId(), entries); + if (entries.size() == 1) { + ++found_buckets; + } + } + }); + return found_buckets; + } +}; + +BucketDBUpdaterSnapshotTest::~BucketDBUpdaterSnapshotTest() = default; + +TEST_F(BucketDBUpdaterSnapshotTest, default_space_snapshot_prior_to_activated_state_is_non_routable) { + auto rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket); + EXPECT_FALSE(rs.is_routable()); +} + +TEST_F(BucketDBUpdaterSnapshotTest, global_space_snapshot_prior_to_activated_state_is_non_routable) { + auto rs = getBucketDBUpdater().read_snapshot_for_bucket(global_bucket); + EXPECT_FALSE(rs.is_routable()); +} + +TEST_F(BucketDBUpdaterSnapshotTest, read_snapshot_returns_appropriate_cluster_states) { + set_cluster_state_bundle(initial_bundle); + // State currently pending, empty initial state is active + + auto def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket); + EXPECT_EQ(def_rs.context().active_cluster_state()->toString(), empty_state.toString()); + EXPECT_EQ(def_rs.context().baseline_active_cluster_state()->toString(), empty_state.toString()); + ASSERT_TRUE(def_rs.context().has_pending_state_transition()); + EXPECT_EQ(def_rs.context().pending_cluster_state()->toString(), initial_default->toString()); + + auto global_rs = getBucketDBUpdater().read_snapshot_for_bucket(global_bucket); + EXPECT_EQ(global_rs.context().active_cluster_state()->toString(), empty_state.toString()); + EXPECT_EQ(global_rs.context().baseline_active_cluster_state()->toString(), empty_state.toString()); + ASSERT_TRUE(global_rs.context().has_pending_state_transition()); + EXPECT_EQ(global_rs.context().pending_cluster_state()->toString(), initial_baseline->toString()); + + ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(*initial_baseline, messageCount(1), 0)); + // State now activated, no pending + + def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket); + EXPECT_EQ(def_rs.context().active_cluster_state()->toString(), initial_default->toString()); + EXPECT_EQ(def_rs.context().baseline_active_cluster_state()->toString(), initial_baseline->toString()); + EXPECT_FALSE(def_rs.context().has_pending_state_transition()); + + global_rs = getBucketDBUpdater().read_snapshot_for_bucket(global_bucket); + EXPECT_EQ(global_rs.context().active_cluster_state()->toString(), initial_baseline->toString()); + EXPECT_EQ(global_rs.context().baseline_active_cluster_state()->toString(), initial_baseline->toString()); + EXPECT_FALSE(global_rs.context().has_pending_state_transition()); +} + +TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_no_pending_state_transition_returns_mutable_db_guard) { + constexpr uint32_t n_buckets = 10; + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, + "version:2 distributor:1 storage:4", n_buckets, 4)); + EXPECT_FALSE(activate_cluster_state_version(2)); + EXPECT_EQ(buckets_in_snapshot_matching_current_db(mutable_repo(), FixedBucketSpaces::default_space()), + n_buckets); + EXPECT_EQ(buckets_in_snapshot_matching_current_db(mutable_repo(), FixedBucketSpaces::global_space()), + n_buckets); +} + +TEST_F(BucketDBUpdaterSnapshotTest, snapshot_returns_unroutable_for_non_owned_bucket_in_current_state) { + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, + "version:2 distributor:2 .0.s:d storage:4", 0, 0)); + EXPECT_FALSE(activate_cluster_state_version(2)); + // We're down in state 2 and therefore do not own any buckets + auto def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket); + EXPECT_FALSE(def_rs.is_routable()); +} + +TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_pending_state_returns_read_only_guard_for_bucket_only_owned_in_current_state) { + constexpr uint32_t n_buckets = 10; + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, + "version:2 distributor:2 .0.s:d storage:4", 0, 0)); + EXPECT_EQ(buckets_in_snapshot_matching_current_db(read_only_repo(), FixedBucketSpaces::default_space()), + n_buckets); + EXPECT_EQ(buckets_in_snapshot_matching_current_db(read_only_repo(), FixedBucketSpaces::global_space()), + n_buckets); +} + +TEST_F(BucketDBUpdaterSnapshotTest, snapshot_is_unroutable_if_stale_reads_disabled_and_bucket_not_owned_in_pending_state) { + getBucketDBUpdater().set_stale_reads_enabled(false); + constexpr uint32_t n_buckets = 10; + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, + "version:2 distributor:2 .0.s:d storage:4", 0, 0)); + auto def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket); + EXPECT_FALSE(def_rs.is_routable()); +} + + +/* + * TODO test + * - is_routable for pending/no pending + * - explicit guard (how?) + * - snapshots for pending/no pending + */ + } |