Diffstat (limited to 'storage/src/tests/distributor')
-rw-r--r--  storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp      84
-rw-r--r--  storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp  1771
-rw-r--r--  storage/src/tests/distributor/top_level_distributor_test.cpp          16
-rw-r--r--  storage/src/tests/distributor/top_level_distributor_test_util.cpp     61
-rw-r--r--  storage/src/tests/distributor/top_level_distributor_test_util.h       17
5 files changed, 1893 insertions(+), 56 deletions(-)
diff --git a/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp b/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp
index 6cca6df9f80..b871bf5841e 100644
--- a/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp
+++ b/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp
@@ -1212,6 +1212,7 @@ TEST_F(LegacyBucketDBUpdaterTest, notify_change_with_pending_state_queues_bucket
}
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, merge_reply) {
enableDistributorClusterState("distributor:1 storage:3");
@@ -1254,6 +1255,7 @@ TEST_F(LegacyBucketDBUpdaterTest, merge_reply) {
dumpBucket(document::BucketId(16, 1234)));
};
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, merge_reply_node_down) {
enableDistributorClusterState("distributor:1 storage:3");
std::vector<api::MergeBucketCommand::Node> nodes;
@@ -1296,6 +1298,7 @@ TEST_F(LegacyBucketDBUpdaterTest, merge_reply_node_down) {
dumpBucket(document::BucketId(16, 1234)));
};
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, merge_reply_node_down_after_request_sent) {
enableDistributorClusterState("distributor:1 storage:3");
std::vector<api::MergeBucketCommand::Node> nodes;
@@ -1338,7 +1341,7 @@ TEST_F(LegacyBucketDBUpdaterTest, merge_reply_node_down_after_request_sent) {
dumpBucket(document::BucketId(16, 1234)));
};
-
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, flush) {
enableDistributorClusterState("distributor:1 storage:3");
_sender.clear();
@@ -1417,6 +1420,7 @@ LegacyBucketDBUpdaterTest::getSentNodesDistributionChanged(
return ost.str();
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_send_messages) {
EXPECT_EQ(getNodeList({0, 1, 2}),
getSentNodes("cluster:d",
@@ -1514,6 +1518,7 @@ TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_send_messages) {
"distributor:3 storage:3 .1.s:m"));
};
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_receive) {
DistributorMessageSenderStub sender;
@@ -1552,6 +1557,7 @@ TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_receive) {
EXPECT_EQ(3, (int)pendingTransition.results().size());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_with_group_down) {
std::string config(getDistConfig6Nodes4Groups());
config += "distributor_auto_ownership_transfer_on_whole_group_down true\n";
@@ -1571,6 +1577,7 @@ TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_with_group_down) {
"distributor:6 .2.s:d storage:6"));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_with_group_down_and_no_handover) {
std::string config(getDistConfig6Nodes4Groups());
config += "distributor_auto_ownership_transfer_on_whole_group_down false\n";
@@ -1582,6 +1589,8 @@ TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_with_group_down_and_no_h
"distributor:6 .2.s:d .3.s:d storage:6"));
}
+namespace {
+
void
parseInputData(const std::string& data,
uint64_t timestamp,
@@ -1656,6 +1665,8 @@ struct BucketDumper : public BucketDatabase::EntryProcessor
}
};
+}
+
std::string
LegacyBucketDBUpdaterTest::mergeBucketLists(
const lib::ClusterState& oldState,
@@ -1724,6 +1735,7 @@ LegacyBucketDBUpdaterTest::mergeBucketLists(const std::string& existingData,
includeBucketInfo);
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_merge) {
// Simple initializing case - ask all nodes for info
EXPECT_EQ(
@@ -1763,6 +1775,7 @@ TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_merge) {
mergeBucketLists("", "0:5/0/0/0|1:5/2/3/4", true));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_merge_replica_changed) {
// Node went from initializing to up and non-invalid bucket changed.
EXPECT_EQ(
@@ -1775,6 +1788,7 @@ TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_merge_replica_changed) {
true));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, no_db_resurrection_for_bucket_not_owned_in_current_state) {
document::BucketId bucket(16, 3);
lib::ClusterState stateBefore("distributor:1 storage:1");
@@ -1804,6 +1818,7 @@ TEST_F(LegacyBucketDBUpdaterTest, no_db_resurrection_for_bucket_not_owned_in_cur
EXPECT_EQ(std::string("NONEXISTING"), dumpBucket(bucket));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, no_db_resurrection_for_bucket_not_owned_in_pending_state) {
document::BucketId bucket(16, 3);
lib::ClusterState stateBefore("distributor:1 storage:1");
@@ -1831,6 +1846,7 @@ TEST_F(LegacyBucketDBUpdaterTest, no_db_resurrection_for_bucket_not_owned_in_pen
EXPECT_EQ(std::string("NONEXISTING"), dumpBucket(bucket));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
/*
* If we get a distribution config change, it's important that cluster states that
* arrive after this--but _before_ the pending cluster state has finished--must trigger
@@ -1880,6 +1896,7 @@ TEST_F(LegacyBucketDBUpdaterTest, cluster_state_always_sends_full_fetch_when_dis
EXPECT_EQ(size_t(0), _sender.commands().size());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, changed_distribution_config_triggers_recovery_mode) {
ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), messageCount(6), 20));
_sender.clear();
@@ -1929,6 +1946,7 @@ std::unique_ptr<BucketDatabase::EntryProcessor> func_processor(Func&& f) {
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, changed_distribution_config_does_not_elide_bucket_db_pruning) {
setDistribution(getDistConfig3Nodes1Group());
@@ -1948,6 +1966,7 @@ TEST_F(LegacyBucketDBUpdaterTest, changed_distribution_config_does_not_elide_buc
}));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, newly_added_buckets_have_current_time_as_gc_timestamp) {
getClock().setAbsoluteTimeInSeconds(101234);
lib::ClusterState stateBefore("distributor:1 storage:1");
@@ -1963,6 +1982,7 @@ TEST_F(LegacyBucketDBUpdaterTest, newly_added_buckets_have_current_time_as_gc_ti
EXPECT_EQ(uint32_t(101234), e->getLastGarbageCollectionTime());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, newer_mutations_not_overwritten_by_earlier_bucket_fetch) {
{
lib::ClusterState stateBefore("distributor:1 storage:1 .0.s:i");
@@ -2051,6 +2071,7 @@ LegacyBucketDBUpdaterTest::getSentNodesWithPreemption(
using nodeVec = std::vector<uint16_t>;
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
/*
* If we don't carry over the set of nodes that we need to fetch from,
* a naive comparison between the active state and the new state will
@@ -2067,6 +2088,7 @@ TEST_F(LegacyBucketDBUpdaterTest, preempted_distributor_change_carries_node_set_
"version:3 distributor:6 storage:6"));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, preempted_storage_change_carries_node_set_over_to_next_state_fetch) {
EXPECT_EQ(
expandNodeVec({2, 3}),
@@ -2077,6 +2099,7 @@ TEST_F(LegacyBucketDBUpdaterTest, preempted_storage_change_carries_node_set_over
"version:3 distributor:6 storage:6"));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, preempted_storage_node_down_must_be_re_fetched) {
EXPECT_EQ(
expandNodeVec({2}),
@@ -2087,6 +2110,7 @@ TEST_F(LegacyBucketDBUpdaterTest, preempted_storage_node_down_must_be_re_fetched
"version:3 distributor:6 storage:6"));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, do_not_send_to_preempted_node_now_in_down_state) {
EXPECT_EQ(
nodeVec{},
@@ -2097,6 +2121,7 @@ TEST_F(LegacyBucketDBUpdaterTest, do_not_send_to_preempted_node_now_in_down_stat
"version:3 distributor:6 storage:6 .2.s:d")); // 2 down again.
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, doNotSendToPreemptedNodeNotPartOfNewState) {
// Even though 100 nodes are preempted, not all of these should be part
// of the request afterwards when only 6 are part of the state.
@@ -2109,6 +2134,7 @@ TEST_F(LegacyBucketDBUpdaterTest, doNotSendToPreemptedNodeNotPartOfNewState) {
"version:3 distributor:6 storage:6"));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, outdated_node_set_cleared_after_successful_state_completion) {
lib::ClusterState stateBefore(
"version:1 distributor:6 storage:6 .1.t:1234");
@@ -2123,6 +2149,7 @@ TEST_F(LegacyBucketDBUpdaterTest, outdated_node_set_cleared_after_successful_sta
EXPECT_EQ(size_t(0), _sender.commands().size());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest (despite being disabled)
// XXX test currently disabled since distribution config currently isn't used
// at all in order to deduce the set of nodes to send to. This might not matter
// in practice since it is assumed that the cluster state matching the new
@@ -2144,6 +2171,7 @@ TEST_F(LegacyBucketDBUpdaterTest, DISABLED_cluster_config_downsize_only_sends_to
EXPECT_EQ((nodeVec{0, 1, 2}), getSendSet());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
/**
* Test scenario where a cluster is downsized by removing a subset of the nodes
* from the distribution configuration. The system must be able to deal with
@@ -2188,6 +2216,7 @@ TEST_F(LegacyBucketDBUpdaterTest, node_missing_from_config_is_treated_as_needing
EXPECT_EQ(expandNodeVec({0, 1}), getSendSet());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, changed_distributor_set_implies_ownership_transfer) {
auto fixture = createPendingStateFixtureForStateChange(
"distributor:2 storage:2", "distributor:1 storage:2");
@@ -2198,6 +2227,7 @@ TEST_F(LegacyBucketDBUpdaterTest, changed_distributor_set_implies_ownership_tran
EXPECT_TRUE(fixture->state->hasBucketOwnershipTransfer());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, unchanged_distributor_set_implies_no_ownership_transfer) {
auto fixture = createPendingStateFixtureForStateChange(
"distributor:2 storage:2", "distributor:2 storage:1");
@@ -2208,18 +2238,21 @@ TEST_F(LegacyBucketDBUpdaterTest, unchanged_distributor_set_implies_no_ownership
EXPECT_FALSE(fixture->state->hasBucketOwnershipTransfer());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, changed_distribution_config_implies_ownership_transfer) {
auto fixture = createPendingStateFixtureForDistributionChange(
"distributor:2 storage:2");
EXPECT_TRUE(fixture->state->hasBucketOwnershipTransfer());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, transition_time_tracked_for_single_state_change) {
ASSERT_NO_FATAL_FAILURE(completeStateTransitionInSeconds("distributor:2 storage:2", 5, messageCount(2)));
EXPECT_EQ(uint64_t(5000), lastTransitionTimeInMillis());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, transition_time_reset_across_non_preempting_state_changes) {
ASSERT_NO_FATAL_FAILURE(completeStateTransitionInSeconds("distributor:2 storage:2", 5, messageCount(2)));
ASSERT_NO_FATAL_FAILURE(completeStateTransitionInSeconds("distributor:2 storage:3", 3, messageCount(1)));
@@ -2227,6 +2260,7 @@ TEST_F(LegacyBucketDBUpdaterTest, transition_time_reset_across_non_preempting_st
EXPECT_EQ(uint64_t(3000), lastTransitionTimeInMillis());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, transition_time_tracked_for_distribution_config_change) {
lib::ClusterState state("distributor:2 storage:2");
ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(state, messageCount(2), 1));
@@ -2239,6 +2273,7 @@ TEST_F(LegacyBucketDBUpdaterTest, transition_time_tracked_for_distribution_confi
EXPECT_EQ(uint64_t(4000), lastTransitionTimeInMillis());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, transition_time_tracked_across_preempted_transitions) {
_sender.clear();
lib::ClusterState state("distributor:2 storage:2");
@@ -2252,6 +2287,7 @@ TEST_F(LegacyBucketDBUpdaterTest, transition_time_tracked_across_preempted_trans
EXPECT_EQ(uint64_t(8000), lastTransitionTimeInMillis());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
/*
* Brief reminder on test DSL for checking bucket merge operations:
*
@@ -2275,31 +2311,37 @@ TEST_F(LegacyBucketDBUpdaterTest, batch_update_of_existing_diverging_replicas_do
"0:5/1/2/3|1:5/7/8/9", true));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, batch_add_of_new_diverging_replicas_does_not_mark_any_as_trusted) {
EXPECT_EQ(std::string("5:1/7/8/9/u,0/1/2/3/u|"),
mergeBucketLists("", "0:5/1/2/3|1:5/7/8/9", true));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, batch_add_with_single_resulting_replica_implicitly_marks_as_trusted) {
EXPECT_EQ(std::string("5:0/1/2/3/t|"),
mergeBucketLists("", "0:5/1/2/3", true));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, identity_update_of_single_replica_does_not_clear_trusted) {
EXPECT_EQ(std::string("5:0/1/2/3/t|"),
mergeBucketLists("0:5/1/2/3", "0:5/1/2/3", true));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, identity_update_of_diverging_untrusted_replicas_does_not_mark_any_as_trusted) {
EXPECT_EQ(std::string("5:1/7/8/9/u,0/1/2/3/u|"),
mergeBucketLists("0:5/1/2/3|1:5/7/8/9", "0:5/1/2/3|1:5/7/8/9", true));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, adding_diverging_replica_to_existing_trusted_does_not_remove_trusted) {
EXPECT_EQ(std::string("5:1/2/3/4/u,0/1/2/3/t|"),
mergeBucketLists("0:5/1/2/3", "0:5/1/2/3|1:5/2/3/4", true));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted) {
// This differs from batch_update_of_existing_diverging_replicas_does_not_mark_any_as_trusted
// in that _all_ content nodes are considered outdated when distributor changes take place,
@@ -2315,6 +2357,7 @@ TEST_F(LegacyBucketDBUpdaterTest, batch_update_from_distributor_change_does_not_
"0:5/1/2/3|1:5/7/8/9", true));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
// TODO remove on Vespa 8 - this is a workaround for https://github.com/vespa-engine/vespa/issues/8475
TEST_F(LegacyBucketDBUpdaterTest, global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection) {
std::string distConfig(getDistConfig6Nodes2Groups());
@@ -2384,6 +2427,7 @@ void for_each_bucket(const DistributorBucketSpaceRepo& repo, Func&& f) {
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, non_owned_buckets_moved_to_read_only_db_on_ownership_change) {
getBucketDBUpdater().set_stale_reads_enabled(true);
@@ -2425,6 +2469,7 @@ TEST_F(LegacyBucketDBUpdaterTest, non_owned_buckets_moved_to_read_only_db_on_own
});
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, buckets_no_longer_available_are_not_moved_to_read_only_database) {
constexpr uint32_t n_buckets = 10;
// No ownership change, just node down. Test redundancy is 2, so removing 2 nodes will
@@ -2436,6 +2481,7 @@ TEST_F(LegacyBucketDBUpdaterTest, buckets_no_longer_available_are_not_moved_to_r
EXPECT_EQ(size_t(0), read_only_global_db().size());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, non_owned_buckets_purged_when_read_only_support_is_config_disabled) {
getBucketDBUpdater().set_stale_reads_enabled(false);
@@ -2481,6 +2527,7 @@ void LegacyBucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transiti
_sender.clear();
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, deferred_activated_state_does_not_enable_state_until_activation_received) {
getBucketDBUpdater().set_stale_reads_enabled(true);
constexpr uint32_t n_buckets = 10;
@@ -2501,6 +2548,7 @@ TEST_F(LegacyBucketDBUpdaterTest, deferred_activated_state_does_not_enable_state
EXPECT_EQ(uint64_t(n_buckets), mutable_global_db().size());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, read_only_db_cleared_once_pending_state_is_activated) {
getBucketDBUpdater().set_stale_reads_enabled(true);
constexpr uint32_t n_buckets = 10;
@@ -2513,6 +2561,7 @@ TEST_F(LegacyBucketDBUpdaterTest, read_only_db_cleared_once_pending_state_is_act
EXPECT_EQ(uint64_t(0), read_only_global_db().size());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, read_only_db_is_populated_even_when_self_is_marked_down) {
getBucketDBUpdater().set_stale_reads_enabled(true);
constexpr uint32_t n_buckets = 10;
@@ -2527,6 +2576,7 @@ TEST_F(LegacyBucketDBUpdaterTest, read_only_db_is_populated_even_when_self_is_ma
EXPECT_EQ(uint64_t(n_buckets), read_only_global_db().size());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, activate_cluster_state_request_with_mismatching_version_returns_actual_version) {
getBucketDBUpdater().set_stale_reads_enabled(true);
constexpr uint32_t n_buckets = 10;
@@ -2541,6 +2591,7 @@ TEST_F(LegacyBucketDBUpdaterTest, activate_cluster_state_request_with_mismatchin
ASSERT_NO_FATAL_FAILURE(assert_has_activate_cluster_state_reply_with_actual_version(5));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, activate_cluster_state_request_without_pending_transition_passes_message_through) {
getBucketDBUpdater().set_stale_reads_enabled(true);
constexpr uint32_t n_buckets = 10;
@@ -2557,6 +2608,7 @@ TEST_F(LegacyBucketDBUpdaterTest, activate_cluster_state_request_without_pending
EXPECT_EQ(size_t(0), _sender.replies().size());
}
+// TODO STRIPE disabled benchmark tests are NOT migrated to new test suite
TEST_F(LegacyBucketDBUpdaterTest, DISABLED_benchmark_bulk_loading_into_empty_db) {
// Need to trigger an initial edge to complete first bucket scan
ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:2 storage:1"),
@@ -2675,6 +2727,7 @@ TEST_F(LegacyBucketDBUpdaterTest, DISABLED_benchmark_all_buckets_removed_during_
fprintf(stderr, "Took %g seconds to scan and remove %u buckets\n", timer.min_time(), n_buckets);
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_getter_is_non_null_only_when_state_is_pending) {
auto initial_baseline = std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:d");
auto initial_default = std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:m");
@@ -2700,7 +2753,7 @@ TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_getter_is_non_null_only_
EXPECT_TRUE(state == nullptr);
}
-struct BucketDBUpdaterSnapshotTest : LegacyBucketDBUpdaterTest {
+struct LegacyBucketDBUpdaterSnapshotTest : LegacyBucketDBUpdaterTest {
lib::ClusterState empty_state;
std::shared_ptr<lib::ClusterState> initial_baseline;
std::shared_ptr<lib::ClusterState> initial_default;
@@ -2708,7 +2761,7 @@ struct BucketDBUpdaterSnapshotTest : LegacyBucketDBUpdaterTest {
Bucket default_bucket;
Bucket global_bucket;
- BucketDBUpdaterSnapshotTest()
+ LegacyBucketDBUpdaterSnapshotTest()
: LegacyBucketDBUpdaterTest(),
empty_state(),
initial_baseline(std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:d")),
@@ -2719,7 +2772,7 @@ struct BucketDBUpdaterSnapshotTest : LegacyBucketDBUpdaterTest {
global_bucket(FixedBucketSpaces::global_space(), BucketId(16, 1234))
{
}
- ~BucketDBUpdaterSnapshotTest() override;
+ ~LegacyBucketDBUpdaterSnapshotTest() override;
void SetUp() override {
LegacyBucketDBUpdaterTest::SetUp();
@@ -2746,19 +2799,22 @@ struct BucketDBUpdaterSnapshotTest : LegacyBucketDBUpdaterTest {
}
};
-BucketDBUpdaterSnapshotTest::~BucketDBUpdaterSnapshotTest() = default;
+LegacyBucketDBUpdaterSnapshotTest::~LegacyBucketDBUpdaterSnapshotTest() = default;
-TEST_F(BucketDBUpdaterSnapshotTest, default_space_snapshot_prior_to_activated_state_is_non_routable) {
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
+TEST_F(LegacyBucketDBUpdaterSnapshotTest, default_space_snapshot_prior_to_activated_state_is_non_routable) {
auto rs = getBucketDBUpdater().read_snapshot_for_bucket(default_bucket);
EXPECT_FALSE(rs.is_routable());
}
-TEST_F(BucketDBUpdaterSnapshotTest, global_space_snapshot_prior_to_activated_state_is_non_routable) {
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
+TEST_F(LegacyBucketDBUpdaterSnapshotTest, global_space_snapshot_prior_to_activated_state_is_non_routable) {
auto rs = getBucketDBUpdater().read_snapshot_for_bucket(global_bucket);
EXPECT_FALSE(rs.is_routable());
}
-TEST_F(BucketDBUpdaterSnapshotTest, read_snapshot_returns_appropriate_cluster_states) {
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
+TEST_F(LegacyBucketDBUpdaterSnapshotTest, read_snapshot_returns_appropriate_cluster_states) {
set_cluster_state_bundle(initial_bundle);
// State currently pending, empty initial state is active
@@ -2788,7 +2844,8 @@ TEST_F(BucketDBUpdaterSnapshotTest, read_snapshot_returns_appropriate_cluster_st
EXPECT_FALSE(global_rs.context().has_pending_state_transition());
}
-TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_no_pending_state_transition_returns_mutable_db_guard) {
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
+TEST_F(LegacyBucketDBUpdaterSnapshotTest, snapshot_with_no_pending_state_transition_returns_mutable_db_guard) {
constexpr uint32_t n_buckets = 10;
ASSERT_NO_FATAL_FAILURE(
trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
@@ -2800,7 +2857,8 @@ TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_no_pending_state_transition_re
n_buckets);
}
-TEST_F(BucketDBUpdaterSnapshotTest, snapshot_returns_unroutable_for_non_owned_bucket_in_current_state) {
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
+TEST_F(LegacyBucketDBUpdaterSnapshotTest, snapshot_returns_unroutable_for_non_owned_bucket_in_current_state) {
ASSERT_NO_FATAL_FAILURE(
trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
"version:2 distributor:2 .0.s:d storage:4", 0, 0));
@@ -2810,7 +2868,8 @@ TEST_F(BucketDBUpdaterSnapshotTest, snapshot_returns_unroutable_for_non_owned_bu
EXPECT_FALSE(def_rs.is_routable());
}
-TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_pending_state_returns_read_only_guard_for_bucket_only_owned_in_current_state) {
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
+TEST_F(LegacyBucketDBUpdaterSnapshotTest, snapshot_with_pending_state_returns_read_only_guard_for_bucket_only_owned_in_current_state) {
constexpr uint32_t n_buckets = 10;
ASSERT_NO_FATAL_FAILURE(
trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
@@ -2821,7 +2880,8 @@ TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_pending_state_returns_read_onl
n_buckets);
}
-TEST_F(BucketDBUpdaterSnapshotTest, snapshot_is_unroutable_if_stale_reads_disabled_and_bucket_not_owned_in_pending_state) {
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
+TEST_F(LegacyBucketDBUpdaterSnapshotTest, snapshot_is_unroutable_if_stale_reads_disabled_and_bucket_not_owned_in_pending_state) {
getBucketDBUpdater().set_stale_reads_enabled(false);
constexpr uint32_t n_buckets = 10;
ASSERT_NO_FATAL_FAILURE(
diff --git a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
index 70e5afaed43..01f7d5a4f0a 100644
--- a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
+++ b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
@@ -11,13 +11,13 @@
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/document/test/make_bucket_space.h>
#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <vespa/metrics/updatehook.h>
#include <vespa/storage/distributor/simpleclusterinformation.h>
#include <vespa/storage/distributor/top_level_distributor.h>
#include <vespa/storage/distributor/distributor_stripe.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/text/stringtokenizer.h>
-#include <vespa/vespalib/util/benchmark_timer.h>
#include <sstream>
#include <iomanip>
@@ -38,6 +38,8 @@ class TopLevelBucketDBUpdaterTest : public Test,
public TopLevelDistributorTestUtil
{
public:
+ using OutdatedNodesMap = dbtransition::OutdatedNodesMap;
+
TopLevelBucketDBUpdaterTest();
~TopLevelBucketDBUpdaterTest() override;
@@ -117,7 +119,19 @@ public:
invalid_bucket_count));
}
- std::string verifyBucket(document::BucketId id, const lib::ClusterState& state) {
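+ // Helper: answers a RequestBucketInfoCommand for a single bucket with a
+ // canned, non-empty bucket info entry, routed to the owning stripe's updater.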
+ void send_fake_reply_for_single_bucket_request(
+ const api::RequestBucketInfoCommand& rbi)
+ {
+ ASSERT_EQ(size_t(1), rbi.getBuckets().size());
+ const document::BucketId& bucket(rbi.getBuckets()[0]);
+
+ auto reply = std::make_shared<api::RequestBucketInfoReply>(rbi);
+ reply->getBucketInfo().push_back(
+ api::RequestBucketInfoReply::Entry(bucket, api::BucketInfo(20, 10, 12, 50, 60, true, true)));
+ stripe_of_bucket(bucket).bucket_db_updater().onRequestBucketInfoReply(reply);
+ }
+
+ std::string verify_bucket(document::BucketId id, const lib::ClusterState& state) {
BucketDatabase::Entry entry = get_bucket(id);
if (!entry.valid()) {
return vespalib::make_string("%s doesn't exist in DB", id.toString().c_str());
@@ -182,6 +196,12 @@ public:
sort_sent_messages_by_index(_sender, size_before_state);
}
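+ // As set_cluster_state(), but accepts a full cluster state bundle
+ // (e.g. one with deferred activation enabled).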
+ void set_cluster_state_bundle(const lib::ClusterStateBundle& state) {
+ const size_t size_before_state = _sender.commands().size();
+ bucket_db_updater().onSetSystemState(std::make_shared<api::SetSystemStateCommand>(state));
+ sort_sent_messages_by_index(_sender, size_before_state);
+ }
+
void set_cluster_state(const vespalib::string& state_str) {
set_cluster_state(lib::ClusterState(state_str));
}
@@ -191,6 +211,14 @@ public:
std::make_shared<api::ActivateClusterStateVersionCommand>(version));
}
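+ // Expects exactly one pending reply: an ActivateClusterStateVersionReply
+ // reporting the given actual cluster state version.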
+ void assert_has_activate_cluster_state_reply_with_actual_version(uint32_t version) {
+ ASSERT_EQ(size_t(1), _sender.replies().size());
+ auto* response = dynamic_cast<api::ActivateClusterStateVersionReply*>(_sender.replies().back().get());
+ ASSERT_TRUE(response != nullptr);
+ ASSERT_EQ(version, response->actualVersion());
+ _sender.clear();
+ }
+
void complete_bucket_info_gathering(const lib::ClusterState& state,
size_t expected_msgs,
uint32_t bucket_count = 1,
@@ -290,6 +318,176 @@ public:
ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(state, expected_msgs, n_buckets));
}
+ void complete_state_transition_in_seconds(const std::string& stateStr,
+ uint32_t seconds,
+ uint32_t expectedMsgs)
+ {
+ _sender.clear();
+ lib::ClusterState state(stateStr);
+ set_cluster_state(state);
+ fake_clock().addSecondsToTime(seconds);
+ ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(state, expectedMsgs));
+ }
+
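+ // Returns the most recently recorded cluster state transition time, in milliseconds.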
+ uint64_t last_transition_time_in_millis() {
+ {
+ // Force stripe metrics to be aggregated into total.
+ std::mutex l;
+ distributor_metric_update_hook().updateMetrics(metrics::MetricLockGuard(l));
+ }
+ return uint64_t(total_distributor_metrics().stateTransitionTime.getLast());
+ }
+
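+ // Builds cluster information seeded with this distributor's node index and
+ // enables the given state as the current distributor cluster state.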
+ ClusterInformation::CSP create_cluster_info(const std::string& clusterStateString) {
+ lib::ClusterState baseline_cluster_state(clusterStateString);
+ lib::ClusterStateBundle cluster_state_bundle(baseline_cluster_state);
+ auto cluster_info = std::make_shared<SimpleClusterInformation>(
+ _distributor->node_identity().node_index(),
+ cluster_state_bundle,
+ "ui");
+ enable_distributor_cluster_state(clusterStateString);
+ return cluster_info;
+ }
+
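+ // Owns a stub sender, a fake clock and a PendingClusterState created either
+ // for a cluster state change or for a distribution config change.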
+ struct PendingClusterStateFixture {
+ DistributorMessageSenderStub sender;
+ framework::defaultimplementation::FakeClock clock;
+ std::unique_ptr<PendingClusterState> state;
+
+ PendingClusterStateFixture(
+ TopLevelBucketDBUpdaterTest& owner,
+ const std::string& old_cluster_state,
+ const std::string& new_cluster_state)
+ {
+ auto cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(new_cluster_state));
+ auto cluster_info = owner.create_cluster_info(old_cluster_state);
+ OutdatedNodesMap outdated_nodes_map;
+ state = PendingClusterState::createForClusterStateChange(
+ clock, cluster_info, sender,
+ owner.top_level_bucket_space_repo(),
+ cmd, outdated_nodes_map, api::Timestamp(1));
+ }
+
+ PendingClusterStateFixture(
+ TopLevelBucketDBUpdaterTest& owner,
+ const std::string& old_cluster_state)
+ {
+ auto cluster_info = owner.create_cluster_info(old_cluster_state);
+ state = PendingClusterState::createForDistributionChange(
+ clock, cluster_info, sender, owner.top_level_bucket_space_repo(), api::Timestamp(1));
+ }
+ };
+
+ std::unique_ptr<PendingClusterStateFixture> create_pending_state_fixture_for_state_change(
+ const std::string& oldClusterState,
+ const std::string& newClusterState)
+ {
+ return std::make_unique<PendingClusterStateFixture>(*this, oldClusterState, newClusterState);
+ }
+
+ std::unique_ptr<PendingClusterStateFixture> create_pending_state_fixture_for_distribution_change(
+ const std::string& oldClusterState)
+ {
+ return std::make_unique<PendingClusterStateFixture>(*this, oldClusterState);
+ }
+
+ std::string get_sent_nodes(const std::string& old_cluster_state,
+ const std::string& new_cluster_state);
+
+ std::string get_sent_nodes_distribution_changed(const std::string& old_cluster_state);
+
+ std::string get_node_list(const std::vector<uint16_t>& nodes, size_t count);
+ std::string get_node_list(const std::vector<uint16_t>& nodes);
+
+ std::string merge_bucket_lists(const lib::ClusterState& old_state,
+ const std::string& existing_data,
+ const lib::ClusterState& new_state,
+ const std::string& new_data,
+ bool include_bucket_info = false);
+
+ std::string merge_bucket_lists(const std::string& existingData,
+ const std::string& newData,
+ bool includeBucketInfo = false);
+
+ std::vector<uint16_t> get_send_set() const;
+
+ std::vector<uint16_t> get_sent_nodes_with_preemption(
+ const std::string& old_cluster_state,
+ uint32_t expected_old_state_messages,
+ const std::string& preempted_cluster_state,
+ const std::string& new_cluster_state);
+
+ std::vector<uint16_t> expand_node_vec(const std::vector<uint16_t>& nodes);
+
+ void trigger_completed_but_not_yet_activated_transition(
+ vespalib::stringref initial_state_str,
+ uint32_t initial_buckets,
+ uint32_t initial_expected_msgs,
+ vespalib::stringref pending_state_str,
+ uint32_t pending_buckets,
+ uint32_t pending_expected_msgs);
+
+ const DistributorBucketSpaceRepo& mutable_repo(DistributorStripe& s) const noexcept {
+ return s.getBucketSpaceRepo();
+ }
+ // Note: not calling this "immutable_repo" since it may actually be modified by the pending
+ // cluster state component (just not by operations), so it would not have the expected semantics.
+ const DistributorBucketSpaceRepo& read_only_repo(DistributorStripe& s) const noexcept {
+ return s.getReadOnlyBucketSpaceRepo();
+ }
+
+ const BucketDatabase& mutable_default_db(DistributorStripe& s) const noexcept {
+ return mutable_repo(s).get(FixedBucketSpaces::default_space()).getBucketDatabase();
+ }
+ const BucketDatabase& mutable_global_db(DistributorStripe& s) const noexcept {
+ return mutable_repo(s).get(FixedBucketSpaces::global_space()).getBucketDatabase();
+ }
+ const BucketDatabase& read_only_default_db(DistributorStripe& s) const noexcept {
+ return read_only_repo(s).get(FixedBucketSpaces::default_space()).getBucketDatabase();
+ }
+ const BucketDatabase& read_only_global_db(DistributorStripe& s) const noexcept {
+ return read_only_repo(s).get(FixedBucketSpaces::global_space()).getBucketDatabase();
+ }
+
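+ // Propagates the stale reads setting to the top-level updater and to all stripes.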
+ void set_stale_reads_enabled(bool enabled) {
+ for (auto* s : distributor_stripes()) {
+ s->bucket_db_updater().set_stale_reads_enabled(enabled);
+ }
+ bucket_db_updater().set_stale_reads_enabled(enabled);
+ }
+
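+ // The *_dbs_size() helpers below sum the sizes of the corresponding per-stripe databases.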
+ size_t mutable_default_dbs_size() const {
+ size_t total = 0;
+ for (auto* s : distributor_stripes()) {
+ total += mutable_default_db(*s).size();
+ }
+ return total;
+ }
+
+ size_t mutable_global_dbs_size() const {
+ size_t total = 0;
+ for (auto* s : distributor_stripes()) {
+ total += mutable_global_db(*s).size();
+ }
+ return total;
+ }
+
+ size_t read_only_default_dbs_size() const {
+ size_t total = 0;
+ for (auto* s : distributor_stripes()) {
+ total += read_only_default_db(*s).size();
+ }
+ return total;
+ }
+
+ size_t read_only_global_dbs_size() const {
+ size_t total = 0;
+ for (auto* s : distributor_stripes()) {
+ total += read_only_global_db(*s).size();
+ }
+ return total;
+ }
+
};
TopLevelBucketDBUpdaterTest::TopLevelBucketDBUpdaterTest()
@@ -347,6 +545,21 @@ std::string dist_config_6_nodes_across_4_groups() {
"group[3].nodes[1].index 5\n");
}
+std::string dist_config_3_nodes_in_1_group() {
+ return ("redundancy 2\n"
+ "group[2]\n"
+ "group[0].name \"invalid\"\n"
+ "group[0].index \"invalid\"\n"
+ "group[0].partitions 1|*\n"
+ "group[0].nodes[0]\n"
+ "group[1].name rack0\n"
+ "group[1].index 0\n"
+ "group[1].nodes[3]\n"
+ "group[1].nodes[0].index 0\n"
+ "group[1].nodes[1].index 1\n"
+ "group[1].nodes[2].index 2\n");
+}
+
std::string
make_string_list(std::string s, uint32_t count)
{
@@ -368,6 +581,54 @@ make_request_bucket_info_strings(uint32_t count)
}
+
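+// Renders the expected node list string; each node index is repeated
+// count times (normally once per bucket space).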
+std::string
+TopLevelBucketDBUpdaterTest::get_node_list(const std::vector<uint16_t>& nodes, size_t count)
+{
+ std::ostringstream ost;
+ bool first = true;
+ for (const auto node : nodes) {
+ for (uint32_t i = 0; i < count; ++i) {
+ if (!first) {
+ ost << ",";
+ }
+ ost << node;
+ first = false;
+ }
+ }
+ return ost.str();
+}
+
+std::string
+TopLevelBucketDBUpdaterTest::get_node_list(const std::vector<uint16_t>& nodes)
+{
+ return get_node_list(nodes, _bucket_spaces.size());
+}
+
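+// Completes bucket info gathering for the initial state, then for a pending
+// state sent with deferred activation, leaving the transition finished but
+// not yet activated.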
+void
+TopLevelBucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition(
+ vespalib::stringref initial_state_str,
+ uint32_t initial_buckets,
+ uint32_t initial_expected_msgs,
+ vespalib::stringref pending_state_str,
+ uint32_t pending_buckets,
+ uint32_t pending_expected_msgs)
+{
+ lib::ClusterState initial_state(initial_state_str);
+ set_cluster_state(initial_state);
+ ASSERT_EQ(message_count(initial_expected_msgs), _sender.commands().size());
+ ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(
+ initial_state, message_count(initial_expected_msgs), initial_buckets));
+ _sender.clear();
+
+ lib::ClusterState pending_state(pending_state_str); // Ownership change
+ set_cluster_state_bundle(lib::ClusterStateBundle(pending_state, {}, true));
+ ASSERT_EQ(message_count(pending_expected_msgs), _sender.commands().size());
+ ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(
+ pending_state, message_count(pending_expected_msgs), pending_buckets));
+ _sender.clear();
+}
+
TEST_F(TopLevelBucketDBUpdaterTest, normal_usage) {
set_cluster_state(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"));
@@ -525,13 +786,13 @@ TEST_F(TopLevelBucketDBUpdaterTest, failed_request_bucket_info) {
}
for (int i=0; i<10; i++) {
- EXPECT_EQ(std::string(""),
- verifyBucket(document::BucketId(16, i),
- lib::ClusterState("distributor:1 storage:1")));
+ EXPECT_EQ("",
+ verify_bucket(document::BucketId(16, i),
+ lib::ClusterState("distributor:1 storage:1")));
}
// Set system state should now be passed on
- EXPECT_EQ(std::string("Set system state"), _sender_down.getCommands());
+ EXPECT_EQ("Set system state", _sender_down.getCommands());
}
TEST_F(TopLevelBucketDBUpdaterTest, down_while_init) {
@@ -588,9 +849,9 @@ TEST_F(TopLevelBucketDBUpdaterTest, node_down_copies_get_in_sync) {
set_cluster_state("distributor:1 storage:3 .1.s:d");
- EXPECT_EQ(std::string("BucketId(0x4000000000000001) : "
- "node(idx=0,crc=0x3,docs=3/3,bytes=3/3,trusted=true,active=false,ready=false), "
- "node(idx=2,crc=0x3,docs=3/3,bytes=3/3,trusted=true,active=false,ready=false)"),
+ EXPECT_EQ("BucketId(0x4000000000000001) : "
+ "node(idx=0,crc=0x3,docs=3/3,bytes=3/3,trusted=true,active=false,ready=false), "
+ "node(idx=2,crc=0x3,docs=3/3,bytes=3/3,trusted=true,active=false,ready=false)",
dump_bucket(bid));
}
@@ -651,11 +912,11 @@ TEST_F(TopLevelBucketDBUpdaterTest, bit_change) {
}
}
- EXPECT_EQ(std::string("BucketId(0x4000000000000001) : "
- "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"),
+ EXPECT_EQ("BucketId(0x4000000000000001) : "
+ "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)",
dump_bucket(bucketlist[0]));
- EXPECT_EQ(std::string("BucketId(0x4000000000000002) : "
- "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"),
+ EXPECT_EQ("BucketId(0x4000000000000002) : "
+ "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)",
dump_bucket(bucketlist[1]));
{
@@ -688,17 +949,17 @@ TEST_F(TopLevelBucketDBUpdaterTest, bit_change) {
}
}
- EXPECT_EQ(std::string("BucketId(0x4000000000000000) : "
- "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"),
+ EXPECT_EQ("BucketId(0x4000000000000000) : "
+ "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)",
dump_bucket(document::BucketId(16, 0)));
- EXPECT_EQ(std::string("BucketId(0x4000000000000001) : "
- "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"),
+ EXPECT_EQ("BucketId(0x4000000000000001) : "
+ "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)",
dump_bucket(document::BucketId(16, 1)));
- EXPECT_EQ(std::string("BucketId(0x4000000000000002) : "
- "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"),
+ EXPECT_EQ("BucketId(0x4000000000000002) : "
+ "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)",
dump_bucket(document::BucketId(16, 2)));
- EXPECT_EQ(std::string("BucketId(0x4000000000000004) : "
- "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"),
+ EXPECT_EQ("BucketId(0x4000000000000004) : "
+ "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)",
dump_bucket(document::BucketId(16, 4)));
_sender.clear();
@@ -822,9 +1083,9 @@ TEST_F(TopLevelBucketDBUpdaterTest, notify_bucket_change) {
}
// No database update until request bucket info replies have been received.
- EXPECT_EQ(std::string("BucketId(0x4000000000000001) : "
- "node(idx=0,crc=0x4d2,docs=1234/1234,bytes=1234/1234,"
- "trusted=false,active=false,ready=false)"),
+ EXPECT_EQ("BucketId(0x4000000000000001) : "
+ "node(idx=0,crc=0x4d2,docs=1234/1234,bytes=1234/1234,"
+ "trusted=false,active=false,ready=false)",
dump_bucket(document::BucketId(16, 1)));
EXPECT_EQ(std::string("NONEXISTING"), dump_bucket(document::BucketId(16, 2)));
@@ -845,11 +1106,11 @@ TEST_F(TopLevelBucketDBUpdaterTest, notify_bucket_change) {
stripe_of_bucket(bucket_id).bucket_db_updater().onRequestBucketInfoReply(reply);
}
- EXPECT_EQ(std::string("BucketId(0x4000000000000001) : "
- "node(idx=0,crc=0x11d7,docs=200/400,bytes=2000/4000,trusted=true,active=true,ready=true)"),
+ EXPECT_EQ("BucketId(0x4000000000000001) : "
+ "node(idx=0,crc=0x11d7,docs=200/400,bytes=2000/4000,trusted=true,active=true,ready=true)",
dump_bucket(document::BucketId(16, 1)));
- EXPECT_EQ(std::string("BucketId(0x4000000000000002) : "
- "node(idx=0,crc=0x2327,docs=300/500,bytes=3000/5000,trusted=true,active=false,ready=false)"),
+ EXPECT_EQ("BucketId(0x4000000000000002) : "
+ "node(idx=0,crc=0x2327,docs=300/500,bytes=3000/5000,trusted=true,active=false,ready=false)",
dump_bucket(document::BucketId(16, 2)));
}
@@ -947,4 +1208,1458 @@ TEST_F(TopLevelBucketDBUpdaterTest, notify_change_with_pending_state_queues_buck
}
}
+TEST_F(TopLevelBucketDBUpdaterTest, merge_reply) {
+ enable_distributor_cluster_state("distributor:1 storage:3");
+
+ document::BucketId bucket_id(16, 1234);
+ add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234");
+
+ std::vector<api::MergeBucketCommand::Node> nodes;
+ nodes.push_back(api::MergeBucketCommand::Node(0));
+ nodes.push_back(api::MergeBucketCommand::Node(1));
+ nodes.push_back(api::MergeBucketCommand::Node(2));
+
+ api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0);
+ auto reply = std::make_shared<api::MergeBucketReply>(cmd);
+
+ _sender.clear();
+ stripe_of_bucket(bucket_id).bucket_db_updater().onMergeBucketReply(reply);
+
+ ASSERT_EQ(size_t(3), _sender.commands().size());
+
+ for (uint32_t i = 0; i < 3; i++) {
+ auto req = std::dynamic_pointer_cast<api::RequestBucketInfoCommand>(_sender.command(i));
+
+ ASSERT_TRUE(req.get() != nullptr);
+ ASSERT_EQ(size_t(1), req->getBuckets().size());
+ EXPECT_EQ(bucket_id, req->getBuckets()[0]);
+
+ auto reqreply = std::make_shared<api::RequestBucketInfoReply>(*req);
+ reqreply->getBucketInfo().push_back(
+ api::RequestBucketInfoReply::Entry(bucket_id,
+ api::BucketInfo(10 * (i + 1), 100 * (i + 1), 1000 * (i + 1))));
+
+ stripe_of_bucket(bucket_id).bucket_db_updater().onRequestBucketInfoReply(reqreply);
+ }
+
+ EXPECT_EQ("BucketId(0x40000000000004d2) : "
+ "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), "
+ "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false), "
+ "node(idx=2,crc=0x1e,docs=300/300,bytes=3000/3000,trusted=false,active=false,ready=false)",
+ dump_bucket(bucket_id));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, merge_reply_node_down) {
+ enable_distributor_cluster_state("distributor:1 storage:3");
+ std::vector<api::MergeBucketCommand::Node> nodes;
+
+ document::BucketId bucket_id(16, 1234);
+ add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234");
+
+ for (uint32_t i = 0; i < 3; ++i) {
+ nodes.push_back(api::MergeBucketCommand::Node(i));
+ }
+
+ api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0);
+ auto reply = std::make_shared<api::MergeBucketReply>(cmd);
+
+ set_cluster_state(lib::ClusterState("distributor:1 storage:2"));
+
+ _sender.clear();
+ stripe_of_bucket(bucket_id).bucket_db_updater().onMergeBucketReply(reply);
+
+ ASSERT_EQ(size_t(2), _sender.commands().size());
+
+ for (uint32_t i = 0; i < 2; i++) {
+ auto req = std::dynamic_pointer_cast<api::RequestBucketInfoCommand>(_sender.command(i));
+
+ ASSERT_TRUE(req.get() != nullptr);
+ ASSERT_EQ(size_t(1), req->getBuckets().size());
+ EXPECT_EQ(bucket_id, req->getBuckets()[0]);
+
+ auto reqreply = std::make_shared<api::RequestBucketInfoReply>(*req);
+ reqreply->getBucketInfo().push_back(
+ api::RequestBucketInfoReply::Entry(
+ bucket_id,
+ api::BucketInfo(10 * (i + 1), 100 * (i + 1), 1000 * (i + 1))));
+ stripe_of_bucket(bucket_id).bucket_db_updater().onRequestBucketInfoReply(reqreply);
+ }
+
+ EXPECT_EQ("BucketId(0x40000000000004d2) : "
+ "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), "
+ "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false)",
+ dump_bucket(bucket_id));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, merge_reply_node_down_after_request_sent) {
+ enable_distributor_cluster_state("distributor:1 storage:3");
+ std::vector<api::MergeBucketCommand::Node> nodes;
+
+ document::BucketId bucket_id(16, 1234);
+ add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234");
+
+ for (uint32_t i = 0; i < 3; ++i) {
+ nodes.push_back(api::MergeBucketCommand::Node(i));
+ }
+
+ api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0);
+ auto reply = std::make_shared<api::MergeBucketReply>(cmd);
+
+ _sender.clear();
+ stripe_of_bucket(bucket_id).bucket_db_updater().onMergeBucketReply(reply);
+
+ ASSERT_EQ(size_t(3), _sender.commands().size());
+
+ set_cluster_state(lib::ClusterState("distributor:1 storage:2"));
+
+ for (uint32_t i = 0; i < 3; i++) {
+ auto req = std::dynamic_pointer_cast<api::RequestBucketInfoCommand>(_sender.command(i));
+
+ ASSERT_TRUE(req.get() != nullptr);
+ ASSERT_EQ(size_t(1), req->getBuckets().size());
+ EXPECT_EQ(bucket_id, req->getBuckets()[0]);
+
+ auto reqreply = std::make_shared<api::RequestBucketInfoReply>(*req);
+ reqreply->getBucketInfo().push_back(
+ api::RequestBucketInfoReply::Entry(
+ bucket_id,
+ api::BucketInfo(10 * (i + 1), 100 * (i + 1), 1000 * (i + 1))));
+ stripe_of_bucket(bucket_id).bucket_db_updater().onRequestBucketInfoReply(reqreply);
+ }
+
+ EXPECT_EQ("BucketId(0x40000000000004d2) : "
+ "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), "
+ "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false)",
+ dump_bucket(bucket_id));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, flush) {
+ enable_distributor_cluster_state("distributor:1 storage:3");
+ _sender.clear();
+
+ document::BucketId bucket_id(16, 1234);
+ add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234");
+
+ std::vector<api::MergeBucketCommand::Node> nodes;
+ for (uint32_t i = 0; i < 3; ++i) {
+ nodes.push_back(api::MergeBucketCommand::Node(i));
+ }
+
+ api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0);
+ auto reply = std::make_shared<api::MergeBucketReply>(cmd);
+
+ _sender.clear();
+ stripe_of_bucket(bucket_id).bucket_db_updater().onMergeBucketReply(reply);
+
+ ASSERT_EQ(size_t(3), _sender.commands().size());
+ ASSERT_EQ(size_t(0), _sender_down.replies().size());
+
+ stripe_of_bucket(bucket_id).bucket_db_updater().flush();
+ // Flushing should drop all merge bucket replies
+ EXPECT_EQ(size_t(0), _sender_down.commands().size());
+}
+
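+// Returns a comma-separated list of the node indices that were sent
+// RequestBucketInfo commands for the given cluster state transition.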
+std::string
+TopLevelBucketDBUpdaterTest::get_sent_nodes(const std::string& old_cluster_state,
+ const std::string& new_cluster_state)
+{
+ auto fixture = create_pending_state_fixture_for_state_change(old_cluster_state, new_cluster_state);
+ sort_sent_messages_by_index(fixture->sender);
+
+ std::ostringstream ost;
+ for (uint32_t i = 0; i < fixture->sender.commands().size(); i++) {
+ auto& req = dynamic_cast<RequestBucketInfoCommand&>(*fixture->sender.command(i));
+
+ if (i > 0) {
+ ost << ",";
+ }
+
+ ost << req.getAddress()->getIndex();
+ }
+
+ return ost.str();
+}
+
+std::string
+TopLevelBucketDBUpdaterTest::get_sent_nodes_distribution_changed(const std::string& old_cluster_state)
+{
+ DistributorMessageSenderStub sender;
+
+ framework::defaultimplementation::FakeClock clock;
+ auto cluster_info = create_cluster_info(old_cluster_state);
+ std::unique_ptr<PendingClusterState> state(
+ PendingClusterState::createForDistributionChange(
+ clock, cluster_info, sender, top_level_bucket_space_repo(), api::Timestamp(1)));
+
+ sort_sent_messages_by_index(sender);
+
+ std::ostringstream ost;
+ for (uint32_t i = 0; i < sender.commands().size(); i++) {
+ auto& req = dynamic_cast<RequestBucketInfoCommand&>(*sender.command(i));
+
+ if (i > 0) {
+ ost << ",";
+ }
+
+ ost << req.getAddress()->getIndex();
+ }
+
+ return ost.str();
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_send_messages) {
+ EXPECT_EQ(get_node_list({0, 1, 2}),
+ get_sent_nodes("cluster:d",
+ "distributor:1 storage:3"));
+
+ EXPECT_EQ(get_node_list({0, 1}),
+ get_sent_nodes("cluster:d",
+ "distributor:1 storage:3 .2.s:m"));
+
+ EXPECT_EQ(get_node_list({2}),
+ get_sent_nodes("distributor:1 storage:2",
+ "distributor:1 storage:3"));
+
+ EXPECT_EQ(get_node_list({2, 3, 4, 5}),
+ get_sent_nodes("distributor:1 storage:2",
+ "distributor:1 storage:6"));
+
+ EXPECT_EQ(get_node_list({0, 1, 2}),
+ get_sent_nodes("distributor:4 storage:3",
+ "distributor:3 storage:3"));
+
+ EXPECT_EQ(get_node_list({0, 1, 2, 3}),
+ get_sent_nodes("distributor:4 storage:3",
+ "distributor:4 .2.s:d storage:4"));
+
+ EXPECT_EQ("",
+ get_sent_nodes("distributor:4 storage:3",
+ "distributor:4 .0.s:d storage:4"));
+
+ EXPECT_EQ("",
+ get_sent_nodes("distributor:3 storage:3",
+ "distributor:4 storage:3"));
+
+ EXPECT_EQ(get_node_list({2}),
+ get_sent_nodes("distributor:3 storage:3 .2.s:i",
+ "distributor:3 storage:3"));
+
+ EXPECT_EQ(get_node_list({1}),
+ get_sent_nodes("distributor:3 storage:3 .1.s:d",
+ "distributor:3 storage:3"));
+
+ EXPECT_EQ(get_node_list({1, 2, 4}),
+ get_sent_nodes("distributor:3 storage:4 .1.s:d .2.s:i",
+ "distributor:3 storage:5"));
+
+ EXPECT_EQ("",
+ get_sent_nodes("distributor:1 storage:3",
+ "cluster:d"));
+
+ EXPECT_EQ("",
+ get_sent_nodes("distributor:1 storage:3",
+ "distributor:1 storage:3"));
+
+ EXPECT_EQ("",
+ get_sent_nodes("distributor:1 storage:3",
+ "cluster:d distributor:1 storage:6"));
+
+ EXPECT_EQ("",
+ get_sent_nodes("distributor:3 storage:3",
+ "distributor:3 .2.s:m storage:3"));
+
+ EXPECT_EQ(get_node_list({0, 1, 2}),
+ get_sent_nodes("distributor:3 .2.s:m storage:3",
+ "distributor:3 .2.s:d storage:3"));
+
+ EXPECT_EQ("",
+ get_sent_nodes("distributor:3 .2.s:m storage:3",
+ "distributor:3 storage:3"));
+
+ EXPECT_EQ(get_node_list({0, 1, 2}),
+ get_sent_nodes_distribution_changed("distributor:3 storage:3"));
+
+ EXPECT_EQ(get_node_list({0, 1}),
+ get_sent_nodes("distributor:10 storage:2",
+ "distributor:10 .1.s:d storage:2"));
+
+ EXPECT_EQ("",
+ get_sent_nodes("distributor:2 storage:2",
+ "distributor:3 .2.s:i storage:2"));
+
+ EXPECT_EQ(get_node_list({0, 1, 2}),
+ get_sent_nodes("distributor:3 storage:3",
+ "distributor:3 .2.s:s storage:3"));
+
+ EXPECT_EQ("",
+ get_sent_nodes("distributor:3 .2.s:s storage:3",
+ "distributor:3 .2.s:d storage:3"));
+
+ EXPECT_EQ(get_node_list({1}),
+ get_sent_nodes("distributor:3 storage:3 .1.s:m",
+ "distributor:3 storage:3"));
+
+ EXPECT_EQ("",
+ get_sent_nodes("distributor:3 storage:3",
+ "distributor:3 storage:3 .1.s:m"));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_receive) {
+ DistributorMessageSenderStub sender;
+
+ auto cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState("distributor:1 storage:3"));
+
+ framework::defaultimplementation::FakeClock clock;
+ auto cluster_info = create_cluster_info("cluster:d");
+ OutdatedNodesMap outdated_nodes_map;
+ std::unique_ptr<PendingClusterState> state(
+ PendingClusterState::createForClusterStateChange(
+ clock, cluster_info, sender, top_level_bucket_space_repo(),
+ cmd, outdated_nodes_map, api::Timestamp(1)));
+
+ ASSERT_EQ(message_count(3), sender.commands().size());
+
+ sort_sent_messages_by_index(sender);
+
+ std::ostringstream ost;
+ for (uint32_t i = 0; i < sender.commands().size(); i++) {
+ auto* req = dynamic_cast<RequestBucketInfoCommand*>(sender.command(i).get());
+ ASSERT_TRUE(req != nullptr);
+
+ auto rep = std::make_shared<RequestBucketInfoReply>(*req);
+
+ rep->getBucketInfo().push_back(
+ RequestBucketInfoReply::Entry(
+ document::BucketId(16, i),
+ api::BucketInfo(i, i, i, i, i)));
+
+ ASSERT_TRUE(state->onRequestBucketInfoReply(rep));
+ ASSERT_EQ((i == (sender.commands().size() - 1)), state->done());
+ }
+
+ auto& pending_transition = state->getPendingBucketSpaceDbTransition(makeBucketSpace());
+ EXPECT_EQ(3u, pending_transition.results().size());
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_with_group_down) {
+ std::string config = dist_config_6_nodes_across_4_groups();
+ config += "distributor_auto_ownership_transfer_on_whole_group_down true\n";
+ set_distribution(config);
+
+ // Group config has nodes {0, 1}, {2, 3}, {4, 5}
+ // We're node index 0.
+
+ // Entire group 1 goes down. Must refetch from all nodes.
+ EXPECT_EQ(get_node_list({0, 1, 2, 3, 4, 5}),
+ get_sent_nodes("distributor:6 storage:6",
+ "distributor:6 .2.s:d .3.s:d storage:6"));
+
+ // But don't refetch unless the entire group is down.
+ EXPECT_EQ("",
+ get_sent_nodes("distributor:6 storage:6",
+ "distributor:6 .2.s:d storage:6"));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_with_group_down_and_no_handover) {
+ std::string config = dist_config_6_nodes_across_4_groups();
+ config += "distributor_auto_ownership_transfer_on_whole_group_down false\n";
+ set_distribution(config);
+
+ // Group is down, but config says not to do anything about it.
+ EXPECT_EQ(get_node_list({0, 1, 2, 3, 4, 5}, _bucket_spaces.size() - 1),
+ get_sent_nodes("distributor:6 storage:6",
+ "distributor:6 .2.s:d .3.s:d storage:6"));
+}
+
+namespace {
+
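+// Parses test input of the form "node:bucket,bucket,...|node:...", where each
+// bucket may carry "/checksum/docs/size" when include_bucket_info is set, and
+// adds the resulting entries to the pending bucket space transition.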
+void
+parse_input_data(const std::string& data,
+ uint64_t timestamp,
+ PendingClusterState& state,
+ bool include_bucket_info)
+{
+ vespalib::StringTokenizer tokenizer(data, "|");
+ for (uint32_t i = 0; i < tokenizer.size(); i++) {
+ vespalib::StringTokenizer tok2(tokenizer[i], ":");
+
+ uint16_t node = atoi(tok2[0].data());
+
+ state.setNodeReplied(node);
+ auto& pending_transition = state.getPendingBucketSpaceDbTransition(makeBucketSpace());
+
+ vespalib::StringTokenizer tok3(tok2[1], ",");
+ for (uint32_t j = 0; j < tok3.size(); j++) {
+ if (include_bucket_info) {
+ vespalib::StringTokenizer tok4(tok3[j], "/");
+
+ pending_transition.addNodeInfo(
+ document::BucketId(16, atoi(tok4[0].data())),
+ BucketCopy(
+ timestamp,
+ node,
+ api::BucketInfo(
+ atoi(tok4[1].data()),
+ atoi(tok4[2].data()),
+ atoi(tok4[3].data()),
+ atoi(tok4[2].data()),
+ atoi(tok4[3].data()))));
+ } else {
+ pending_transition.addNodeInfo(
+ document::BucketId(16, atoi(tok3[j].data())),
+ BucketCopy(timestamp,
+ node,
+ api::BucketInfo(3, 3, 3, 3, 3)));
+ }
+ }
+ }
+}
+
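+// Dumps each DB entry as "bucket:node[,node...]|", where each node copy is
+// optionally suffixed with "/checksum/docs/size/<t|u>" (trusted/untrusted).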
+struct BucketDumper : public BucketDatabase::EntryProcessor
+{
+ std::ostringstream ost;
+ bool _include_bucket_info;
+
+ explicit BucketDumper(bool include_bucket_info)
+ : _include_bucket_info(include_bucket_info)
+ {
+ }
+
+ bool process(const BucketDatabase::ConstEntryRef& e) override {
+ document::BucketId bucket_id(e.getBucketId());
+
+ ost << uint32_t(bucket_id.getRawId()) << ":";
+ for (uint32_t i = 0; i < e->getNodeCount(); ++i) {
+ if (i > 0) {
+ ost << ",";
+ }
+ const BucketCopy& copy(e->getNodeRef(i));
+ ost << copy.getNode();
+ if (_include_bucket_info) {
+ ost << "/" << copy.getChecksum()
+ << "/" << copy.getDocumentCount()
+ << "/" << copy.getTotalDocumentSize()
+ << "/" << (copy.trusted() ? "t" : "u");
+ }
+ }
+ ost << "|";
+ return true;
+ }
+};
+
+}
+
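+// Merges existing_data (under old_state) and then new_data (under new_state)
+// into the stripe databases, returning the resulting default space contents.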
+std::string
+TopLevelBucketDBUpdaterTest::merge_bucket_lists(
+ const lib::ClusterState& old_state,
+ const std::string& existing_data,
+ const lib::ClusterState& new_state,
+ const std::string& new_data,
+ bool include_bucket_info)
+{
+ framework::defaultimplementation::FakeClock clock;
+ framework::MilliSecTimer timer(clock);
+
+ DistributorMessageSenderStub sender;
+ OutdatedNodesMap outdated_nodes_map;
+
+ {
+ auto cmd = std::make_shared<api::SetSystemStateCommand>(old_state);
+ api::Timestamp before_time(1);
+ auto cluster_info = create_cluster_info("cluster:d");
+
+ auto state = PendingClusterState::createForClusterStateChange(
+ clock, cluster_info, sender, top_level_bucket_space_repo(),
+ cmd, outdated_nodes_map, before_time);
+
+ parse_input_data(existing_data, before_time, *state, include_bucket_info);
+ auto guard = acquire_stripe_guard();
+ state->merge_into_bucket_databases(*guard);
+ }
+
+ BucketDumper dumper_tmp(true);
+ for (auto* s : distributor_stripes()) {
+ auto& db = s->getBucketSpaceRepo().get(document::FixedBucketSpaces::default_space()).getBucketDatabase();
+ db.forEach(dumper_tmp);
+ }
+
+ {
+ auto cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(new_state));
+ api::Timestamp after_time(2);
+ auto cluster_info = create_cluster_info(old_state.toString());
+
+ auto state = PendingClusterState::createForClusterStateChange(
+ clock, cluster_info, sender, top_level_bucket_space_repo(),
+ cmd, outdated_nodes_map, after_time);
+
+ parse_input_data(new_data, after_time, *state, include_bucket_info);
+ auto guard = acquire_stripe_guard();
+ state->merge_into_bucket_databases(*guard);
+ }
+
+ BucketDumper dumper(include_bucket_info);
+ for (auto* s : distributor_stripes()) {
+ auto& db = s->getBucketSpaceRepo().get(document::FixedBucketSpaces::default_space()).getBucketDatabase();
+ db.forEach(dumper);
+ db.clear();
+ }
+ return dumper.ost.str();
+}
+
+std::string
+TopLevelBucketDBUpdaterTest::merge_bucket_lists(const std::string& existing_data,
+ const std::string& new_data,
+ bool include_bucket_info)
+{
+ return merge_bucket_lists(
+ lib::ClusterState("distributor:1 storage:3"),
+ existing_data,
+ lib::ClusterState("distributor:1 storage:3"),
+ new_data,
+ include_bucket_info);
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_merge) {
+    // Result is of the form: [bucket w/o count bits]:[node indexes]|..
+    // Input is of the form: [node]:[bucket w/o count bits]|...
+
+ // Simple initializing case - ask all nodes for info
+ EXPECT_EQ("4:0,1|2:0,1|6:1,2|1:0,2|5:2,0|3:2,1|",
+ merge_bucket_lists(
+ "",
+ "0:1,2,4,5|1:2,3,4,6|2:1,3,5,6"));
+
+ // New node came up
+ EXPECT_EQ("4:0,1|2:0,1|6:1,2,3|1:0,2,3|5:2,0,3|3:2,1,3|",
+ merge_bucket_lists(
+ "0:1,2,4,5|1:2,3,4,6|2:1,3,5,6",
+ "3:1,3,5,6"));
+
+    // Node came up with some buckets removed and some added.
+    // The buckets it no longer reports should not be removed from the DB,
+    // as the node didn't lose a disk.
+ EXPECT_EQ("8:0|4:0,1|2:0,1|6:1,0,2|1:0,2|5:2,0|3:2,1|",
+ merge_bucket_lists(
+ "0:1,2,4,5|1:2,3,4,6|2:1,3,5,6",
+ "0:1,2,6,8"));
+
+ // Bucket info format is "bucketid/checksum/count/size"
+ // Node went from initializing to up and invalid bucket went to empty.
+ EXPECT_EQ("2:0/0/0/0/t|",
+ merge_bucket_lists(
+ "0:2/0/0/1",
+ "0:2/0/0/0",
+ true));
+
+ EXPECT_EQ("5:1/2/3/4/u,0/0/0/0/u|",
+ merge_bucket_lists("", "0:5/0/0/0|1:5/2/3/4", true));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_merge_replica_changed) {
+ // Node went from initializing to up and non-invalid bucket changed.
+ EXPECT_EQ("2:0/2/3/4/t|3:0/2/4/6/t|",
+ merge_bucket_lists(
+ lib::ClusterState("distributor:1 storage:1 .0.s:i"),
+ "0:2/1/2/3,3/2/4/6",
+ lib::ClusterState("distributor:1 storage:1"),
+ "0:2/2/3/4,3/2/4/6",
+ true));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, no_db_resurrection_for_bucket_not_owned_in_current_state) {
+ document::BucketId bucket(16, 3);
+ lib::ClusterState state_before("distributor:1 storage:1");
+ {
+ uint32_t expected_msgs = _bucket_spaces.size(), dummy_buckets_to_return = 1;
+ ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state_before, expected_msgs, dummy_buckets_to_return));
+ }
+ _sender.clear();
+
+ stripe_of_bucket(bucket).bucket_db_updater().recheckBucketInfo(0, makeDocumentBucket(bucket));
+
+ ASSERT_EQ(size_t(1), _sender.commands().size());
+ auto rbi = std::dynamic_pointer_cast<RequestBucketInfoCommand>(_sender.command(0));
+
+ lib::ClusterState state_after("distributor:3 storage:3");
+
+ {
+ uint32_t expected_msgs = message_count(2), dummy_buckets_to_return = 1;
+ ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state_after, expected_msgs, dummy_buckets_to_return));
+ }
+ EXPECT_FALSE(distributor_bucket_space(bucket).get_bucket_ownership_flags(bucket).owned_in_current_state());
+
+ ASSERT_NO_FATAL_FAILURE(send_fake_reply_for_single_bucket_request(*rbi));
+
+ EXPECT_EQ("NONEXISTING", dump_bucket(bucket));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, no_db_resurrection_for_bucket_not_owned_in_pending_state) {
+ document::BucketId bucket(16, 3);
+ lib::ClusterState state_before("distributor:1 storage:1");
+ {
+ uint32_t expected_msgs = _bucket_spaces.size(), dummy_buckets_to_return = 1;
+ ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state_before, expected_msgs, dummy_buckets_to_return));
+ }
+ _sender.clear();
+
+ stripe_of_bucket(bucket).bucket_db_updater().recheckBucketInfo(0, makeDocumentBucket(bucket));
+
+ ASSERT_EQ(size_t(1), _sender.commands().size());
+ auto rbi = std::dynamic_pointer_cast<RequestBucketInfoCommand>(_sender.command(0));
+
+ lib::ClusterState state_after("distributor:3 storage:3");
+ // Set, but _don't_ enable cluster state. We want it to be pending.
+ set_cluster_state(state_after);
+ EXPECT_TRUE(distributor_bucket_space(bucket).get_bucket_ownership_flags(bucket).owned_in_current_state());
+ EXPECT_FALSE(distributor_bucket_space(bucket).get_bucket_ownership_flags(bucket).owned_in_pending_state());
+
+ ASSERT_NO_FATAL_FAILURE(send_fake_reply_for_single_bucket_request(*rbi));
+
+ EXPECT_EQ("NONEXISTING", dump_bucket(bucket));
+}
+
+/*
+ * If we get a distribution config change, any cluster state that arrives after
+ * this (but _before_ the pending cluster state has finished) must trigger a
+ * full bucket info fetch, no matter what the cluster state change was.
+ * Otherwise, we will most likely end up without a complete view of the
+ * buckets in the cluster.
+ */
+TEST_F(TopLevelBucketDBUpdaterTest, cluster_state_always_sends_full_fetch_when_distribution_change_pending) {
+ lib::ClusterState state_before("distributor:6 storage:6");
+ {
+ uint32_t expected_msgs = message_count(6), dummy_buckets_to_return = 1;
+ ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state_before, expected_msgs, dummy_buckets_to_return));
+ }
+ _sender.clear();
+ std::string distConfig(dist_config_6_nodes_across_2_groups());
+ set_distribution(distConfig);
+
+ sort_sent_messages_by_index(_sender);
+ ASSERT_EQ(message_count(6), _sender.commands().size());
+ // Suddenly, a wild cluster state change appears! Even though this state
+ // does not in itself imply any bucket changes, it will still overwrite the
+    // pending cluster state and thus its set of pending bucket info requests.
+ set_cluster_state("distributor:6 .2.t:12345 storage:6");
+
+ ASSERT_EQ(message_count(12), _sender.commands().size());
+
+    // Send replies for the first message_count(6) messages (the outdated requests).
+ int num_buckets = 10;
+ for (uint32_t i = 0; i < message_count(6); ++i) {
+ ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(lib::ClusterState("distributor:6 storage:6"),
+ *_sender.command(i), num_buckets));
+ }
+ // No change from these.
+ ASSERT_NO_FATAL_FAILURE(assert_correct_buckets(1, "distributor:6 storage:6"));
+
+ // Send for current pending.
+ for (uint32_t i = 0; i < message_count(6); ++i) {
+ ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(lib::ClusterState("distributor:6 .2.t:12345 storage:6"),
+ *_sender.command(i + message_count(6)),
+ num_buckets));
+ }
+ ASSERT_NO_FATAL_FAILURE(assert_correct_buckets(num_buckets, "distributor:6 storage:6"));
+ _sender.clear();
+
+ // No more pending global fetch; this should be a no-op state.
+ set_cluster_state("distributor:6 .3.t:12345 storage:6");
+ EXPECT_EQ(size_t(0), _sender.commands().size());
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, changed_distribution_config_triggers_recovery_mode) {
+ uint32_t num_buckets = 20;
+ ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(lib::ClusterState("distributor:6 storage:6"),
+ message_count(6), num_buckets));
+ _sender.clear();
+ EXPECT_TRUE(all_distributor_stripes_are_in_recovery_mode());
+ complete_recovery_mode_on_all_stripes();
+ EXPECT_FALSE(all_distributor_stripes_are_in_recovery_mode());
+
+ set_distribution(dist_config_6_nodes_across_4_groups());
+ sort_sent_messages_by_index(_sender);
+ // No replies received yet, still no recovery mode.
+ EXPECT_FALSE(all_distributor_stripes_are_in_recovery_mode());
+
+ ASSERT_EQ(message_count(6), _sender.commands().size());
+ num_buckets = 10;
+ for (uint32_t i = 0; i < message_count(6); ++i) {
+ ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(lib::ClusterState("distributor:6 storage:6"),
+ *_sender.command(i), num_buckets));
+ }
+
+ // Pending cluster state (i.e. distribution) has been enabled, which should
+ // cause recovery mode to be entered.
+ EXPECT_TRUE(all_distributor_stripes_are_in_recovery_mode());
+ complete_recovery_mode_on_all_stripes();
+ EXPECT_FALSE(all_distributor_stripes_are_in_recovery_mode());
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, changed_distribution_config_does_not_elide_bucket_db_pruning) {
+ set_distribution(dist_config_3_nodes_in_1_group());
+
+ constexpr uint32_t n_buckets = 100;
+ ASSERT_NO_FATAL_FAILURE(
+ set_and_enable_cluster_state(lib::ClusterState("distributor:6 storage:6"), message_count(6), n_buckets));
+ _sender.clear();
+
+ // Config implies a different node set than the current cluster state, so it's crucial that
+ // DB pruning is _not_ elided. Yes, this is inherently racing with cluster state changes and
+ // should be changed to be atomic and controlled by the cluster controller instead of config.
+ // But this is where we currently are.
+ set_distribution(dist_config_6_nodes_across_2_groups());
+ for (auto* s : distributor_stripes()) {
+ const auto& db = s->getBucketSpaceRepo().get(document::FixedBucketSpaces::default_space()).getBucketDatabase();
+ db.acquire_read_guard()->for_each([&]([[maybe_unused]] uint64_t key, const auto& e) {
+ auto id = e.getBucketId();
+ EXPECT_TRUE(distributor_bucket_space(id).get_bucket_ownership_flags(id).owned_in_pending_state());
+ });
+ }
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, newly_added_buckets_have_current_time_as_gc_timestamp) {
+ fake_clock().setAbsoluteTimeInSeconds(101234);
+ lib::ClusterState state_before("distributor:1 storage:1");
+ {
+ uint32_t expected_msgs = _bucket_spaces.size(), dummy_buckets_to_return = 1;
+ ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state_before, expected_msgs, dummy_buckets_to_return));
+ }
+    // set_and_enable_cluster_state adds n buckets with id (16, i)
+ document::BucketId bucket(16, 0);
+ BucketDatabase::Entry e = get_bucket(bucket);
+ ASSERT_TRUE(e.valid());
+ EXPECT_EQ(uint32_t(101234), e->getLastGarbageCollectionTime());
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, newer_mutations_not_overwritten_by_earlier_bucket_fetch) {
+ {
+ lib::ClusterState state_before("distributor:1 storage:1 .0.s:i");
+ uint32_t expected_msgs = _bucket_spaces.size(), dummy_buckets_to_return = 0;
+        // This step is required to make the distributor ready to accept the
+        // explicit database insertion towards node 0 below.
+ ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state_before, expected_msgs, dummy_buckets_to_return));
+ }
+ _sender.clear();
+ fake_clock().setAbsoluteTimeInSeconds(1000);
+ lib::ClusterState state("distributor:1 storage:1");
+ set_cluster_state(state);
+ ASSERT_EQ(_bucket_spaces.size(), _sender.commands().size());
+
+ // Before replying with the bucket info, simulate the arrival of a mutation
+ // reply that alters the state of the bucket with information that will be
+    // more recent than what is returned by the bucket info. This information
+ // must not be lost when the bucket info is later merged into the database.
+ document::BucketId bucket(16, 1);
+ constexpr uint64_t insertion_timestamp = 1001ULL * 1000000;
+ api::BucketInfo wanted_info(5, 6, 7);
+ stripe_of_bucket(bucket).bucket_db_updater().operation_context().update_bucket_database(
+ makeDocumentBucket(bucket),
+ BucketCopy(insertion_timestamp, 0, wanted_info),
+ DatabaseUpdate::CREATE_IF_NONEXISTING);
+
+ fake_clock().setAbsoluteTimeInSeconds(1002);
+ constexpr uint32_t buckets_returned = 10; // Buckets (16, 0) ... (16, 9)
+ // Return bucket information which on the timeline might originate from
+    // anywhere between [1000, 1002]. Our assumption is that any mutations
+    // taking place after t=1000 must have their replies received and processed
+    // by this distributor and timestamped strictly later than t=1000 (modulo
+    // clock skew, of course, but that is outside the scope of this test). A
+    // mutation happening before t=1000 but receiving a reply at t>1000 does
+    // not affect correctness, as the reply should contain the same bucket info
+    // as that in the full bucket reply, and the DB update is thus idempotent.
+ for (uint32_t i = 0; i < _bucket_spaces.size(); ++i) {
+ ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(state, *_sender.command(i), buckets_returned));
+ }
+
+ BucketDatabase::Entry e = get_bucket(bucket);
+ ASSERT_EQ(uint32_t(1), e->getNodeCount());
+ EXPECT_EQ(wanted_info, e->getNodeRef(0).getBucketInfo());
+}
+
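+// Returns the node index targeted by each bucket info request captured by
+// the message sender, in the order the requests were sent.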
+std::vector<uint16_t>
+TopLevelBucketDBUpdaterTest::get_send_set() const
+{
+ std::vector<uint16_t> nodes;
+ std::transform(_sender.commands().begin(),
+ _sender.commands().end(),
+ std::back_inserter(nodes),
+ [](auto& cmd)
+ {
+ auto& req(dynamic_cast<const api::RequestBucketInfoCommand&>(*cmd));
+ return req.getAddress()->getIndex();
+ });
+ return nodes;
+}
+
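+// Completes a transition to old_cluster_state, then triggers a transition to
+// preempted_cluster_state which is immediately preempted (no info requests
+// ACKed) by new_cluster_state. Returns the node set targeted by the requests
+// sent for the final state.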
+std::vector<uint16_t>
+TopLevelBucketDBUpdaterTest::get_sent_nodes_with_preemption(
+ const std::string& old_cluster_state,
+ uint32_t expected_old_state_messages,
+ const std::string& preempted_cluster_state,
+ const std::string& new_cluster_state)
+{
+ uint32_t dummy_buckets_to_return = 10;
+ // FIXME cannot chain assertion checks in non-void function
+ set_and_enable_cluster_state(lib::ClusterState(old_cluster_state),
+ expected_old_state_messages,
+ dummy_buckets_to_return);
+
+ _sender.clear();
+
+ set_cluster_state(preempted_cluster_state);
+ _sender.clear();
+ // Do not allow the pending state to become the active state; trigger a
+ // new transition without ACKing the info requests first. This will
+ // overwrite the pending state entirely.
+ set_cluster_state(lib::ClusterState(new_cluster_state));
+ return get_send_set();
+}
+
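+// Repeats each node index once per configured bucket space, mirroring that
+// one info request is sent per (node, bucket space) pair.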
+std::vector<uint16_t>
+TopLevelBucketDBUpdaterTest::expand_node_vec(const std::vector<uint16_t>& nodes)
+{
+ std::vector<uint16_t> res;
+ size_t count = _bucket_spaces.size();
+ for (const auto &node : nodes) {
+ for (uint32_t i = 0; i < count; ++i) {
+ res.push_back(node);
+ }
+ }
+ return res;
+}
+
+/*
+ * If we don't carry over the set of nodes that we need to fetch from,
+ * a naive comparison between the active state and the new state will
+ * make it appear to the distributor that nothing has changed, as any
+ * database modifications caused by intermediate states will not be
+ * accounted for (basically the ABA problem in a distributed setting).
+ */
+TEST_F(TopLevelBucketDBUpdaterTest, preempted_distributor_change_carries_node_set_over_to_next_state_fetch) {
+ EXPECT_EQ(expand_node_vec({0, 1, 2, 3, 4, 5}),
+ get_sent_nodes_with_preemption("version:1 distributor:6 storage:6",
+ message_count(6),
+ "version:2 distributor:6 .5.s:d storage:6",
+ "version:3 distributor:6 storage:6"));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, preempted_storage_change_carries_node_set_over_to_next_state_fetch) {
+ EXPECT_EQ(expand_node_vec({2, 3}),
+ get_sent_nodes_with_preemption(
+ "version:1 distributor:6 storage:6 .2.s:d",
+ message_count(5),
+ "version:2 distributor:6 storage:6 .2.s:d .3.s:d",
+ "version:3 distributor:6 storage:6"));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, preempted_storage_node_down_must_be_re_fetched) {
+ EXPECT_EQ(expand_node_vec({2}),
+ get_sent_nodes_with_preemption(
+ "version:1 distributor:6 storage:6",
+ message_count(6),
+ "version:2 distributor:6 storage:6 .2.s:d",
+ "version:3 distributor:6 storage:6"));
+}
+
+using NodeVec = std::vector<uint16_t>;
+
+TEST_F(TopLevelBucketDBUpdaterTest, do_not_send_to_preempted_node_now_in_down_state) {
+ EXPECT_EQ(NodeVec{},
+ get_sent_nodes_with_preemption(
+ "version:1 distributor:6 storage:6 .2.s:d",
+ message_count(5),
+ "version:2 distributor:6 storage:6", // Sends to 2.
+ "version:3 distributor:6 storage:6 .2.s:d")); // 2 down again.
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, do_not_send_to_preempted_node_not_part_of_new_state) {
+    // Even though the preempted state covered 100 storage nodes, not all of
+    // these should be part of the requests afterwards when only 6 are part
+    // of the new state.
+ EXPECT_EQ(expand_node_vec({0, 1, 2, 3, 4, 5}),
+ get_sent_nodes_with_preemption(
+ "version:1 distributor:6 storage:100",
+ message_count(100),
+ "version:2 distributor:5 .4.s:d storage:100",
+ "version:3 distributor:6 storage:6"));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, outdated_node_set_cleared_after_successful_state_completion) {
+ lib::ClusterState state_before("version:1 distributor:6 storage:6 .1.t:1234");
+ uint32_t expected_msgs = message_count(6), dummy_buckets_to_return = 10;
+ ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state_before, expected_msgs, dummy_buckets_to_return));
+ _sender.clear();
+ // New cluster state that should not by itself trigger any new fetches,
+    // unless the outdated node set is somehow not cleared after an enabled
+ // (completed) cluster state has been set.
+ set_cluster_state("version:3 distributor:6 storage:6");
+ EXPECT_EQ(size_t(0), _sender.commands().size());
+}
+
+// XXX test currently disabled since distribution config isn't used
+// at all in order to deduce the set of nodes to send to. This might not matter
+// in practice since it is assumed that the cluster state matching the new
+// distribution config will follow very shortly after the config has been
+// applied to the node. The new cluster state will then send out requests to
+// the correct node set.
+TEST_F(TopLevelBucketDBUpdaterTest, DISABLED_cluster_config_downsize_only_sends_to_available_nodes) {
+ uint32_t expected_msgs = 6, dummy_buckets_to_return = 20;
+ ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(lib::ClusterState("distributor:6 storage:6"),
+ expected_msgs, dummy_buckets_to_return));
+ _sender.clear();
+
+ // Intentionally trigger a racing config change which arrives before the
+ // new cluster state representing it.
+ set_distribution(dist_config_3_nodes_in_1_group());
+ sort_sent_messages_by_index(_sender);
+
+ EXPECT_EQ((NodeVec{0, 1, 2}), get_send_set());
+}
+
+/**
+ * Test scenario where a cluster is downsized by removing a subset of the nodes
+ * from the distribution configuration. The system must be able to deal with
+ * a scenario where the set of nodes between two cluster states across a config
+ * change may differ.
+ *
+ * See VESPA-790 for details.
+ */
+TEST_F(TopLevelBucketDBUpdaterTest, node_missing_from_config_is_treated_as_needing_ownership_transfer) {
+ uint32_t expected_msgs = message_count(3), dummy_buckets_to_return = 1;
+ ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(lib::ClusterState("distributor:3 storage:3"),
+ expected_msgs, dummy_buckets_to_return));
+ _sender.clear();
+
+ // Cluster goes from {0, 1, 2} -> {0, 1}. This leaves us with a config
+ // that does not contain node 2 while the _active_ cluster state still
+ // contains this node.
+ const char* downsize_cfg =
+ "redundancy 2\n"
+ "distributor_auto_ownership_transfer_on_whole_group_down true\n"
+ "group[2]\n"
+ "group[0].name \"invalid\"\n"
+ "group[0].index \"invalid\"\n"
+ "group[0].partitions 1|*\n"
+ "group[0].nodes[0]\n"
+ "group[1].name rack0\n"
+ "group[1].index 0\n"
+ "group[1].nodes[2]\n"
+ "group[1].nodes[0].index 0\n"
+ "group[1].nodes[1].index 1\n";
+
+ set_distribution(downsize_cfg);
+ sort_sent_messages_by_index(_sender);
+ _sender.clear();
+
+ // Attempt to apply state with {0, 1} set. This will compare the new state
+ // with the previous state, which still has node 2.
+ expected_msgs = message_count(2);
+ ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(lib::ClusterState("distributor:2 storage:2"),
+ expected_msgs, dummy_buckets_to_return));
+
+ EXPECT_EQ(expand_node_vec({0, 1}), get_send_set());
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, changed_distributor_set_implies_ownership_transfer) {
+ auto fixture = create_pending_state_fixture_for_state_change(
+ "distributor:2 storage:2", "distributor:1 storage:2");
+ EXPECT_TRUE(fixture->state->hasBucketOwnershipTransfer());
+
+ fixture = create_pending_state_fixture_for_state_change(
+ "distributor:2 storage:2", "distributor:2 .1.s:d storage:2");
+ EXPECT_TRUE(fixture->state->hasBucketOwnershipTransfer());
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, unchanged_distributor_set_implies_no_ownership_transfer) {
+ auto fixture = create_pending_state_fixture_for_state_change(
+ "distributor:2 storage:2", "distributor:2 storage:1");
+ EXPECT_FALSE(fixture->state->hasBucketOwnershipTransfer());
+
+ fixture = create_pending_state_fixture_for_state_change(
+ "distributor:2 storage:2", "distributor:2 storage:2 .1.s:d");
+ EXPECT_FALSE(fixture->state->hasBucketOwnershipTransfer());
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, changed_distribution_config_implies_ownership_transfer) {
+ auto fixture = create_pending_state_fixture_for_distribution_change("distributor:2 storage:2");
+ EXPECT_TRUE(fixture->state->hasBucketOwnershipTransfer());
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, transition_time_tracked_for_single_state_change) {
+ ASSERT_NO_FATAL_FAILURE(complete_state_transition_in_seconds("distributor:2 storage:2", 5, message_count(2)));
+
+ EXPECT_EQ(uint64_t(5000), last_transition_time_in_millis());
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, transition_time_reset_across_non_preempting_state_changes) {
+ ASSERT_NO_FATAL_FAILURE(complete_state_transition_in_seconds("distributor:2 storage:2", 5, message_count(2)));
+ ASSERT_NO_FATAL_FAILURE(complete_state_transition_in_seconds("distributor:2 storage:3", 3, message_count(1)));
+
+ EXPECT_EQ(uint64_t(3000), last_transition_time_in_millis());
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, transition_time_tracked_for_distribution_config_change) {
+ lib::ClusterState state("distributor:2 storage:2");
+ ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state, message_count(2), 1));
+
+ _sender.clear();
+ set_distribution(dist_config_3_nodes_in_1_group());
+ fake_clock().addSecondsToTime(4);
+ ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(state, message_count(2)));
+ EXPECT_EQ(uint64_t(4000), last_transition_time_in_millis());
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, transition_time_tracked_across_preempted_transitions) {
+ _sender.clear();
+ set_cluster_state("version:1 distributor:2 storage:2");
+ fake_clock().addSecondsToTime(5);
+ // Pre-empted with new state here, which will push out the old pending
+ // state and replace it with a new one. We should still count the time
+ // used processing the old state.
+ ASSERT_NO_FATAL_FAILURE(complete_state_transition_in_seconds("version:2 distributor:2 storage:3", 3, message_count(3)));
+
+ EXPECT_EQ(uint64_t(8000), last_transition_time_in_millis());
+}
+
+/*
+ * Brief reminder on test DSL for checking bucket merge operations:
+ *
+ * merge_bucket_lists() takes as input strings of the format
+ * <node>:<raw bucket id>/<checksum>/<num docs>/<doc size>|<node>:
+ * and returns a string describing the bucket DB post-merge with the format
+ * <raw bucket id>:<node>/<checksum>/<num docs>/<doc size>/<t|u>,<node>/....|<raw bucket id>:....
+ * where t|u indicates whether the replica is trusted or untrusted.
+ *
+ * Yes, the order of node<->bucket id is reversed between the two, perhaps to make sure you're awake.
+ */
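+//
+// Worked example (mirrors the cases below): merging an empty DB with the
+// input "0:5/1/2/3" yields "5:0/1/2/3/t|", i.e. node 0 hosts bucket 5 with
+// checksum 1, 2 documents and 3 bytes in total, and the sole replica is
+// implicitly marked trusted.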
+
+TEST_F(TopLevelBucketDBUpdaterTest, batch_update_of_existing_diverging_replicas_does_not_mark_any_as_trusted) {
+ // Replacing bucket information for content node 0 should not mark existing
+ // untrusted replica as trusted as a side effect.
+ EXPECT_EQ("5:1/7/8/9/u,0/1/2/3/u|",
+ merge_bucket_lists(
+ lib::ClusterState("distributor:1 storage:3 .0.s:i"),
+ "0:5/0/0/0|1:5/7/8/9",
+ lib::ClusterState("distributor:1 storage:3 .0.s:u"),
+ "0:5/1/2/3|1:5/7/8/9", true));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, batch_add_of_new_diverging_replicas_does_not_mark_any_as_trusted) {
+ EXPECT_EQ("5:1/7/8/9/u,0/1/2/3/u|",
+ merge_bucket_lists("", "0:5/1/2/3|1:5/7/8/9", true));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, batch_add_with_single_resulting_replica_implicitly_marks_as_trusted) {
+ EXPECT_EQ("5:0/1/2/3/t|",
+ merge_bucket_lists("", "0:5/1/2/3", true));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, identity_update_of_single_replica_does_not_clear_trusted) {
+ EXPECT_EQ("5:0/1/2/3/t|",
+ merge_bucket_lists("0:5/1/2/3", "0:5/1/2/3", true));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, identity_update_of_diverging_untrusted_replicas_does_not_mark_any_as_trusted) {
+ EXPECT_EQ("5:1/7/8/9/u,0/1/2/3/u|",
+ merge_bucket_lists("0:5/1/2/3|1:5/7/8/9", "0:5/1/2/3|1:5/7/8/9", true));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, adding_diverging_replica_to_existing_trusted_does_not_remove_trusted) {
+ EXPECT_EQ("5:1/2/3/4/u,0/1/2/3/t|",
+ merge_bucket_lists("0:5/1/2/3", "0:5/1/2/3|1:5/2/3/4", true));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted) {
+ // This differs from batch_update_of_existing_diverging_replicas_does_not_mark_any_as_trusted
+ // in that _all_ content nodes are considered outdated when distributor changes take place,
+ // and therefore a slightly different code path is taken. In particular, bucket info for
+ // outdated nodes gets removed before possibly being re-added (if present in the bucket info
+ // response).
+ EXPECT_EQ("5:1/7/8/9/u,0/1/2/3/u|",
+ merge_bucket_lists(
+ lib::ClusterState("distributor:2 storage:3"),
+ "0:5/1/2/3|1:5/7/8/9",
+ lib::ClusterState("distributor:1 storage:3"),
+ "0:5/1/2/3|1:5/7/8/9", true));
+}
+
+// TODO remove on Vespa 8 - this is a workaround for https://github.com/vespa-engine/vespa/issues/8475
+TEST_F(TopLevelBucketDBUpdaterTest, global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection) {
+ set_distribution(dist_config_6_nodes_across_2_groups());
+
+ const vespalib::string current_hash = "(0d*|*(0;0;1;2)(1;3;4;5))";
+ const vespalib::string legacy_hash = "(0d3|3|*(0;0;1;2)(1;3;4;5))";
+
+ set_cluster_state("distributor:6 storage:6");
+ ASSERT_EQ(message_count(6), _sender.commands().size());
+
+ api::RequestBucketInfoCommand* global_req = nullptr;
+ for (auto& cmd : _sender.commands()) {
+ auto& req_cmd = dynamic_cast<api::RequestBucketInfoCommand&>(*cmd);
+ if (req_cmd.getBucketSpace() == document::FixedBucketSpaces::global_space()) {
+ global_req = &req_cmd;
+ break;
+ }
+ }
+ ASSERT_TRUE(global_req != nullptr);
+ ASSERT_EQ(current_hash, global_req->getDistributionHash());
+
+ auto reply = std::make_shared<api::RequestBucketInfoReply>(*global_req);
+ reply->setResult(api::ReturnCode::REJECTED);
+ bucket_db_updater().onRequestBucketInfoReply(reply);
+
+ fake_clock().addSecondsToTime(10);
+ bucket_db_updater().resend_delayed_messages();
+
+ // Should now be a resent request with legacy distribution hash
+ ASSERT_EQ(message_count(6) + 1, _sender.commands().size());
+ auto& legacy_req = dynamic_cast<api::RequestBucketInfoCommand&>(*_sender.commands().back());
+ ASSERT_EQ(legacy_hash, legacy_req.getDistributionHash());
+
+ // Now if we reject it _again_ we should cycle back to the current hash
+ // in case it wasn't a hash-based rejection after all. And the circle of life continues.
+ reply = std::make_shared<api::RequestBucketInfoReply>(legacy_req);
+ reply->setResult(api::ReturnCode::REJECTED);
+ bucket_db_updater().onRequestBucketInfoReply(reply);
+
+ fake_clock().addSecondsToTime(10);
+ bucket_db_updater().resend_delayed_messages();
+
+ ASSERT_EQ(message_count(6) + 2, _sender.commands().size());
+ auto& new_current_req = dynamic_cast<api::RequestBucketInfoCommand&>(*_sender.commands().back());
+ ASSERT_EQ(current_hash, new_current_req.getDistributionHash());
+}
+
+namespace {
+
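+// Invokes f(space, entry) for every valid entry in the given bucket
+// database, iterating in bucket key order via getNext().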
+template <typename Func>
+void for_each_bucket(const BucketDatabase& db, const document::BucketSpace& space, Func&& f) {
+ BucketId last(0);
+ auto e = db.getNext(last);
+ while (e.valid()) {
+ f(space, e);
+ e = db.getNext(e.getBucketId());
+ }
+}
+
+template <typename Func>
+void for_each_bucket(const DistributorBucketSpaceRepo& repo, Func&& f) {
+ for (const auto& space : repo) {
+ for_each_bucket(space.second->getBucketDatabase(), space.first, f);
+ }
+}
+
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, non_owned_buckets_moved_to_read_only_db_on_ownership_change) {
+ set_stale_reads_enabled(true);
+
+ lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition
+ set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity
+
+ ASSERT_EQ(message_count(4), _sender.commands().size());
+ constexpr uint32_t n_buckets = 10;
+ ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(initial_state, message_count(4), n_buckets));
+ _sender.clear();
+
+ EXPECT_EQ(n_buckets, mutable_default_dbs_size());
+ EXPECT_EQ(n_buckets, mutable_global_dbs_size());
+ EXPECT_EQ(0u, read_only_default_dbs_size());
+ EXPECT_EQ(0u, read_only_global_dbs_size());
+
+ lib::ClusterState pending_state("distributor:2 storage:4");
+
+ std::unordered_set<Bucket, Bucket::hash> buckets_not_owned_in_pending_state;
+ for (auto* s : distributor_stripes()) {
+ for_each_bucket(mutable_repo(*s), [&](const auto& space, const auto& entry) {
+ if (!distributor_bucket_space(entry.getBucketId()).owns_bucket_in_state(pending_state, entry.getBucketId())) {
+ buckets_not_owned_in_pending_state.insert(Bucket(space, entry.getBucketId()));
+ }
+ });
+ }
+ EXPECT_FALSE(buckets_not_owned_in_pending_state.empty());
+
+ set_cluster_state_bundle(lib::ClusterStateBundle(pending_state, {}, true)); // Now requires activation
+
+ const auto buckets_not_owned_per_space = (buckets_not_owned_in_pending_state.size() / 2); // 2 spaces
+ const auto expected_mutable_buckets = n_buckets - buckets_not_owned_per_space;
+ EXPECT_EQ(expected_mutable_buckets, mutable_default_dbs_size());
+ EXPECT_EQ(expected_mutable_buckets, mutable_global_dbs_size());
+ EXPECT_EQ(buckets_not_owned_per_space, read_only_default_dbs_size());
+ EXPECT_EQ(buckets_not_owned_per_space, read_only_global_dbs_size());
+
+ for (auto* s : distributor_stripes()) {
+ for_each_bucket(read_only_repo(*s), [&](const auto& space, const auto& entry) {
+ EXPECT_TRUE(buckets_not_owned_in_pending_state.find(Bucket(space, entry.getBucketId()))
+ != buckets_not_owned_in_pending_state.end());
+ });
+ }
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, buckets_no_longer_available_are_not_moved_to_read_only_database) {
+ constexpr uint32_t n_buckets = 10;
+ // No ownership change, just node down. Test redundancy is 2, so removing 2 nodes will
+ // cause some buckets to be entirely unavailable.
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
+ "version:2 distributor:1 storage:4 .0.s:d .1.s:m", n_buckets, 0);
+
+ EXPECT_EQ(0u, read_only_default_dbs_size());
+ EXPECT_EQ(0u, read_only_global_dbs_size());
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, non_owned_buckets_purged_when_read_only_support_is_config_disabled) {
+ set_stale_reads_enabled(false);
+
+ lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition
+ set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity
+
+ ASSERT_EQ(message_count(4), _sender.commands().size());
+ constexpr uint32_t n_buckets = 10;
+ ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(initial_state, message_count(4), n_buckets));
+ _sender.clear();
+
+ // Nothing in read-only DB after first bulk load of buckets.
+ EXPECT_EQ(0u, read_only_default_dbs_size());
+ EXPECT_EQ(0u, read_only_global_dbs_size());
+
+ set_cluster_state("distributor:2 storage:4");
+ // No buckets should be moved into read only db after ownership changes.
+ EXPECT_EQ(0u, read_only_default_dbs_size());
+ EXPECT_EQ(0u, read_only_global_dbs_size());
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, deferred_activated_state_does_not_enable_state_until_activation_received) {
+ set_stale_reads_enabled(true);
+ constexpr uint32_t n_buckets = 10;
+ ASSERT_NO_FATAL_FAILURE(
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
+ "version:2 distributor:1 storage:4", n_buckets, 4));
+
+ // Version should not be switched over yet
+ EXPECT_EQ(1u, current_cluster_state_bundle().getVersion());
+
+ EXPECT_EQ(0u, mutable_default_dbs_size());
+ EXPECT_EQ(0u, mutable_global_dbs_size());
+
+ EXPECT_FALSE(activate_cluster_state_version(2));
+
+ EXPECT_EQ(2u, current_cluster_state_bundle().getVersion());
+ EXPECT_EQ(n_buckets, mutable_default_dbs_size());
+ EXPECT_EQ(n_buckets, mutable_global_dbs_size());
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, read_only_db_cleared_once_pending_state_is_activated) {
+ set_stale_reads_enabled(true);
+ constexpr uint32_t n_buckets = 10;
+ ASSERT_NO_FATAL_FAILURE(
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
+ "version:2 distributor:2 storage:4", n_buckets, 0));
+ EXPECT_FALSE(activate_cluster_state_version(2));
+
+ EXPECT_EQ(0u, read_only_default_dbs_size());
+ EXPECT_EQ(0u, read_only_global_dbs_size());
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, read_only_db_is_populated_even_when_self_is_marked_down) {
+ set_stale_reads_enabled(true);
+ constexpr uint32_t n_buckets = 10;
+ ASSERT_NO_FATAL_FAILURE(
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
+ "version:2 distributor:1 .0.s:d storage:4", n_buckets, 0));
+
+    // State not yet activated, so the read-only DBs contain all the buckets we used to have.
+ EXPECT_EQ(0u, mutable_default_dbs_size());
+ EXPECT_EQ(0u, mutable_global_dbs_size());
+ EXPECT_EQ(n_buckets, read_only_default_dbs_size());
+ EXPECT_EQ(n_buckets, read_only_global_dbs_size());
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, activate_cluster_state_request_with_mismatching_version_returns_actual_version) {
+ set_stale_reads_enabled(true);
+ constexpr uint32_t n_buckets = 10;
+ ASSERT_NO_FATAL_FAILURE(
+ trigger_completed_but_not_yet_activated_transition("version:4 distributor:1 storage:4", n_buckets, 4,
+ "version:5 distributor:2 storage:4", n_buckets, 0));
+
+ EXPECT_TRUE(activate_cluster_state_version(4)); // Too old version
+ ASSERT_NO_FATAL_FAILURE(assert_has_activate_cluster_state_reply_with_actual_version(5));
+
+ EXPECT_TRUE(activate_cluster_state_version(6)); // More recent version than what has been observed
+ ASSERT_NO_FATAL_FAILURE(assert_has_activate_cluster_state_reply_with_actual_version(5));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, activate_cluster_state_request_without_pending_transition_passes_message_through) {
+ set_stale_reads_enabled(true);
+ constexpr uint32_t n_buckets = 10;
+ ASSERT_NO_FATAL_FAILURE(
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
+ "version:2 distributor:1 storage:4", n_buckets, 4));
+ // Activate version 2; no pending cluster state after this.
+ EXPECT_FALSE(activate_cluster_state_version(2));
+
+    // No pending cluster state for version 3; it is just passed through to be implicitly bounced by the state manager.
+ // Note: state manager is not modelled in this test, so we just check that the message handler returns
+ // false (meaning "didn't take message ownership") and there's no auto-generated reply.
+ EXPECT_FALSE(activate_cluster_state_version(3));
+ EXPECT_EQ(size_t(0), _sender.replies().size());
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_getter_is_non_null_only_when_state_is_pending) {
+ auto initial_baseline = std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:d");
+ auto initial_default = std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:m");
+
+ lib::ClusterStateBundle initial_bundle(*initial_baseline, {{FixedBucketSpaces::default_space(), initial_default},
+ {FixedBucketSpaces::global_space(), initial_baseline}});
+ set_cluster_state_bundle(initial_bundle);
+
+ for (auto* s : distributor_stripes()) {
+ auto* state = s->bucket_db_updater().pendingClusterStateOrNull(FixedBucketSpaces::default_space());
+ ASSERT_TRUE(state != nullptr);
+ EXPECT_EQ(*initial_default, *state);
+
+ state = s->bucket_db_updater().pendingClusterStateOrNull(FixedBucketSpaces::global_space());
+ ASSERT_TRUE(state != nullptr);
+ EXPECT_EQ(*initial_baseline, *state);
+ }
+
+ ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(*initial_baseline, message_count(1), 0));
+
+ for (auto* s : distributor_stripes()) {
+ auto* state = s->bucket_db_updater().pendingClusterStateOrNull(FixedBucketSpaces::default_space());
+ EXPECT_TRUE(state == nullptr);
+
+ state = s->bucket_db_updater().pendingClusterStateOrNull(FixedBucketSpaces::global_space());
+ EXPECT_TRUE(state == nullptr);
+ }
+}
+
+struct BucketDBUpdaterSnapshotTest : TopLevelBucketDBUpdaterTest {
+ lib::ClusterState empty_state;
+ std::shared_ptr<lib::ClusterState> initial_baseline;
+ std::shared_ptr<lib::ClusterState> initial_default;
+ lib::ClusterStateBundle initial_bundle;
+ Bucket default_bucket;
+ Bucket global_bucket;
+
+ BucketDBUpdaterSnapshotTest()
+ : TopLevelBucketDBUpdaterTest(),
+ empty_state(),
+ initial_baseline(std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:d")),
+ initial_default(std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:m")),
+ initial_bundle(*initial_baseline, {{FixedBucketSpaces::default_space(), initial_default},
+ {FixedBucketSpaces::global_space(), initial_baseline}}),
+ default_bucket(FixedBucketSpaces::default_space(), BucketId(16, 1234)),
+ global_bucket(FixedBucketSpaces::global_space(), BucketId(16, 1234))
+ {
+ }
+ ~BucketDBUpdaterSnapshotTest() override;
+
+ void SetUp() override {
+ TopLevelBucketDBUpdaterTest::SetUp();
+ set_stale_reads_enabled(true);
+ };
+
+    // Assumes that the distributor owns all buckets, so we may choose an arbitrary bucket in the bucket space
+ uint32_t buckets_in_snapshot_matching_current_db(bool check_mutable_repo, BucketSpace bucket_space) {
+ uint32_t found_buckets = 0;
+ for (auto* s : distributor_stripes()) {
+ auto rs = s->bucket_db_updater().read_snapshot_for_bucket(Bucket(bucket_space, BucketId(16, 1234)));
+ if (!rs.is_routable()) {
+ return 0;
+ }
+ auto guard = rs.steal_read_guard();
+ auto& repo = check_mutable_repo ? mutable_repo(*s) : read_only_repo(*s);
+ for_each_bucket(repo, [&](const auto& space, const auto& entry) {
+ if (space == bucket_space) {
+ auto entries = guard->find_parents_and_self(entry.getBucketId());
+ if (entries.size() == 1) {
+ ++found_buckets;
+ }
+ }
+ });
+ }
+ return found_buckets;
+ }
+};
+
+BucketDBUpdaterSnapshotTest::~BucketDBUpdaterSnapshotTest() = default;
+
+TEST_F(BucketDBUpdaterSnapshotTest, default_space_snapshot_prior_to_activated_state_is_non_routable) {
+ auto rs = stripe_of_bucket(default_bucket).bucket_db_updater().read_snapshot_for_bucket(default_bucket);
+ EXPECT_FALSE(rs.is_routable());
+}
+
+TEST_F(BucketDBUpdaterSnapshotTest, global_space_snapshot_prior_to_activated_state_is_non_routable) {
+ auto rs = stripe_of_bucket(global_bucket).bucket_db_updater().read_snapshot_for_bucket(global_bucket);
+ EXPECT_FALSE(rs.is_routable());
+}
+
+TEST_F(BucketDBUpdaterSnapshotTest, read_snapshot_returns_appropriate_cluster_states) {
+ set_cluster_state_bundle(initial_bundle);
+ // State currently pending, empty initial state is active
+
+ auto def_rs = stripe_of_bucket(default_bucket).bucket_db_updater().read_snapshot_for_bucket(default_bucket);
+ EXPECT_EQ(def_rs.context().active_cluster_state()->toString(), empty_state.toString());
+ EXPECT_EQ(def_rs.context().default_active_cluster_state()->toString(), empty_state.toString());
+ ASSERT_TRUE(def_rs.context().has_pending_state_transition());
+ EXPECT_EQ(def_rs.context().pending_cluster_state()->toString(), initial_default->toString());
+
+ auto global_rs = stripe_of_bucket(global_bucket).bucket_db_updater().read_snapshot_for_bucket(global_bucket);
+ EXPECT_EQ(global_rs.context().active_cluster_state()->toString(), empty_state.toString());
+ EXPECT_EQ(global_rs.context().default_active_cluster_state()->toString(), empty_state.toString());
+ ASSERT_TRUE(global_rs.context().has_pending_state_transition());
+ EXPECT_EQ(global_rs.context().pending_cluster_state()->toString(), initial_baseline->toString());
+
+ ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(*initial_baseline, message_count(1), 0));
+ // State now activated, no pending
+
+ def_rs = stripe_of_bucket(default_bucket).bucket_db_updater().read_snapshot_for_bucket(default_bucket);
+ EXPECT_EQ(def_rs.context().active_cluster_state()->toString(), initial_default->toString());
+ EXPECT_EQ(def_rs.context().default_active_cluster_state()->toString(), initial_default->toString());
+ EXPECT_FALSE(def_rs.context().has_pending_state_transition());
+
+ global_rs = stripe_of_bucket(global_bucket).bucket_db_updater().read_snapshot_for_bucket(global_bucket);
+ EXPECT_EQ(global_rs.context().active_cluster_state()->toString(), initial_baseline->toString());
+ EXPECT_EQ(global_rs.context().default_active_cluster_state()->toString(), initial_default->toString());
+ EXPECT_FALSE(global_rs.context().has_pending_state_transition());
+}
+
+TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_no_pending_state_transition_returns_mutable_db_guard) {
+ constexpr uint32_t n_buckets = 10;
+ ASSERT_NO_FATAL_FAILURE(
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
+ "version:2 distributor:1 storage:4", n_buckets, 4));
+ EXPECT_FALSE(activate_cluster_state_version(2));
+ EXPECT_EQ(buckets_in_snapshot_matching_current_db(true, FixedBucketSpaces::default_space()), n_buckets);
+ EXPECT_EQ(buckets_in_snapshot_matching_current_db(true, FixedBucketSpaces::global_space()), n_buckets);
+}
+
+TEST_F(BucketDBUpdaterSnapshotTest, snapshot_returns_unroutable_for_non_owned_bucket_in_current_state) {
+ ASSERT_NO_FATAL_FAILURE(
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
+ "version:2 distributor:2 .0.s:d storage:4", 0, 0));
+ EXPECT_FALSE(activate_cluster_state_version(2));
+ // We're down in state 2 and therefore do not own any buckets
+ auto def_rs = stripe_of_bucket(default_bucket).bucket_db_updater().read_snapshot_for_bucket(default_bucket);
+ EXPECT_FALSE(def_rs.is_routable());
+}
+
+TEST_F(BucketDBUpdaterSnapshotTest, snapshot_with_pending_state_returns_read_only_guard_for_bucket_only_owned_in_current_state) {
+ constexpr uint32_t n_buckets = 10;
+ ASSERT_NO_FATAL_FAILURE(
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
+ "version:2 distributor:2 .0.s:d storage:4", 0, 0));
+ EXPECT_EQ(buckets_in_snapshot_matching_current_db(false, FixedBucketSpaces::default_space()), n_buckets);
+ EXPECT_EQ(buckets_in_snapshot_matching_current_db(false, FixedBucketSpaces::global_space()), n_buckets);
+}
+
+TEST_F(BucketDBUpdaterSnapshotTest, snapshot_is_unroutable_if_stale_reads_disabled_and_bucket_not_owned_in_pending_state) {
+ set_stale_reads_enabled(false);
+ constexpr uint32_t n_buckets = 10;
+ ASSERT_NO_FATAL_FAILURE(
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
+ "version:2 distributor:2 .0.s:d storage:4", 0, 0));
+ auto def_rs = stripe_of_bucket(default_bucket).bucket_db_updater().read_snapshot_for_bucket(default_bucket);
+ EXPECT_FALSE(def_rs.is_routable());
+}
+
}
diff --git a/storage/src/tests/distributor/top_level_distributor_test.cpp b/storage/src/tests/distributor/top_level_distributor_test.cpp
index d8df36e53b2..8fae1c6d738 100644
--- a/storage/src/tests/distributor/top_level_distributor_test.cpp
+++ b/storage/src/tests/distributor/top_level_distributor_test.cpp
@@ -70,18 +70,6 @@ struct TopLevelDistributorTest : Test, TopLevelDistributorTestUtil {
return posted_msgs.str();
}
- void tick_distributor_and_stripes_n_times(uint32_t n) {
- for (uint32_t i = 0; i < n; ++i) {
- tick(false);
- }
- }
-
- void tick_top_level_distributor_n_times(uint32_t n) {
- for (uint32_t i = 0; i < n; ++i) {
- tick(true);
- }
- }
-
StatusReporterDelegate& distributor_status_delegate() {
return _distributor->_distributorStatusDelegate;
}
@@ -98,10 +86,6 @@ struct TopLevelDistributorTest : Test, TopLevelDistributorTestUtil {
return _distributor->_status_to_do;
}
- TopLevelDistributor::MetricUpdateHook distributor_metric_update_hook() {
- return _distributor->_metricUpdateHook;
- }
-
BucketSpacesStatsProvider::PerNodeBucketSpacesStats distributor_bucket_spaces_stats() {
return _distributor->getBucketSpacesStats();
}
diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.cpp b/storage/src/tests/distributor/top_level_distributor_test_util.cpp
index c4173f5e8ff..b6e9beb38ae 100644
--- a/storage/src/tests/distributor/top_level_distributor_test_util.cpp
+++ b/storage/src/tests/distributor/top_level_distributor_test_util.cpp
@@ -265,6 +265,25 @@ TopLevelDistributorTestUtil::get_bucket(const document::BucketId& bId) const
return stripe_bucket_database(stripe_index_of_bucket(bId)).get(bId);
}
+DistributorBucketSpaceRepo&
+TopLevelDistributorTestUtil::top_level_bucket_space_repo() noexcept
+{
+ return _distributor->_component.bucket_space_repo();
+}
+
+const DistributorBucketSpaceRepo&
+TopLevelDistributorTestUtil::top_level_bucket_space_repo() const noexcept
+{
+ return _distributor->_component.bucket_space_repo();
+}
+
+std::unique_ptr<StripeAccessGuard>
+TopLevelDistributorTestUtil::acquire_stripe_guard()
+{
+ // Note: this won't actually interact with any threads, as the pool is running in single-threaded test mode.
+ return _distributor->_stripe_accessor->rendezvous_and_hold_all();
+}
+
TopLevelBucketDBUpdater&
TopLevelDistributorTestUtil::bucket_db_updater() {
return *_distributor->_bucket_db_updater;
@@ -356,6 +375,11 @@ TopLevelDistributorTestUtil::reconfigure(const DistributorConfig& cfg)
tick(); // Config is propagated upon next top-level tick
}
+framework::MetricUpdateHook&
+TopLevelDistributorTestUtil::distributor_metric_update_hook() {
+ return _distributor->_metricUpdateHook;
+}
+
BucketDatabase&
TopLevelDistributorTestUtil::stripe_bucket_database(uint16_t stripe_idx) {
assert(stripe_idx < _distributor->_stripes.size());
@@ -430,4 +454,41 @@ TopLevelDistributorTestUtil::trigger_distribution_change(std::shared_ptr<lib::Di
_distributor->enableNextDistribution();
}
+const lib::ClusterStateBundle&
+TopLevelDistributorTestUtil::current_cluster_state_bundle() const
+{
+ // We assume that all stripes have the same cluster state internally, so just use the first.
+ assert(_distributor->_stripes[0]);
+ const auto& bundle = _distributor->_stripes[0]->getClusterStateBundle();
+ // ... but sanity-check just to make sure...
+ for (size_t i = 1; i < _num_distributor_stripes; ++i) {
+ assert(_distributor->_stripes[i]->getClusterStateBundle() == bundle);
+ }
+ return bundle;
+}
+
+void
+TopLevelDistributorTestUtil::tick_distributor_and_stripes_n_times(uint32_t n)
+{
+ for (uint32_t i = 0; i < n; ++i) {
+ tick(false);
+ }
+}
+
+void
+TopLevelDistributorTestUtil::tick_top_level_distributor_n_times(uint32_t n)
+{
+ for (uint32_t i = 0; i < n; ++i) {
+ tick(true);
+ }
+}
+
+void
+TopLevelDistributorTestUtil::complete_recovery_mode_on_all_stripes()
+{
+ for (auto* s : distributor_stripes()) {
+ s->scanAllBuckets();
+ }
+}
+
}
diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.h b/storage/src/tests/distributor/top_level_distributor_test_util.h
index 9048160b652..8832f8ada6e 100644
--- a/storage/src/tests/distributor/top_level_distributor_test_util.h
+++ b/storage/src/tests/distributor/top_level_distributor_test_util.h
@@ -19,12 +19,14 @@ namespace distributor {
class TopLevelDistributor;
class DistributorBucketSpace;
+class DistributorBucketSpaceRepo;
class DistributorMetricSet;
class DistributorNodeContext;
class DistributorStripe;
class DistributorStripeComponent;
class DistributorStripeOperationContext;
class DistributorStripePool;
+class StripeAccessGuard;
class IdealStateMetricSet;
class Operation;
class TopLevelBucketDBUpdater;
@@ -58,6 +60,12 @@ public:
// As the above, but always inserts into default bucket space
void add_nodes_to_stripe_bucket_db(const document::BucketId& id, const std::string& nodeStr);
+ // TODO STRIPE replace with BucketSpaceStateMap once legacy is gone
+ DistributorBucketSpaceRepo& top_level_bucket_space_repo() noexcept;
+ const DistributorBucketSpaceRepo& top_level_bucket_space_repo() const noexcept;
+
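+    // Acquires a guard that holds all stripe threads at a rendezvous point
+    // (a no-op in the single-threaded test mode used here).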
+ std::unique_ptr<StripeAccessGuard> acquire_stripe_guard();
+
TopLevelBucketDBUpdater& bucket_db_updater();
const IdealStateMetricSet& total_ideal_state_metrics() const;
const DistributorMetricSet& total_distributor_metrics() const;
@@ -77,12 +85,19 @@ public:
return _node->getClock();
}
+ framework::MetricUpdateHook& distributor_metric_update_hook();
+
BucketDatabase& stripe_bucket_database(uint16_t stripe_idx); // Implicit default space only
BucketDatabase& stripe_bucket_database(uint16_t stripe_idx, document::BucketSpace space);
const BucketDatabase& stripe_bucket_database(uint16_t stripe_idx) const; // Implicit default space only
const BucketDatabase& stripe_bucket_database(uint16_t stripe_idx, document::BucketSpace space) const;
[[nodiscard]] bool all_distributor_stripes_are_in_recovery_mode() const;
+ void tick_distributor_and_stripes_n_times(uint32_t n);
+ void tick_top_level_distributor_n_times(uint32_t n);
+
+ void complete_recovery_mode_on_all_stripes();
+
void setup_distributor(int redundancy,
int node_count,
const std::string& systemState,
@@ -122,6 +137,8 @@ public:
void trigger_distribution_change(std::shared_ptr<lib::Distribution> distr);
+ const lib::ClusterStateBundle& current_cluster_state_bundle() const;
+
static std::vector<document::BucketSpace> bucket_spaces();
protected: