about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author Tor Brede Vekterli <vekterli@verizonmedia.com> 2021-09-13 17:37:23 +0200
committer GitHub <noreply@github.com> 2021-09-13 17:37:23 +0200
commit 92ad19102e936032fe61fa5a93bc88a2fc246a22 (patch)
tree f584578bdcd26994e7f6d2e7ae2c19ea606080e0
parent ed50a96c11e4bffb3c4a2c48e725fce5ffd1a0ce (diff)
parent c24f52e097143e8643f59945e75cc0db8f2962d1 (diff)
Merge pull request #19095 from vespa-engine/vekterli/last-batch-of-bucketdbupdater-test-porting
Port final batch of BucketDBUpdater tests from legacy to top-level code paths
-rw-r--r-- storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp 54
-rw-r--r-- storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp 637
-rw-r--r-- storage/src/tests/distributor/top_level_distributor_test.cpp 4
-rw-r--r-- storage/src/tests/distributor/top_level_distributor_test_util.cpp 18
-rw-r--r-- storage/src/tests/distributor/top_level_distributor_test_util.h 4
5 files changed, 698 insertions, 19 deletions
diff --git a/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp b/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp
index fc2ad82f3a2..b871bf5841e 100644
--- a/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp
+++ b/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp
@@ -2216,6 +2216,7 @@ TEST_F(LegacyBucketDBUpdaterTest, node_missing_from_config_is_treated_as_needing
EXPECT_EQ(expandNodeVec({0, 1}), getSendSet());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, changed_distributor_set_implies_ownership_transfer) {
auto fixture = createPendingStateFixtureForStateChange(
"distributor:2 storage:2", "distributor:1 storage:2");
@@ -2226,6 +2227,7 @@ TEST_F(LegacyBucketDBUpdaterTest, changed_distributor_set_implies_ownership_tran
EXPECT_TRUE(fixture->state->hasBucketOwnershipTransfer());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, unchanged_distributor_set_implies_no_ownership_transfer) {
auto fixture = createPendingStateFixtureForStateChange(
"distributor:2 storage:2", "distributor:2 storage:1");
@@ -2236,18 +2238,21 @@ TEST_F(LegacyBucketDBUpdaterTest, unchanged_distributor_set_implies_no_ownership
EXPECT_FALSE(fixture->state->hasBucketOwnershipTransfer());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, changed_distribution_config_implies_ownership_transfer) { // A pending state created for a distribution (config) change must report a bucket ownership transfer.
auto fixture = createPendingStateFixtureForDistributionChange(
"distributor:2 storage:2"); // Cluster state string handed to the fixture factory.
EXPECT_TRUE(fixture->state->hasBucketOwnershipTransfer());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, transition_time_tracked_for_single_state_change) { // A transition completed "in 5 seconds" must be reported as 5000 ms.
ASSERT_NO_FATAL_FAILURE(completeStateTransitionInSeconds("distributor:2 storage:2", 5, messageCount(2))); // ASSERT_NO_FATAL_FAILURE aborts this test if the helper hit a fatal assertion.
EXPECT_EQ(uint64_t(5000), lastTransitionTimeInMillis());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, transition_time_reset_across_non_preempting_state_changes) {
ASSERT_NO_FATAL_FAILURE(completeStateTransitionInSeconds("distributor:2 storage:2", 5, messageCount(2)));
ASSERT_NO_FATAL_FAILURE(completeStateTransitionInSeconds("distributor:2 storage:3", 3, messageCount(1)));
@@ -2255,6 +2260,7 @@ TEST_F(LegacyBucketDBUpdaterTest, transition_time_reset_across_non_preempting_st
EXPECT_EQ(uint64_t(3000), lastTransitionTimeInMillis());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, transition_time_tracked_for_distribution_config_change) {
lib::ClusterState state("distributor:2 storage:2");
ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(state, messageCount(2), 1));
@@ -2267,6 +2273,7 @@ TEST_F(LegacyBucketDBUpdaterTest, transition_time_tracked_for_distribution_confi
EXPECT_EQ(uint64_t(4000), lastTransitionTimeInMillis());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, transition_time_tracked_across_preempted_transitions) {
_sender.clear();
lib::ClusterState state("distributor:2 storage:2");
@@ -2280,6 +2287,7 @@ TEST_F(LegacyBucketDBUpdaterTest, transition_time_tracked_across_preempted_trans
EXPECT_EQ(uint64_t(8000), lastTransitionTimeInMillis());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
/*
* Brief reminder on test DSL for checking bucket merge operations:
*
@@ -2303,31 +2311,37 @@ TEST_F(LegacyBucketDBUpdaterTest, batch_update_of_existing_diverging_replicas_do
"0:5/1/2/3|1:5/7/8/9", true));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, batch_add_of_new_diverging_replicas_does_not_mark_any_as_trusted) { // Empty DB + two replicas with different info for bucket 5.
EXPECT_EQ(std::string("5:1/7/8/9/u,0/1/2/3/u|"), // Expected: both replicas carry the /u marker.
mergeBucketLists("", "0:5/1/2/3|1:5/7/8/9", true));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, batch_add_with_single_resulting_replica_implicitly_marks_as_trusted) { // Empty DB + a single replica for bucket 5.
EXPECT_EQ(std::string("5:0/1/2/3/t|"), // Expected: the lone replica carries the /t marker.
mergeBucketLists("", "0:5/1/2/3", true));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, identity_update_of_single_replica_does_not_clear_trusted) { // Existing and new bucket lists are identical.
EXPECT_EQ(std::string("5:0/1/2/3/t|"), // Expected: the replica still carries the /t marker afterwards.
mergeBucketLists("0:5/1/2/3", "0:5/1/2/3", true));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, identity_update_of_diverging_untrusted_replicas_does_not_mark_any_as_trusted) { // Same two diverging replicas before and after.
EXPECT_EQ(std::string("5:1/7/8/9/u,0/1/2/3/u|"), // Expected: both replicas keep the /u marker.
mergeBucketLists("0:5/1/2/3|1:5/7/8/9", "0:5/1/2/3|1:5/7/8/9", true));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, adding_diverging_replica_to_existing_trusted_does_not_remove_trusted) { // Node 0 replica exists first; node 1 arrives with different info.
EXPECT_EQ(std::string("5:1/2/3/4/u,0/1/2/3/t|"), // Expected: node 0 keeps /t, the new node 1 replica is /u.
mergeBucketLists("0:5/1/2/3", "0:5/1/2/3|1:5/2/3/4", true));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted) {
// This differs from batch_update_of_existing_diverging_replicas_does_not_mark_any_as_trusted
// in that _all_ content nodes are considered outdated when distributor changes take place,
@@ -2343,6 +2357,7 @@ TEST_F(LegacyBucketDBUpdaterTest, batch_update_from_distributor_change_does_not_
"0:5/1/2/3|1:5/7/8/9", true));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
// TODO remove on Vespa 8 - this is a workaround for https://github.com/vespa-engine/vespa/issues/8475
TEST_F(LegacyBucketDBUpdaterTest, global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection) {
std::string distConfig(getDistConfig6Nodes2Groups());
@@ -2412,6 +2427,7 @@ void for_each_bucket(const DistributorBucketSpaceRepo& repo, Func&& f) {
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, non_owned_buckets_moved_to_read_only_db_on_ownership_change) {
getBucketDBUpdater().set_stale_reads_enabled(true);
@@ -2453,6 +2469,7 @@ TEST_F(LegacyBucketDBUpdaterTest, non_owned_buckets_moved_to_read_only_db_on_own
});
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, buckets_no_longer_available_are_not_moved_to_read_only_database) {
constexpr uint32_t n_buckets = 10;
// No ownership change, just node down. Test redundancy is 2, so removing 2 nodes will
@@ -2464,6 +2481,7 @@ TEST_F(LegacyBucketDBUpdaterTest, buckets_no_longer_available_are_not_moved_to_r
EXPECT_EQ(size_t(0), read_only_global_db().size());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, non_owned_buckets_purged_when_read_only_support_is_config_disabled) {
getBucketDBUpdater().set_stale_reads_enabled(false);
@@ -2509,6 +2527,7 @@ void LegacyBucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transiti
_sender.clear();
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, deferred_activated_state_does_not_enable_state_until_activation_received) {
getBucketDBUpdater().set_stale_reads_enabled(true);
constexpr uint32_t n_buckets = 10;
@@ -2529,6 +2548,7 @@ TEST_F(LegacyBucketDBUpdaterTest, deferred_activated_state_does_not_enable_state
EXPECT_EQ(uint64_t(n_buckets), mutable_global_db().size());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, read_only_db_cleared_once_pending_state_is_activated) {
getBucketDBUpdater().set_stale_reads_enabled(true);
constexpr uint32_t n_buckets = 10;
@@ -2541,6 +2561,7 @@ TEST_F(LegacyBucketDBUpdaterTest, read_only_db_cleared_once_pending_state_is_act
EXPECT_EQ(uint64_t(0), read_only_global_db().size());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, read_only_db_is_populated_even_when_self_is_marked_down) {
getBucketDBUpdater().set_stale_reads_enabled(true);
constexpr uint32_t n_buckets = 10;
@@ -2555,6 +2576,7 @@ TEST_F(LegacyBucketDBUpdaterTest, read_only_db_is_populated_even_when_self_is_ma
EXPECT_EQ(uint64_t(n_buckets), read_only_global_db().size());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, activate_cluster_state_request_with_mismatching_version_returns_actual_version) {
getBucketDBUpdater().set_stale_reads_enabled(true);
constexpr uint32_t n_buckets = 10;
@@ -2569,6 +2591,7 @@ TEST_F(LegacyBucketDBUpdaterTest, activate_cluster_state_request_with_mismatchin
ASSERT_NO_FATAL_FAILURE(assert_has_activate_cluster_state_reply_with_actual_version(5));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, activate_cluster_state_request_without_pending_transition_passes_message_through) {
getBucketDBUpdater().set_stale_reads_enabled(true);
constexpr uint32_t n_buckets = 10;
@@ -2585,6 +2608,7 @@ TEST_F(LegacyBucketDBUpdaterTest, activate_cluster_state_request_without_pending
EXPECT_EQ(size_t(0), _sender.replies().size());
}
+// TODO STRIPE disabled benchmark tests are NOT migrated to new test suite
TEST_F(LegacyBucketDBUpdaterTest, DISABLED_benchmark_bulk_loading_into_empty_db) {
// Need to trigger an initial edge to complete first bucket scan
ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:2 storage:1"),
@@ -2703,6 +2727,7 @@ TEST_F(LegacyBucketDBUpdaterTest, DISABLED_benchmark_all_buckets_removed_during_
fprintf(stderr, "Took %g seconds to scan and remove %u buckets\n", timer.min_time(), n_buckets);
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_getter_is_non_null_only_when_state_is_pending) {
auto initial_baseline = std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:d");
auto initial_default = std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:m");
@@ -2728,7 +2753,7 @@ TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_getter_is_non_null_only_
EXPECT_TRUE(state == nullptr);
}
-struct BucketDBUpdaterSnapshotTest : LegacyBucketDBUpdaterTest {
+struct LegacyBucketDBUpdaterSnapshotTest : LegacyBucketDBUpdaterTest {
lib::ClusterState empty_state;
std::shared_ptr<lib::ClusterState> initial_baseline;
std::shared_ptr<lib::ClusterState> initial_default;
@@ -2736,7 +2761,7 @@ struct BucketDBUpdaterSnapshotTest : LegacyBucketDBUpdaterTest {
Bucket default_bucket;
Bucket global_bucket;
- BucketDBUpdaterSnapshotTest()
+ LegacyBucketDBUpdaterSnapshotTest()
: LegacyBucketDBUpdaterTest(),
empty_state(),
initial_baseline(std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:d")),
@@ -2747,7 +2772,7 @@ struct BucketDBUpdaterSnapshotTest : LegacyBucketDBUpdaterTest {
global_bucket(FixedBucketSpaces::global_space(), BucketId(16, 1234))
{
}
- ~BucketDBUpdaterSnapshotTest() override;
+ ~LegacyBucketDBUpdaterSnapshotTest() override;
void SetUp() override {
LegacyBucketDBUpdaterTest::SetUp();
@@ -2774,19 +2799,22 @@ struct BucketDBUpdaterSnapshotTest : LegacyBucketDBUpdaterTest {
}
};
-BucketDBUpdaterSnapshotTest::~BucketDBUpdaterSnapshotTest() = default;
+LegacyBucketDBUpdaterSnapshotTest::~LegacyBucketDBUpdaterSnapshotTest() = default;
-TEST_F(BucketDBUpdaterSnapshotTest, default_space_snapshot_prior_to_activated_state_is_non_routable) {
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
+TEST_F(LegacyBucketDBUpdaterSnapshotTest, default_space_snapshot_prior_to_activated_state_is_non_routable) { // The test body sets no cluster state before taking the snapshot.
auto rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket); // default_bucket is a fixture member.
EXPECT_FALSE(rs.is_routable());
}
-TEST_F(BucketDBUpdaterSnapshotTest, global_space_snapshot_prior_to_activated_state_is_non_routable) {
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
+TEST_F(LegacyBucketDBUpdaterSnapshotTest, global_space_snapshot_prior_to_activated_state_is_non_routable) { // The test body sets no cluster state before taking the snapshot.
auto rs = getBucketDBUpdater().read_snapshot_for_bucket(global_bucket); // global_bucket is constructed in FixedBucketSpaces::global_space() by the fixture.
EXPECT_FALSE(rs.is_routable());
}
-TEST_F(BucketDBUpdaterSnapshotTest, read_snapshot_returns_appropriate_cluster_states) {
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
+TEST_F(LegacyBucketDBUpdaterSnapshotTest, read_snapshot_returns_appropriate_cluster_states) {
set_cluster_state_bundle(initial_bundle);
// State currently pending, empty initial state is active
@@ -2816,7 +2844,8 @@ TEST_F(BucketDBUpdaterSnapshotTest, read_snapshot_returns_appropriate_cluster_st
EXPECT_FALSE(global_rs.context().has_pending_state_transition());
}
-TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_no_pending_state_transition_returns_mutable_db_guard) {
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
+TEST_F(LegacyBucketDBUpdaterSnapshotTest, snapshot_with_no_pending_state_transition_returns_mutable_db_guard) {
constexpr uint32_t n_buckets = 10;
ASSERT_NO_FATAL_FAILURE(
trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
@@ -2828,7 +2857,8 @@ TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_no_pending_state_transition_re
n_buckets);
}
-TEST_F(BucketDBUpdaterSnapshotTest, snapshot_returns_unroutable_for_non_owned_bucket_in_current_state) {
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
+TEST_F(LegacyBucketDBUpdaterSnapshotTest, snapshot_returns_unroutable_for_non_owned_bucket_in_current_state) {
ASSERT_NO_FATAL_FAILURE(
trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
"version:2 distributor:2 .0.s:d storage:4", 0, 0));
@@ -2838,7 +2868,8 @@ TEST_F(BucketDBUpdaterSnapshotTest, snapshot_returns_unroutable_for_non_owned_bu
EXPECT_FALSE(def_rs.is_routable());
}
-TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_pending_state_returns_read_only_guard_for_bucket_only_owned_in_current_state) {
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
+TEST_F(LegacyBucketDBUpdaterSnapshotTest, snapshot_with_pending_state_returns_read_only_guard_for_bucket_only_owned_in_current_state) {
constexpr uint32_t n_buckets = 10;
ASSERT_NO_FATAL_FAILURE(
trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
@@ -2849,7 +2880,8 @@ TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_pending_state_returns_read_onl
n_buckets);
}
-TEST_F(BucketDBUpdaterSnapshotTest, snapshot_is_unroutable_if_stale_reads_disabled_and_bucket_not_owned_in_pending_state) {
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
+TEST_F(LegacyBucketDBUpdaterSnapshotTest, snapshot_is_unroutable_if_stale_reads_disabled_and_bucket_not_owned_in_pending_state) {
getBucketDBUpdater().set_stale_reads_enabled(false);
constexpr uint32_t n_buckets = 10;
ASSERT_NO_FATAL_FAILURE(
diff --git a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
index 1a0ba8352b7..01f7d5a4f0a 100644
--- a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
+++ b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
@@ -11,13 +11,13 @@
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/document/test/make_bucket_space.h>
#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <vespa/metrics/updatehook.h>
#include <vespa/storage/distributor/simpleclusterinformation.h>
#include <vespa/storage/distributor/top_level_distributor.h>
#include <vespa/storage/distributor/distributor_stripe.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/text/stringtokenizer.h>
-#include <vespa/vespalib/util/benchmark_timer.h>
#include <sstream>
#include <iomanip>
@@ -131,7 +131,7 @@ public:
stripe_of_bucket(bucket).bucket_db_updater().onRequestBucketInfoReply(reply);
}
- std::string verifyBucket(document::BucketId id, const lib::ClusterState& state) {
+ std::string verify_bucket(document::BucketId id, const lib::ClusterState& state) {
BucketDatabase::Entry entry = get_bucket(id);
if (!entry.valid()) {
return vespalib::make_string("%s doesn't exist in DB", id.toString().c_str());
@@ -196,6 +196,12 @@ public:
sort_sent_messages_by_index(_sender, size_before_state);
}
+ void set_cluster_state_bundle(const lib::ClusterStateBundle& state) { // Feeds a SetSystemStateCommand built from the bundle to the top-level bucket DB updater.
+ const size_t size_before_state = _sender.commands().size(); // Number of commands already recorded before this state is set.
+ bucket_db_updater().onSetSystemState(std::make_shared<api::SetSystemStateCommand>(state));
+ sort_sent_messages_by_index(_sender, size_before_state); // Presumably sorts only the commands sent after size_before_state - confirm in the helper.
+ }
+
void set_cluster_state(const vespalib::string& state_str) { // Convenience overload taking the cluster state in string form.
set_cluster_state(lib::ClusterState(state_str)); // Construct a lib::ClusterState from the string and forward to the ClusterState overload.
}
@@ -205,6 +211,14 @@ public:
std::make_shared<api::ActivateClusterStateVersionCommand>(version));
}
+ void assert_has_activate_cluster_state_reply_with_actual_version(uint32_t version) { // Asserts exactly one reply was sent, that it is an ActivateClusterStateVersionReply, and that its actualVersion() equals `version`.
+ ASSERT_EQ(size_t(1), _sender.replies().size());
+ auto* response = dynamic_cast<api::ActivateClusterStateVersionReply*>(_sender.replies().back().get()); // nullptr if the reply has a different type.
+ ASSERT_TRUE(response != nullptr);
+ ASSERT_EQ(version, response->actualVersion());
+ _sender.clear(); // Only reached when every ASSERT_* above passed, since a failing ASSERT_* returns from this function.
+ }
+
void complete_bucket_info_gathering(const lib::ClusterState& state,
size_t expected_msgs,
uint32_t bucket_count = 1,
@@ -304,6 +318,26 @@ public:
ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(state, expected_msgs, n_buckets));
}
+ void complete_state_transition_in_seconds(const std::string& stateStr, // Sets the state parsed from stateStr, advances the fake clock, then completes bucket info gathering.
+ uint32_t seconds, // Amount added to the fake clock between setting the state and completing the gathering.
+ uint32_t expectedMsgs) // Forwarded unchanged to complete_bucket_info_gathering() as the expected message count.
+ {
+ _sender.clear(); // Start from an empty sender.
+ lib::ClusterState state(stateStr);
+ set_cluster_state(state);
+ fake_clock().addSecondsToTime(seconds);
+ ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(state, expectedMsgs)); // Returns early if the helper recorded a fatal failure.
+ }
+
+ uint64_t last_transition_time_in_millis() { // Returns stateTransitionTime.getLast() of the total distributor metrics, converted to uint64_t.
+ {
+ // Force stripe metrics to be aggregated into total.
+ std::mutex l; // Function-local mutex; used here only to construct the MetricLockGuard that updateMetrics() takes.
+ distributor_metric_update_hook().updateMetrics(metrics::MetricLockGuard(l));
+ }
+ return uint64_t(total_distributor_metrics().stateTransitionTime.getLast());
+ }
+
ClusterInformation::CSP create_cluster_info(const std::string& clusterStateString) {
lib::ClusterState baseline_cluster_state(clusterStateString);
lib::ClusterStateBundle cluster_state_bundle(baseline_cluster_state);
@@ -385,6 +419,75 @@ public:
std::vector<uint16_t> expand_node_vec(const std::vector<uint16_t>& nodes);
+ void trigger_completed_but_not_yet_activated_transition( // Declaration only; defined out-of-class in this file.
+ vespalib::stringref initial_state_str, // Parsed into the lib::ClusterState that is set first.
+ uint32_t initial_buckets, // Bucket count passed to complete_bucket_info_gathering() for the initial state.
+ uint32_t initial_expected_msgs, // Wrapped in message_count() and compared against the number of sent commands.
+ vespalib::stringref pending_state_str, // Parsed into the state that is set second, via a ClusterStateBundle.
+ uint32_t pending_buckets, // Bucket count passed to complete_bucket_info_gathering() for the pending state.
+ uint32_t pending_expected_msgs); // Wrapped in message_count() and compared against the number of sent commands.
+
+ const DistributorBucketSpaceRepo& mutable_repo(DistributorStripe& s) const noexcept { // Returns the stripe's getBucketSpaceRepo(); handed out as a const reference despite the "mutable" in the name.
+ return s.getBucketSpaceRepo();
+ }
+ // Note: not calling this "immutable_repo" since it may actually be modified by the pending
+ // cluster state component (just not by operations), so it would not have the expected semantics.
+ const DistributorBucketSpaceRepo& read_only_repo(DistributorStripe& s) const noexcept { // Returns the stripe's getReadOnlyBucketSpaceRepo() by const reference.
+ return s.getReadOnlyBucketSpaceRepo();
+ }
+
+ const BucketDatabase& mutable_default_db(DistributorStripe& s) const noexcept { // Bucket database of the default bucket space in the stripe's mutable repo.
+ return mutable_repo(s).get(FixedBucketSpaces::default_space()).getBucketDatabase();
+ }
+ const BucketDatabase& mutable_global_db(DistributorStripe& s) const noexcept { // Bucket database of the global bucket space in the stripe's mutable repo.
+ return mutable_repo(s).get(FixedBucketSpaces::global_space()).getBucketDatabase();
+ }
+ const BucketDatabase& read_only_default_db(DistributorStripe& s) const noexcept { // Bucket database of the default bucket space in the stripe's read-only repo.
+ return read_only_repo(s).get(FixedBucketSpaces::default_space()).getBucketDatabase();
+ }
+ const BucketDatabase& read_only_global_db(DistributorStripe& s) const noexcept { // Bucket database of the global bucket space in the stripe's read-only repo.
+ return read_only_repo(s).get(FixedBucketSpaces::global_space()).getBucketDatabase();
+ }
+
+ void set_stale_reads_enabled(bool enabled) { // Applies the flag to every stripe's bucket DB updater and to the top-level bucket DB updater.
+ for (auto* s : distributor_stripes()) {
+ s->bucket_db_updater().set_stale_reads_enabled(enabled); // Per-stripe updater.
+ }
+ bucket_db_updater().set_stale_reads_enabled(enabled); // Top-level updater, set after the stripes.
+ }
+
+ size_t mutable_default_dbs_size() const { // Sum of mutable_default_db(...).size() over all distributor stripes.
+ size_t total = 0;
+ for (auto* s : distributor_stripes()) {
+ total += mutable_default_db(*s).size();
+ }
+ return total;
+ }
+
+ size_t mutable_global_dbs_size() const { // Sum of mutable_global_db(...).size() over all distributor stripes.
+ size_t total = 0;
+ for (auto* s : distributor_stripes()) {
+ total += mutable_global_db(*s).size();
+ }
+ return total;
+ }
+
+ size_t read_only_default_dbs_size() const { // Sum of read_only_default_db(...).size() over all distributor stripes.
+ size_t total = 0;
+ for (auto* s : distributor_stripes()) {
+ total += read_only_default_db(*s).size();
+ }
+ return total;
+ }
+
+ size_t read_only_global_dbs_size() const { // Sum of read_only_global_db(...).size() over all distributor stripes.
+ size_t total = 0;
+ for (auto* s : distributor_stripes()) {
+ total += read_only_global_db(*s).size();
+ }
+ return total;
+ }
+
};
TopLevelBucketDBUpdaterTest::TopLevelBucketDBUpdaterTest()
@@ -502,6 +605,30 @@ TopLevelBucketDBUpdaterTest::get_node_list(const std::vector<uint16_t>& nodes)
return get_node_list(nodes, _bucket_spaces.size());
}
+void
+TopLevelBucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition( // Drives two consecutive state transitions through bucket info gathering; the second is sent as a bundle constructed with `true` as third argument.
+ vespalib::stringref initial_state_str,
+ uint32_t initial_buckets,
+ uint32_t initial_expected_msgs,
+ vespalib::stringref pending_state_str,
+ uint32_t pending_buckets,
+ uint32_t pending_expected_msgs)
+{
+ lib::ClusterState initial_state(initial_state_str);
+ set_cluster_state(initial_state); // First transition: plain cluster state, no bundle.
+ ASSERT_EQ(message_count(initial_expected_msgs), _sender.commands().size()); // message_count() is defined outside this view.
+ ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(
+ initial_state, message_count(initial_expected_msgs), initial_buckets));
+ _sender.clear(); // Drop commands from the first transition so the next count starts from zero.
+
+ lib::ClusterState pending_state(pending_state_str); // Ownership change
+ set_cluster_state_bundle(lib::ClusterStateBundle(pending_state, {}, true)); // NOTE(review): third argument `true` presumably requests deferred activation - confirm against ClusterStateBundle.
+ ASSERT_EQ(message_count(pending_expected_msgs), _sender.commands().size());
+ ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(
+ pending_state, message_count(pending_expected_msgs), pending_buckets));
+ _sender.clear(); // Leave an empty sender for the calling test; no activation command is sent here.
+}
+
TEST_F(TopLevelBucketDBUpdaterTest, normal_usage) {
set_cluster_state(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"));
@@ -660,8 +787,8 @@ TEST_F(TopLevelBucketDBUpdaterTest, failed_request_bucket_info) {
for (int i=0; i<10; i++) {
EXPECT_EQ("",
- verifyBucket(document::BucketId(16, i),
- lib::ClusterState("distributor:1 storage:1")));
+ verify_bucket(document::BucketId(16, i),
+ lib::ClusterState("distributor:1 storage:1")));
}
// Set system state should now be passed on
@@ -2033,4 +2160,506 @@ TEST_F(TopLevelBucketDBUpdaterTest, node_missing_from_config_is_treated_as_needi
EXPECT_EQ(expand_node_vec({0, 1}), get_send_set());
}
+TEST_F(TopLevelBucketDBUpdaterTest, changed_distributor_set_implies_ownership_transfer) { // Two distributor-side changes that must each be flagged as an ownership transfer.
+ auto fixture = create_pending_state_fixture_for_state_change(
+ "distributor:2 storage:2", "distributor:1 storage:2"); // Case 1: distributor count drops from 2 to 1.
+ EXPECT_TRUE(fixture->state->hasBucketOwnershipTransfer());
+
+ fixture = create_pending_state_fixture_for_state_change(
+ "distributor:2 storage:2", "distributor:2 .1.s:d storage:2"); // Case 2: same count, but distributor 1 gets state "d".
+ EXPECT_TRUE(fixture->state->hasBucketOwnershipTransfer());
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, unchanged_distributor_set_implies_no_ownership_transfer) { // Storage-only changes must not be flagged as an ownership transfer.
+ auto fixture = create_pending_state_fixture_for_state_change(
+ "distributor:2 storage:2", "distributor:2 storage:1"); // Case 1: storage count drops from 2 to 1, distributors unchanged.
+ EXPECT_FALSE(fixture->state->hasBucketOwnershipTransfer());
+
+ fixture = create_pending_state_fixture_for_state_change(
+ "distributor:2 storage:2", "distributor:2 storage:2 .1.s:d"); // Case 2: storage node 1 gets state "d", distributors unchanged.
+ EXPECT_FALSE(fixture->state->hasBucketOwnershipTransfer());
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, changed_distribution_config_implies_ownership_transfer) { // A pending state created for a distribution change must report an ownership transfer.
+ auto fixture = create_pending_state_fixture_for_distribution_change("distributor:2 storage:2");
+ EXPECT_TRUE(fixture->state->hasBucketOwnershipTransfer());
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, transition_time_tracked_for_single_state_change) { // One transition with the fake clock advanced 5 s before completion.
+ ASSERT_NO_FATAL_FAILURE(complete_state_transition_in_seconds("distributor:2 storage:2", 5, message_count(2)));
+
+ EXPECT_EQ(uint64_t(5000), last_transition_time_in_millis()); // 5 s reported as 5000 ms.
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, transition_time_reset_across_non_preempting_state_changes) { // Two transitions, each fully completed before the next begins.
+ ASSERT_NO_FATAL_FAILURE(complete_state_transition_in_seconds("distributor:2 storage:2", 5, message_count(2)));
+ ASSERT_NO_FATAL_FAILURE(complete_state_transition_in_seconds("distributor:2 storage:3", 3, message_count(1)));
+
+ EXPECT_EQ(uint64_t(3000), last_transition_time_in_millis()); // Only the second transition's 3 s is reported, not 5 s + 3 s.
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, transition_time_tracked_for_distribution_config_change) { // Transition time must also be measured when the trigger is set_distribution().
+ lib::ClusterState state("distributor:2 storage:2");
+ ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state, message_count(2), 1)); // Establish an initial state first.
+
+ _sender.clear();
+ set_distribution(dist_config_3_nodes_in_1_group()); // Change distribution config while the cluster state stays the same.
+ fake_clock().addSecondsToTime(4);
+ ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(state, message_count(2)));
+ EXPECT_EQ(uint64_t(4000), last_transition_time_in_millis()); // The 4 s clock advance is reported as 4000 ms.
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, transition_time_tracked_across_preempted_transitions) { // A state set but never completed, followed by a second state that is completed.
+ _sender.clear();
+ set_cluster_state("version:1 distributor:2 storage:2"); // First state; no bucket info gathering is completed for it.
+ fake_clock().addSecondsToTime(5);
+ // Pre-empted with new state here, which will push out the old pending
+ // state and replace it with a new one. We should still count the time
+ // used processing the old state.
+ ASSERT_NO_FATAL_FAILURE(complete_state_transition_in_seconds("version:2 distributor:2 storage:3", 3, message_count(3)));
+
+ EXPECT_EQ(uint64_t(8000), last_transition_time_in_millis()); // 5 s for the preempted state + 3 s for the completed one.
+}
+
+/*
+ * Brief reminder on test DSL for checking bucket merge operations:
+ *
+ * merge_bucket_lists() takes as input strings of the format
+ * <node>:<raw bucket id>/<checksum>/<num docs>/<doc size>|<node>:
+ * and returns a string describing the bucket DB post-merge with the format
+ * <raw bucket id>:<node>/<checksum>/<num docs>/<doc size>,<node>:....|<raw bucket id>:....
+ *
+ * Yes, the order of node<->bucket id is reversed between the two, perhaps to make sure you're awake. The expectations below also carry a trailing /t (trusted) or /u (untrusted) marker per replica.
+ */
+
+TEST_F(TopLevelBucketDBUpdaterTest, batch_update_of_existing_diverging_replicas_does_not_mark_any_as_trusted) {
+ // Replacing bucket information for content node 0 should not mark existing
+ // untrusted replica as trusted as a side effect.
+ EXPECT_EQ("5:1/7/8/9/u,0/1/2/3/u|", // Expected: both replicas of bucket 5 carry /u.
+ merge_bucket_lists(
+ lib::ClusterState("distributor:1 storage:3 .0.s:i"), // State paired with the existing bucket list.
+ "0:5/0/0/0|1:5/7/8/9", // Existing list: node 0 has 0/0/0 info for bucket 5.
+ lib::ClusterState("distributor:1 storage:3 .0.s:u"), // State paired with the new bucket list.
+ "0:5/1/2/3|1:5/7/8/9", true)); // New list: node 0 now reports 1/2/3; node 1 is unchanged.
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, batch_add_of_new_diverging_replicas_does_not_mark_any_as_trusted) { // Empty existing list + two replicas with different info for bucket 5.
+ EXPECT_EQ("5:1/7/8/9/u,0/1/2/3/u|", // Expected: both replicas carry /u.
+ merge_bucket_lists("", "0:5/1/2/3|1:5/7/8/9", true));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, batch_add_with_single_resulting_replica_implicitly_marks_as_trusted) { // Empty existing list + a single replica for bucket 5.
+ EXPECT_EQ("5:0/1/2/3/t|", // Expected: the lone replica carries /t.
+ merge_bucket_lists("", "0:5/1/2/3", true));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, identity_update_of_single_replica_does_not_clear_trusted) { // Existing and new bucket lists are identical.
+ EXPECT_EQ("5:0/1/2/3/t|", // Expected: the replica still carries /t afterwards.
+ merge_bucket_lists("0:5/1/2/3", "0:5/1/2/3", true));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, identity_update_of_diverging_untrusted_replicas_does_not_mark_any_as_trusted) { // Same two diverging replicas before and after.
+ EXPECT_EQ("5:1/7/8/9/u,0/1/2/3/u|", // Expected: both replicas keep /u.
+ merge_bucket_lists("0:5/1/2/3|1:5/7/8/9", "0:5/1/2/3|1:5/7/8/9", true));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, adding_diverging_replica_to_existing_trusted_does_not_remove_trusted) { // Node 0 replica exists first; node 1 arrives with different info.
+ EXPECT_EQ("5:1/2/3/4/u,0/1/2/3/t|", // Expected: node 0 keeps /t, the new node 1 replica is /u.
+ merge_bucket_lists("0:5/1/2/3", "0:5/1/2/3|1:5/2/3/4", true));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted) {
+ // This differs from batch_update_of_existing_diverging_replicas_does_not_mark_any_as_trusted
+ // in that _all_ content nodes are considered outdated when distributor changes take place,
+ // and therefore a slightly different code path is taken. In particular, bucket info for
+ // outdated nodes gets removed before possibly being re-added (if present in the bucket info
+ // response).
+ EXPECT_EQ("5:1/7/8/9/u,0/1/2/3/u|", // Expected: both replicas carry /u after the distributor change.
+ merge_bucket_lists(
+ lib::ClusterState("distributor:2 storage:3"), // Before: two distributors.
+ "0:5/1/2/3|1:5/7/8/9",
+ lib::ClusterState("distributor:1 storage:3"), // After: one distributor; the bucket lists are identical on both sides.
+ "0:5/1/2/3|1:5/7/8/9", true));
+}
+
+// TODO remove on Vespa 8 - this is a workaround for https://github.com/vespa-engine/vespa/issues/8475
+TEST_F(TopLevelBucketDBUpdaterTest, global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection) { // Rejected global-space requests must alternate between the current and legacy distribution hash on resend.
+ set_distribution(dist_config_6_nodes_across_2_groups());
+
+ const vespalib::string current_hash = "(0d*|*(0;0;1;2)(1;3;4;5))"; // Hash expected on the first request and again after the second rejection.
+ const vespalib::string legacy_hash = "(0d3|3|*(0;0;1;2)(1;3;4;5))"; // Hash expected on the resend after the first rejection.
+
+ set_cluster_state("distributor:6 storage:6");
+ ASSERT_EQ(message_count(6), _sender.commands().size());
+
+ api::RequestBucketInfoCommand* global_req = nullptr;
+ for (auto& cmd : _sender.commands()) { // Find the first sent request that targets the global bucket space.
+ auto& req_cmd = dynamic_cast<api::RequestBucketInfoCommand&>(*cmd); // Reference cast: throws std::bad_cast if a command has another type.
+ if (req_cmd.getBucketSpace() == document::FixedBucketSpaces::global_space()) {
+ global_req = &req_cmd;
+ break;
+ }
+ }
+ ASSERT_TRUE(global_req != nullptr);
+ ASSERT_EQ(current_hash, global_req->getDistributionHash());
+
+ auto reply = std::make_shared<api::RequestBucketInfoReply>(*global_req);
+ reply->setResult(api::ReturnCode::REJECTED); // First rejection.
+ bucket_db_updater().onRequestBucketInfoReply(reply);
+
+ fake_clock().addSecondsToTime(10); // Advance the fake clock before asking for delayed messages to be resent.
+ bucket_db_updater().resend_delayed_messages();
+
+ // Should now be a resent request with legacy distribution hash
+ ASSERT_EQ(message_count(6) + 1, _sender.commands().size()); // Exactly one additional command was sent.
+ auto& legacy_req = dynamic_cast<api::RequestBucketInfoCommand&>(*_sender.commands().back());
+ ASSERT_EQ(legacy_hash, legacy_req.getDistributionHash());
+
+ // Now if we reject it _again_ we should cycle back to the current hash
+ // in case it wasn't a hash-based rejection after all. And the circle of life continues.
+ reply = std::make_shared<api::RequestBucketInfoReply>(legacy_req);
+ reply->setResult(api::ReturnCode::REJECTED); // Second rejection.
+ bucket_db_updater().onRequestBucketInfoReply(reply);
+
+ fake_clock().addSecondsToTime(10);
+ bucket_db_updater().resend_delayed_messages();
+
+ ASSERT_EQ(message_count(6) + 2, _sender.commands().size()); // One more command than after the first resend.
+ auto& new_current_req = dynamic_cast<api::RequestBucketInfoCommand&>(*_sender.commands().back());
+ ASSERT_EQ(current_hash, new_current_req.getDistributionHash());
+}
+
+namespace { // File-local helpers for iterating bucket databases.
+
+template <typename Func>
+void for_each_bucket(const BucketDatabase& db, const document::BucketSpace& space, Func&& f) { // Calls f(space, entry) for each entry obtained by repeated getNext() calls, starting from BucketId(0).
+ BucketId last(0);
+ auto e = db.getNext(last); // First entry returned for BucketId(0).
+ while (e.valid()) { // Stops at the first invalid entry.
+ f(space, e);
+ e = db.getNext(e.getBucketId()); // Continue from the bucket id of the entry just visited.
+ }
+}
+
+template <typename Func>
+void for_each_bucket(const DistributorBucketSpaceRepo& repo, Func&& f) { // Runs the database overload for every element of the repo.
+ for (const auto& space : repo) {
+ for_each_bucket(space.second->getBucketDatabase(), space.first, f); // space.first is passed on as the bucket space; f is passed as an lvalue so it is reused across spaces.
+ }
+}
+
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, non_owned_buckets_moved_to_read_only_db_on_ownership_change) { // Buckets not owned in a pending state must leave the mutable DBs and appear in the read-only DBs.
+ set_stale_reads_enabled(true);
+
+ lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition
+ set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity
+
+ ASSERT_EQ(message_count(4), _sender.commands().size());
+ constexpr uint32_t n_buckets = 10;
+ ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(initial_state, message_count(4), n_buckets));
+ _sender.clear();
+
+ EXPECT_EQ(n_buckets, mutable_default_dbs_size()); // Baseline: every bucket is in the mutable DBs of both spaces...
+ EXPECT_EQ(n_buckets, mutable_global_dbs_size());
+ EXPECT_EQ(0u, read_only_default_dbs_size()); // ...and the read-only DBs are empty.
+ EXPECT_EQ(0u, read_only_global_dbs_size());
+
+ lib::ClusterState pending_state("distributor:2 storage:4"); // Second distributor added.
+
+ std::unordered_set<Bucket, Bucket::hash> buckets_not_owned_in_pending_state;
+ for (auto* s : distributor_stripes()) { // Collect, across all stripes and spaces, the buckets not owned under pending_state.
+ for_each_bucket(mutable_repo(*s), [&](const auto& space, const auto& entry) {
+ if (!distributor_bucket_space(entry.getBucketId()).owns_bucket_in_state(pending_state, entry.getBucketId())) {
+ buckets_not_owned_in_pending_state.insert(Bucket(space, entry.getBucketId()));
+ }
+ });
+ }
+ EXPECT_FALSE(buckets_not_owned_in_pending_state.empty()); // Guards against the checks below passing vacuously.
+
+ set_cluster_state_bundle(lib::ClusterStateBundle(pending_state, {}, true)); // Now requires activation
+
+ const auto buckets_not_owned_per_space = (buckets_not_owned_in_pending_state.size() / 2); // 2 spaces
+ const auto expected_mutable_buckets = n_buckets - buckets_not_owned_per_space;
+ EXPECT_EQ(expected_mutable_buckets, mutable_default_dbs_size());
+ EXPECT_EQ(expected_mutable_buckets, mutable_global_dbs_size());
+ EXPECT_EQ(buckets_not_owned_per_space, read_only_default_dbs_size());
+ EXPECT_EQ(buckets_not_owned_per_space, read_only_global_dbs_size());
+
+ for (auto* s : distributor_stripes()) { // Every bucket in a read-only DB must be one of the non-owned buckets collected above.
+ for_each_bucket(read_only_repo(*s), [&](const auto& space, const auto& entry) {
+ EXPECT_TRUE(buckets_not_owned_in_pending_state.find(Bucket(space, entry.getBucketId()))
+ != buckets_not_owned_in_pending_state.end());
+ });
+ }
+}
+
+// Verifies that buckets which become unavailable because storage nodes go down (with no
+// distributor ownership change) are not placed in the read-only DBs.
+TEST_F(TopLevelBucketDBUpdaterTest, buckets_no_longer_available_are_not_moved_to_read_only_database) {
+    constexpr uint32_t n_buckets = 10;
+    // No ownership change, just node down. Test redundancy is 2, so removing 2 nodes will
+    // cause some buckets to be entirely unavailable.
+    // Wrapped in ASSERT_NO_FATAL_FAILURE like the other callers of this helper in this file,
+    // so that a fatal assertion failure inside the helper aborts the test instead of letting
+    // the expectations below run against a half-completed transition.
+    ASSERT_NO_FATAL_FAILURE(
+        trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
+                                                           "version:2 distributor:1 storage:4 .0.s:d .1.s:m", n_buckets, 0));
+
+    EXPECT_EQ(0u, read_only_default_dbs_size());
+    EXPECT_EQ(0u, read_only_global_dbs_size());
+}
+
+// Verifies that with stale reads disabled, an ownership change leaves the read-only DBs empty.
+TEST_F(TopLevelBucketDBUpdaterTest, non_owned_buckets_purged_when_read_only_support_is_config_disabled) {
+    constexpr uint32_t bucket_count = 10;
+    set_stale_reads_enabled(false);
+
+    // With a single distributor in the state, all buckets are owned by us by definition.
+    lib::ClusterState sole_owner_state("distributor:1 storage:4");
+    // Third argument `false`: skip the activation step for simplicity.
+    set_cluster_state_bundle(lib::ClusterStateBundle(sole_owner_state, {}, false));
+
+    ASSERT_EQ(message_count(4), _sender.commands().size());
+    ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(sole_owner_state, message_count(4), bucket_count));
+    _sender.clear();
+
+    // The first bulk load of buckets must not put anything in the read-only DBs.
+    EXPECT_EQ(0u, read_only_default_dbs_size());
+    EXPECT_EQ(0u, read_only_global_dbs_size());
+
+    // Adding a second distributor changes ownership; the read-only DBs must still stay empty.
+    set_cluster_state("distributor:2 storage:4");
+    EXPECT_EQ(0u, read_only_default_dbs_size());
+    EXPECT_EQ(0u, read_only_global_dbs_size());
+}
+
+// Verifies that a completed-but-unactivated state neither bumps the current bundle version
+// nor populates the mutable DBs until the matching activation request arrives.
+TEST_F(TopLevelBucketDBUpdaterTest, deferred_activated_state_does_not_enable_state_until_activation_received) {
+    constexpr uint32_t bucket_count = 10;
+    set_stale_reads_enabled(true);
+    ASSERT_NO_FATAL_FAILURE(
+        trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
+                                                           "version:2 distributor:1 storage:4", bucket_count, 4));
+
+    // Before activation: still on version 1 and no buckets visible in the mutable DBs.
+    EXPECT_EQ(1u, current_cluster_state_bundle().getVersion());
+    EXPECT_EQ(0u, mutable_default_dbs_size());
+    EXPECT_EQ(0u, mutable_global_dbs_size());
+
+    EXPECT_FALSE(activate_cluster_state_version(2));
+
+    // After activation: version 2 is current and the gathered buckets are in the mutable DBs.
+    EXPECT_EQ(2u, current_cluster_state_bundle().getVersion());
+    EXPECT_EQ(bucket_count, mutable_default_dbs_size());
+    EXPECT_EQ(bucket_count, mutable_global_dbs_size());
+}
+
+// Verifies that activating the pending state leaves both read-only DBs empty.
+TEST_F(TopLevelBucketDBUpdaterTest, read_only_db_cleared_once_pending_state_is_activated) {
+    constexpr uint32_t bucket_count = 10;
+    set_stale_reads_enabled(true);
+    ASSERT_NO_FATAL_FAILURE(
+        trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", bucket_count, 4,
+                                                           "version:2 distributor:2 storage:4", bucket_count, 0));
+
+    EXPECT_FALSE(activate_cluster_state_version(2));
+
+    // Nothing may remain in the read-only DBs once version 2 has been activated.
+    EXPECT_EQ(0u, read_only_default_dbs_size());
+    EXPECT_EQ(0u, read_only_global_dbs_size());
+}
+
+// Verifies that when the pending state marks this distributor as down, all previously held
+// buckets move from the mutable DBs to the read-only DBs while the state is unactivated.
+TEST_F(TopLevelBucketDBUpdaterTest, read_only_db_is_populated_even_when_self_is_marked_down) {
+    constexpr uint32_t bucket_count = 10;
+    set_stale_reads_enabled(true);
+    ASSERT_NO_FATAL_FAILURE(
+        trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", bucket_count, 4,
+                                                           "version:2 distributor:1 .0.s:d storage:4", bucket_count, 0));
+
+    // State not yet activated, so the read-only DBs hold all the buckets we used to have...
+    EXPECT_EQ(bucket_count, read_only_default_dbs_size());
+    EXPECT_EQ(bucket_count, read_only_global_dbs_size());
+    // ... and the mutable DBs hold none.
+    EXPECT_EQ(0u, mutable_default_dbs_size());
+    EXPECT_EQ(0u, mutable_global_dbs_size());
+}
+
+// Verifies that activation requests for a version other than the pending one (5) are handled
+// and answered with a reply carrying the actual pending version.
+TEST_F(TopLevelBucketDBUpdaterTest, activate_cluster_state_request_with_mismatching_version_returns_actual_version) {
+    constexpr uint32_t bucket_count = 10;
+    set_stale_reads_enabled(true);
+    ASSERT_NO_FATAL_FAILURE(
+        trigger_completed_but_not_yet_activated_transition("version:4 distributor:1 storage:4", bucket_count, 4,
+                                                           "version:5 distributor:2 storage:4", bucket_count, 0));
+
+    // Version 4 is too old.
+    EXPECT_TRUE(activate_cluster_state_version(4));
+    ASSERT_NO_FATAL_FAILURE(assert_has_activate_cluster_state_reply_with_actual_version(5));
+
+    // Version 6 is more recent than anything that has been observed.
+    EXPECT_TRUE(activate_cluster_state_version(6));
+    ASSERT_NO_FATAL_FAILURE(assert_has_activate_cluster_state_reply_with_actual_version(5));
+}
+
+// Verifies that an activation request arriving when no state is pending is not consumed by
+// the handler and produces no auto-generated reply.
+TEST_F(TopLevelBucketDBUpdaterTest, activate_cluster_state_request_without_pending_transition_passes_message_through) {
+    constexpr uint32_t bucket_count = 10;
+    set_stale_reads_enabled(true);
+    ASSERT_NO_FATAL_FAILURE(
+        trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
+                                                           "version:2 distributor:1 storage:4", bucket_count, 4));
+    // Activating version 2 leaves no pending cluster state behind.
+    EXPECT_FALSE(activate_cluster_state_version(2));
+
+    // There is no pending cluster state for version 3, so the request is just passed through to
+    // be implicitly bounced by the state manager. The state manager is not modelled in this test,
+    // so we only check that the handler returns false (meaning "didn't take message ownership")
+    // and that no reply was auto-generated.
+    EXPECT_FALSE(activate_cluster_state_version(3));
+    EXPECT_EQ(size_t(0), _sender.replies().size());
+}
+
+// Verifies that each stripe's pendingClusterStateOrNull() returns the per-space pending state
+// while bucket info gathering is outstanding, and nullptr once gathering has completed.
+TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_getter_is_non_null_only_when_state_is_pending) {
+    auto baseline_state = std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:d");
+    auto default_state = std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:m");
+
+    // The default space gets its own derived state; the global space reuses the baseline.
+    lib::ClusterStateBundle bundle(*baseline_state, {{FixedBucketSpaces::default_space(), default_state},
+                                                     {FixedBucketSpaces::global_space(), baseline_state}});
+    set_cluster_state_bundle(bundle);
+
+    // While the transition is pending, each space reports its own pending state.
+    for (auto* stripe : distributor_stripes()) {
+        auto* pending_default = stripe->bucket_db_updater().pendingClusterStateOrNull(FixedBucketSpaces::default_space());
+        ASSERT_TRUE(pending_default != nullptr);
+        EXPECT_EQ(*default_state, *pending_default);
+
+        auto* pending_global = stripe->bucket_db_updater().pendingClusterStateOrNull(FixedBucketSpaces::global_space());
+        ASSERT_TRUE(pending_global != nullptr);
+        EXPECT_EQ(*baseline_state, *pending_global);
+    }
+
+    ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(*baseline_state, message_count(1), 0));
+
+    // Once gathering has completed there is no pending state for either space.
+    for (auto* stripe : distributor_stripes()) {
+        auto* pending_default = stripe->bucket_db_updater().pendingClusterStateOrNull(FixedBucketSpaces::default_space());
+        EXPECT_TRUE(pending_default == nullptr);
+
+        auto* pending_global = stripe->bucket_db_updater().pendingClusterStateOrNull(FixedBucketSpaces::global_space());
+        EXPECT_TRUE(pending_global == nullptr);
+    }
+}
+
+// Fixture for read snapshot tests. Extends TopLevelBucketDBUpdaterTest with a set of
+// pre-built cluster states, one bucket per bucket space to take snapshots for, and
+// stale reads enabled by default (see SetUp()).
+struct BucketDBUpdaterSnapshotTest : TopLevelBucketDBUpdaterTest {
+    lib::ClusterState empty_state;                       // Default-constructed state
+    std::shared_ptr<lib::ClusterState> initial_baseline; // Baseline state; also used for the global space
+    std::shared_ptr<lib::ClusterState> initial_default;  // Derived state for the default space
+    lib::ClusterStateBundle initial_bundle;              // Bundle built from the two states above
+    Bucket default_bucket;                               // BucketId(16, 1234) in the default space
+    Bucket global_bucket;                                // BucketId(16, 1234) in the global space
+
+    BucketDBUpdaterSnapshotTest()
+        : TopLevelBucketDBUpdaterTest(),
+          empty_state(),
+          initial_baseline(std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:d")),
+          initial_default(std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:m")),
+          initial_bundle(*initial_baseline, {{FixedBucketSpaces::default_space(), initial_default},
+                                             {FixedBucketSpaces::global_space(), initial_baseline}}),
+          default_bucket(FixedBucketSpaces::default_space(), BucketId(16, 1234)),
+          global_bucket(FixedBucketSpaces::global_space(), BucketId(16, 1234))
+    {
+    }
+    ~BucketDBUpdaterSnapshotTest() override;
+
+    // Runs the base fixture setup and then enables stale reads; individual tests may
+    // turn them off again.
+    void SetUp() override {
+        TopLevelBucketDBUpdaterTest::SetUp();
+        set_stale_reads_enabled(true);
+    }
+
+    // Counts, across all stripes, the buckets of `bucket_space` in the chosen repo (mutable
+    // if `check_mutable_repo` is true, otherwise read-only) for which the read guard of the
+    // stripe's snapshot finds exactly one entry via find_parents_and_self().
+    //
+    // Assumes that the distributor owns all buckets, so it may choose any arbitrary bucket in the bucket space
+    // when requesting the snapshot.
+    //
+    // Returns 0 as soon as any stripe yields a non-routable snapshot, discarding whatever
+    // had been counted for earlier stripes.
+    uint32_t buckets_in_snapshot_matching_current_db(bool check_mutable_repo, BucketSpace bucket_space) {
+        uint32_t found_buckets = 0;
+        for (auto* s : distributor_stripes()) {
+            auto rs = s->bucket_db_updater().read_snapshot_for_bucket(Bucket(bucket_space, BucketId(16, 1234)));
+            if (!rs.is_routable()) {
+                return 0;
+            }
+            auto guard = rs.steal_read_guard();
+            auto& repo = check_mutable_repo ? mutable_repo(*s) : read_only_repo(*s);
+            for_each_bucket(repo, [&](const auto& space, const auto& entry) {
+                if (space == bucket_space) {
+                    auto entries = guard->find_parents_and_self(entry.getBucketId());
+                    if (entries.size() == 1) {
+                        ++found_buckets;
+                    }
+                }
+            });
+        }
+        return found_buckets;
+    }
+};
+
+// Destructor is declared in the struct and defaulted here, outside the struct body.
+BucketDBUpdaterSnapshotTest::~BucketDBUpdaterSnapshotTest() = default;
+
+// With no cluster state set by the test, a default space snapshot must not be routable.
+TEST_F(BucketDBUpdaterSnapshotTest, default_space_snapshot_prior_to_activated_state_is_non_routable) {
+    auto&& owning_stripe = stripe_of_bucket(default_bucket);
+    auto snapshot = owning_stripe.bucket_db_updater().read_snapshot_for_bucket(default_bucket);
+    EXPECT_FALSE(snapshot.is_routable());
+}
+
+// With no cluster state set by the test, a global space snapshot must not be routable.
+TEST_F(BucketDBUpdaterSnapshotTest, global_space_snapshot_prior_to_activated_state_is_non_routable) {
+    auto&& owning_stripe = stripe_of_bucket(global_bucket);
+    auto snapshot = owning_stripe.bucket_db_updater().read_snapshot_for_bucket(global_bucket);
+    EXPECT_FALSE(snapshot.is_routable());
+}
+
+// Verifies the active, default-active and pending cluster states exposed through a read
+// snapshot's context, both while a state is pending and after it has become active.
+TEST_F(BucketDBUpdaterSnapshotTest, read_snapshot_returns_appropriate_cluster_states) {
+    // Takes a read snapshot for `bucket` from the stripe that handles it.
+    auto snapshot_for = [&](const Bucket& bucket) {
+        return stripe_of_bucket(bucket).bucket_db_updater().read_snapshot_for_bucket(bucket);
+    };
+
+    set_cluster_state_bundle(initial_bundle);
+    // The bundle is now pending; the empty initial state is still the active one.
+
+    auto default_snapshot = snapshot_for(default_bucket);
+    EXPECT_EQ(default_snapshot.context().active_cluster_state()->toString(), empty_state.toString());
+    EXPECT_EQ(default_snapshot.context().default_active_cluster_state()->toString(), empty_state.toString());
+    ASSERT_TRUE(default_snapshot.context().has_pending_state_transition());
+    EXPECT_EQ(default_snapshot.context().pending_cluster_state()->toString(), initial_default->toString());
+
+    auto global_snapshot = snapshot_for(global_bucket);
+    EXPECT_EQ(global_snapshot.context().active_cluster_state()->toString(), empty_state.toString());
+    EXPECT_EQ(global_snapshot.context().default_active_cluster_state()->toString(), empty_state.toString());
+    ASSERT_TRUE(global_snapshot.context().has_pending_state_transition());
+    EXPECT_EQ(global_snapshot.context().pending_cluster_state()->toString(), initial_baseline->toString());
+
+    ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(*initial_baseline, message_count(1), 0));
+    // The state has now been activated and nothing is pending.
+
+    default_snapshot = snapshot_for(default_bucket);
+    EXPECT_EQ(default_snapshot.context().active_cluster_state()->toString(), initial_default->toString());
+    EXPECT_EQ(default_snapshot.context().default_active_cluster_state()->toString(), initial_default->toString());
+    EXPECT_FALSE(default_snapshot.context().has_pending_state_transition());
+
+    global_snapshot = snapshot_for(global_bucket);
+    EXPECT_EQ(global_snapshot.context().active_cluster_state()->toString(), initial_baseline->toString());
+    EXPECT_EQ(global_snapshot.context().default_active_cluster_state()->toString(), initial_default->toString());
+    EXPECT_FALSE(global_snapshot.context().has_pending_state_transition());
+}
+
+// Verifies that once the pending state is activated, snapshots for both bucket spaces see
+// every bucket held in the mutable repos.
+TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_no_pending_state_transition_returns_mutable_db_guard) {
+    constexpr uint32_t bucket_count = 10;
+    constexpr bool use_mutable_repo = true;
+    ASSERT_NO_FATAL_FAILURE(
+        trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
+                                                           "version:2 distributor:1 storage:4", bucket_count, 4));
+    EXPECT_FALSE(activate_cluster_state_version(2));
+
+    EXPECT_EQ(buckets_in_snapshot_matching_current_db(use_mutable_repo, FixedBucketSpaces::default_space()), bucket_count);
+    EXPECT_EQ(buckets_in_snapshot_matching_current_db(use_mutable_repo, FixedBucketSpaces::global_space()), bucket_count);
+}
+
+// Verifies that a snapshot is not routable for a bucket we do not own in the active state.
+TEST_F(BucketDBUpdaterSnapshotTest, snapshot_returns_unroutable_for_non_owned_bucket_in_current_state) {
+    ASSERT_NO_FATAL_FAILURE(
+        trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
+                                                           "version:2 distributor:2 .0.s:d storage:4", 0, 0));
+    EXPECT_FALSE(activate_cluster_state_version(2));
+
+    // State 2 marks us as down, so we do not own any buckets in it.
+    auto&& owning_stripe = stripe_of_bucket(default_bucket);
+    auto snapshot = owning_stripe.bucket_db_updater().read_snapshot_for_bucket(default_bucket);
+    EXPECT_FALSE(snapshot.is_routable());
+}
+
+// Verifies that while a state in which we are down is pending (not activated), snapshots for
+// both bucket spaces see every bucket held in the read-only repos.
+TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_pending_state_returns_read_only_guard_for_bucket_only_owned_in_current_state) {
+    constexpr uint32_t bucket_count = 10;
+    constexpr bool use_mutable_repo = false; // i.e. compare against the read-only repos
+    ASSERT_NO_FATAL_FAILURE(
+        trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", bucket_count, 4,
+                                                           "version:2 distributor:2 .0.s:d storage:4", 0, 0));
+
+    EXPECT_EQ(buckets_in_snapshot_matching_current_db(use_mutable_repo, FixedBucketSpaces::default_space()), bucket_count);
+    EXPECT_EQ(buckets_in_snapshot_matching_current_db(use_mutable_repo, FixedBucketSpaces::global_space()), bucket_count);
+}
+
+// Verifies that with stale reads turned off, a snapshot for a bucket we do not own in the
+// pending state is not routable.
+TEST_F(BucketDBUpdaterSnapshotTest, snapshot_is_unroutable_if_stale_reads_disabled_and_bucket_not_owned_in_pending_state) {
+    constexpr uint32_t bucket_count = 10;
+    // Override the fixture's SetUp(), which enables stale reads.
+    set_stale_reads_enabled(false);
+    ASSERT_NO_FATAL_FAILURE(
+        trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", bucket_count, 4,
+                                                           "version:2 distributor:2 .0.s:d storage:4", 0, 0));
+
+    auto&& owning_stripe = stripe_of_bucket(default_bucket);
+    auto snapshot = owning_stripe.bucket_db_updater().read_snapshot_for_bucket(default_bucket);
+    EXPECT_FALSE(snapshot.is_routable());
+}
+
}
diff --git a/storage/src/tests/distributor/top_level_distributor_test.cpp b/storage/src/tests/distributor/top_level_distributor_test.cpp
index 28831c8a661..8fae1c6d738 100644
--- a/storage/src/tests/distributor/top_level_distributor_test.cpp
+++ b/storage/src/tests/distributor/top_level_distributor_test.cpp
@@ -86,10 +86,6 @@ struct TopLevelDistributorTest : Test, TopLevelDistributorTestUtil {
return _distributor->_status_to_do;
}
- TopLevelDistributor::MetricUpdateHook distributor_metric_update_hook() {
- return _distributor->_metricUpdateHook;
- }
-
BucketSpacesStatsProvider::PerNodeBucketSpacesStats distributor_bucket_spaces_stats() {
return _distributor->getBucketSpacesStats();
}
diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.cpp b/storage/src/tests/distributor/top_level_distributor_test_util.cpp
index 0db345636ee..b6e9beb38ae 100644
--- a/storage/src/tests/distributor/top_level_distributor_test_util.cpp
+++ b/storage/src/tests/distributor/top_level_distributor_test_util.cpp
@@ -375,6 +375,11 @@ TopLevelDistributorTestUtil::reconfigure(const DistributorConfig& cfg)
tick(); // Config is propagated upon next top-level tick
}
+// Returns a reference to the `_metricUpdateHook` member of the distributor under test.
+framework::MetricUpdateHook&
+TopLevelDistributorTestUtil::distributor_metric_update_hook() {
+ return _distributor->_metricUpdateHook;
+}
+
BucketDatabase&
TopLevelDistributorTestUtil::stripe_bucket_database(uint16_t stripe_idx) {
assert(stripe_idx < _distributor->_stripes.size());
@@ -449,6 +454,19 @@ TopLevelDistributorTestUtil::trigger_distribution_change(std::shared_ptr<lib::Di
_distributor->enableNextDistribution();
}
+// Returns the cluster state bundle currently held by the distributor stripes.
+//
+// The returned reference is to the bundle of the first stripe. All stripes are asserted
+// to hold a bundle that compares equal to it.
+const lib::ClusterStateBundle&
+TopLevelDistributorTestUtil::current_cluster_state_bundle() const
+{
+    // Indexing an empty stripe vector would be undefined behavior, so fail loudly instead.
+    assert(!_distributor->_stripes.empty());
+    // We assume that all stripes have the same cluster state internally, so just use the first.
+    assert(_distributor->_stripes[0]);
+    const auto& bundle = _distributor->_stripes[0]->getClusterStateBundle();
+    // ... but sanity-check just to make sure...
+    for (size_t i = 1; i < _num_distributor_stripes; ++i) {
+        assert(_distributor->_stripes[i]->getClusterStateBundle() == bundle);
+    }
+    return bundle;
+}
+
void
TopLevelDistributorTestUtil::tick_distributor_and_stripes_n_times(uint32_t n)
{
diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.h b/storage/src/tests/distributor/top_level_distributor_test_util.h
index 1d9d1613920..8832f8ada6e 100644
--- a/storage/src/tests/distributor/top_level_distributor_test_util.h
+++ b/storage/src/tests/distributor/top_level_distributor_test_util.h
@@ -85,6 +85,8 @@ public:
return _node->getClock();
}
+ framework::MetricUpdateHook& distributor_metric_update_hook();
+
BucketDatabase& stripe_bucket_database(uint16_t stripe_idx); // Implicit default space only
BucketDatabase& stripe_bucket_database(uint16_t stripe_idx, document::BucketSpace space);
const BucketDatabase& stripe_bucket_database(uint16_t stripe_idx) const; // Implicit default space only
@@ -135,6 +137,8 @@ public:
void trigger_distribution_change(std::shared_ptr<lib::Distribution> distr);
+ const lib::ClusterStateBundle& current_cluster_state_bundle() const;
+
static std::vector<document::BucketSpace> bucket_spaces();
protected: