summaryrefslogtreecommitdiffstats
path: root/storage/src/tests/distributor/bucketdbupdatertest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/tests/distributor/bucketdbupdatertest.cpp')
-rw-r--r--storage/src/tests/distributor/bucketdbupdatertest.cpp155
1 files changed, 149 insertions, 6 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp
index 321b0cc3bba..30b5256cf13 100644
--- a/storage/src/tests/distributor/bucketdbupdatertest.cpp
+++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp
@@ -2,11 +2,10 @@
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storage/distributor/bucketdbupdater.h>
+#include <vespa/storage/distributor/cluster_distribution_context.h>
#include <vespa/storage/distributor/distributormetricsset.h>
#include <vespa/storage/distributor/pending_bucket_space_db_transition.h>
#include <vespa/storage/distributor/outdated_nodes_map.h>
-#include <vespa/vespalib/io/fileutil.h>
-#include <vespa/storageframework/defaultimplementation/clock/realclock.h>
#include <vespa/storage/storageutil/distributorstatecache.h>
#include <tests/distributor/distributortestutil.h>
#include <vespa/document/test/make_document_bucket.h>
@@ -124,7 +123,7 @@ public:
createLinks();
_bucketSpaces = getBucketSpaces();
// Disable deferred activation by default (at least for now) to avoid breaking the entire world.
- getConfig().setAllowStaleReadsDuringClusterStateTransitions(false);
+ getBucketDBUpdater().set_stale_reads_enabled(false);
};
void TearDown() override {
@@ -2415,7 +2414,7 @@ void for_each_bucket(const DistributorBucketSpaceRepo& repo, Func&& f) {
}
TEST_F(BucketDBUpdaterTest, non_owned_buckets_moved_to_read_only_db_on_ownership_change) {
- getConfig().setAllowStaleReadsDuringClusterStateTransitions(true);
+ getBucketDBUpdater().set_stale_reads_enabled(true);
lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition
set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity
@@ -2468,7 +2467,7 @@ TEST_F(BucketDBUpdaterTest, buckets_no_longer_available_are_not_moved_to_read_on
}
TEST_F(BucketDBUpdaterTest, non_owned_buckets_purged_when_read_only_support_is_config_disabled) {
- getConfig().setAllowStaleReadsDuringClusterStateTransitions(false);
+ getBucketDBUpdater().set_stale_reads_enabled(false);
lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition
set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity
@@ -2497,7 +2496,6 @@ void BucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition(
uint32_t pending_buckets,
uint32_t pending_expected_msgs)
{
- getConfig().setAllowStaleReadsDuringClusterStateTransitions(true);
lib::ClusterState initial_state(initial_state_str);
setSystemState(initial_state);
ASSERT_EQ(messageCount(initial_expected_msgs), _sender.commands().size());
@@ -2514,6 +2512,7 @@ void BucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition(
}
TEST_F(BucketDBUpdaterTest, deferred_activated_state_does_not_enable_state_until_activation_received) {
+ getBucketDBUpdater().set_stale_reads_enabled(true);
constexpr uint32_t n_buckets = 10;
ASSERT_NO_FATAL_FAILURE(
trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
@@ -2533,6 +2532,7 @@ TEST_F(BucketDBUpdaterTest, deferred_activated_state_does_not_enable_state_until
}
TEST_F(BucketDBUpdaterTest, read_only_db_cleared_once_pending_state_is_activated) {
+ getBucketDBUpdater().set_stale_reads_enabled(true);
constexpr uint32_t n_buckets = 10;
ASSERT_NO_FATAL_FAILURE(
trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
@@ -2544,6 +2544,7 @@ TEST_F(BucketDBUpdaterTest, read_only_db_cleared_once_pending_state_is_activated
}
TEST_F(BucketDBUpdaterTest, read_only_db_is_populated_even_when_self_is_marked_down) {
+ getBucketDBUpdater().set_stale_reads_enabled(true);
constexpr uint32_t n_buckets = 10;
ASSERT_NO_FATAL_FAILURE(
trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
@@ -2557,6 +2558,7 @@ TEST_F(BucketDBUpdaterTest, read_only_db_is_populated_even_when_self_is_marked_d
}
TEST_F(BucketDBUpdaterTest, activate_cluster_state_request_with_mismatching_version_returns_actual_version) {
+ getBucketDBUpdater().set_stale_reads_enabled(true);
constexpr uint32_t n_buckets = 10;
ASSERT_NO_FATAL_FAILURE(
trigger_completed_but_not_yet_activated_transition("version:4 distributor:1 storage:4", n_buckets, 4,
@@ -2570,6 +2572,7 @@ TEST_F(BucketDBUpdaterTest, activate_cluster_state_request_with_mismatching_vers
}
TEST_F(BucketDBUpdaterTest, activate_cluster_state_request_without_pending_transition_passes_message_through) {
+ getBucketDBUpdater().set_stale_reads_enabled(true);
constexpr uint32_t n_buckets = 10;
ASSERT_NO_FATAL_FAILURE(
trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
@@ -2727,4 +2730,144 @@ TEST_F(BucketDBUpdaterTest, pending_cluster_state_getter_is_non_null_only_when_s
EXPECT_TRUE(state == nullptr);
}
+struct BucketDBUpdaterSnapshotTest : BucketDBUpdaterTest {
+ lib::ClusterState empty_state;
+ std::shared_ptr<lib::ClusterState> initial_baseline;
+ std::shared_ptr<lib::ClusterState> initial_default;
+ lib::ClusterStateBundle initial_bundle;
+ Bucket default_bucket;
+ Bucket global_bucket;
+
+ BucketDBUpdaterSnapshotTest()
+ : BucketDBUpdaterTest(),
+ empty_state(),
+ initial_baseline(std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:d")),
+ initial_default(std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:m")),
+ initial_bundle(*initial_baseline, {{FixedBucketSpaces::default_space(), initial_default},
+ {FixedBucketSpaces::global_space(), initial_baseline}}),
+ default_bucket(FixedBucketSpaces::default_space(), BucketId(16, 1234)),
+ global_bucket(FixedBucketSpaces::global_space(), BucketId(16, 1234))
+ {
+ }
+ ~BucketDBUpdaterSnapshotTest() override;
+
+ void SetUp() override {
+ BucketDBUpdaterTest::SetUp();
+ getBucketDBUpdater().set_stale_reads_enabled(true);
+ };
+
+ // Assumes that the distributor owns all buckets, so it may choose any arbitrary bucket in the bucket space
+ uint32_t buckets_in_snapshot_matching_current_db(DistributorBucketSpaceRepo& repo, BucketSpace bucket_space) {
+ auto def_rs = getBucketDBUpdater().read_snapshot_for_bucket(Bucket(bucket_space, BucketId(16, 1234)));
+ if (!def_rs.is_routable()) {
+ return 0;
+ }
+ auto guard = def_rs.steal_read_guard();
+ uint32_t found_buckets = 0;
+ for_each_bucket(repo, [&](const auto& space, const auto& entry) {
+ if (space == bucket_space) {
+ std::vector<BucketDatabase::Entry> entries;
+ guard->find_parents_and_self(entry.getBucketId(), entries);
+ if (entries.size() == 1) {
+ ++found_buckets;
+ }
+ }
+ });
+ return found_buckets;
+ }
+};
+
+BucketDBUpdaterSnapshotTest::~BucketDBUpdaterSnapshotTest() = default;
+
+TEST_F(BucketDBUpdaterSnapshotTest, default_space_snapshot_prior_to_activated_state_is_non_routable) {
+ auto rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket);
+ EXPECT_FALSE(rs.is_routable());
+}
+
+TEST_F(BucketDBUpdaterSnapshotTest, global_space_snapshot_prior_to_activated_state_is_non_routable) {
+ auto rs = getBucketDBUpdater().read_snapshot_for_bucket(global_bucket);
+ EXPECT_FALSE(rs.is_routable());
+}
+
+TEST_F(BucketDBUpdaterSnapshotTest, read_snapshot_returns_appropriate_cluster_states) {
+ set_cluster_state_bundle(initial_bundle);
+ // State currently pending, empty initial state is active
+
+ auto def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket);
+ EXPECT_EQ(def_rs.context().active_cluster_state()->toString(), empty_state.toString());
+ EXPECT_EQ(def_rs.context().baseline_active_cluster_state()->toString(), empty_state.toString());
+ ASSERT_TRUE(def_rs.context().has_pending_state_transition());
+ EXPECT_EQ(def_rs.context().pending_cluster_state()->toString(), initial_default->toString());
+
+ auto global_rs = getBucketDBUpdater().read_snapshot_for_bucket(global_bucket);
+ EXPECT_EQ(global_rs.context().active_cluster_state()->toString(), empty_state.toString());
+ EXPECT_EQ(global_rs.context().baseline_active_cluster_state()->toString(), empty_state.toString());
+ ASSERT_TRUE(global_rs.context().has_pending_state_transition());
+ EXPECT_EQ(global_rs.context().pending_cluster_state()->toString(), initial_baseline->toString());
+
+ ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(*initial_baseline, messageCount(1), 0));
+ // State now activated, no pending
+
+ def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket);
+ EXPECT_EQ(def_rs.context().active_cluster_state()->toString(), initial_default->toString());
+ EXPECT_EQ(def_rs.context().baseline_active_cluster_state()->toString(), initial_baseline->toString());
+ EXPECT_FALSE(def_rs.context().has_pending_state_transition());
+
+ global_rs = getBucketDBUpdater().read_snapshot_for_bucket(global_bucket);
+ EXPECT_EQ(global_rs.context().active_cluster_state()->toString(), initial_baseline->toString());
+ EXPECT_EQ(global_rs.context().baseline_active_cluster_state()->toString(), initial_baseline->toString());
+ EXPECT_FALSE(global_rs.context().has_pending_state_transition());
+}
+
+TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_no_pending_state_transition_returns_mutable_db_guard) {
+ constexpr uint32_t n_buckets = 10;
+ ASSERT_NO_FATAL_FAILURE(
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
+ "version:2 distributor:1 storage:4", n_buckets, 4));
+ EXPECT_FALSE(activate_cluster_state_version(2));
+ EXPECT_EQ(buckets_in_snapshot_matching_current_db(mutable_repo(), FixedBucketSpaces::default_space()),
+ n_buckets);
+ EXPECT_EQ(buckets_in_snapshot_matching_current_db(mutable_repo(), FixedBucketSpaces::global_space()),
+ n_buckets);
+}
+
+TEST_F(BucketDBUpdaterSnapshotTest, snapshot_returns_unroutable_for_non_owned_bucket_in_current_state) {
+ ASSERT_NO_FATAL_FAILURE(
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
+ "version:2 distributor:2 .0.s:d storage:4", 0, 0));
+ EXPECT_FALSE(activate_cluster_state_version(2));
+ // We're down in state 2 and therefore do not own any buckets
+ auto def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket);
+ EXPECT_FALSE(def_rs.is_routable());
+}
+
+TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_pending_state_returns_read_only_guard_for_bucket_only_owned_in_current_state) {
+ constexpr uint32_t n_buckets = 10;
+ ASSERT_NO_FATAL_FAILURE(
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
+ "version:2 distributor:2 .0.s:d storage:4", 0, 0));
+ EXPECT_EQ(buckets_in_snapshot_matching_current_db(read_only_repo(), FixedBucketSpaces::default_space()),
+ n_buckets);
+ EXPECT_EQ(buckets_in_snapshot_matching_current_db(read_only_repo(), FixedBucketSpaces::global_space()),
+ n_buckets);
+}
+
+TEST_F(BucketDBUpdaterSnapshotTest, snapshot_is_unroutable_if_stale_reads_disabled_and_bucket_not_owned_in_pending_state) {
+ getBucketDBUpdater().set_stale_reads_enabled(false);
+ constexpr uint32_t n_buckets = 10;
+ ASSERT_NO_FATAL_FAILURE(
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
+ "version:2 distributor:2 .0.s:d storage:4", 0, 0));
+ auto def_rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket);
+ EXPECT_FALSE(def_rs.is_routable());
+}
+
+
+/*
+ * TODO test
+ * - is_routable for pending/no pending
+ * - explicit guard (how?)
+ * - snapshots for pending/no pending
+ */
+
}