author    Tor Brede Vekterli <vekterli@verizonmedia.com>    2021-09-13 13:20:52 +0200
committer GitHub <noreply@github.com>                       2021-09-13 13:20:52 +0200
commit    1081d35dae697406a8d9e9ce98c220bc3f7ecd36 (patch)
tree      9cfca4f69d38d469670ee95a928b74e5f9bb6b6d /storage
parent    37dcc0dab402b6a12f59f2944bd95aafa7c11cb3 (diff)
parent    29ec851c58741057ac78e44650d9cca6bc649b75 (diff)
Merge pull request #19076 from vespa-engine/vekterli/port-more-bucketdbupdater-tests
Port more BucketDBUpdater tests from legacy to new code path
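The ported tests keep their legacy semantics but adopt the new snake_case test-utility API; a legacy pair such as

    enableDistributorClusterState("distributor:1 storage:3");
    dumpBucket(document::BucketId(16, 1234));

reappears in the new file as

    enable_distributor_cluster_state("distributor:1 storage:3");
    dump_bucket(bucket_id);

as can be seen throughout the per-file diffs below.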
Diffstat (limited to 'storage')
-rw-r--r--  storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp       30
-rw-r--r--  storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp  1134
-rw-r--r--  storage/src/tests/distributor/top_level_distributor_test.cpp          12
-rw-r--r--  storage/src/tests/distributor/top_level_distributor_test_util.cpp     43
-rw-r--r--  storage/src/tests/distributor/top_level_distributor_test_util.h       13
-rw-r--r--  storage/src/vespa/storage/distributor/distributor_stripe.h             1
-rw-r--r--  storage/src/vespa/storage/distributor/top_level_distributor.cpp        1
-rw-r--r--  storage/src/vespa/storage/distributor/top_level_distributor.h          5
8 files changed, 1201 insertions, 38 deletions
diff --git a/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp b/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp
index 6cca6df9f80..fc2ad82f3a2 100644
--- a/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp
+++ b/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp
@@ -1212,6 +1212,7 @@ TEST_F(LegacyBucketDBUpdaterTest, notify_change_with_pending_state_queues_bucket
}
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, merge_reply) {
enableDistributorClusterState("distributor:1 storage:3");
@@ -1254,6 +1255,7 @@ TEST_F(LegacyBucketDBUpdaterTest, merge_reply) {
dumpBucket(document::BucketId(16, 1234)));
};
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, merge_reply_node_down) {
enableDistributorClusterState("distributor:1 storage:3");
std::vector<api::MergeBucketCommand::Node> nodes;
@@ -1296,6 +1298,7 @@ TEST_F(LegacyBucketDBUpdaterTest, merge_reply_node_down) {
dumpBucket(document::BucketId(16, 1234)));
};
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, merge_reply_node_down_after_request_sent) {
enableDistributorClusterState("distributor:1 storage:3");
std::vector<api::MergeBucketCommand::Node> nodes;
@@ -1338,7 +1341,7 @@ TEST_F(LegacyBucketDBUpdaterTest, merge_reply_node_down_after_request_sent) {
dumpBucket(document::BucketId(16, 1234)));
};
-
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, flush) {
enableDistributorClusterState("distributor:1 storage:3");
_sender.clear();
@@ -1417,6 +1420,7 @@ LegacyBucketDBUpdaterTest::getSentNodesDistributionChanged(
return ost.str();
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_send_messages) {
EXPECT_EQ(getNodeList({0, 1, 2}),
getSentNodes("cluster:d",
@@ -1514,6 +1518,7 @@ TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_send_messages) {
"distributor:3 storage:3 .1.s:m"));
};
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_receive) {
DistributorMessageSenderStub sender;
@@ -1552,6 +1557,7 @@ TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_receive) {
EXPECT_EQ(3, (int)pendingTransition.results().size());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_with_group_down) {
std::string config(getDistConfig6Nodes4Groups());
config += "distributor_auto_ownership_transfer_on_whole_group_down true\n";
@@ -1571,6 +1577,7 @@ TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_with_group_down) {
"distributor:6 .2.s:d storage:6"));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_with_group_down_and_no_handover) {
std::string config(getDistConfig6Nodes4Groups());
config += "distributor_auto_ownership_transfer_on_whole_group_down false\n";
@@ -1582,6 +1589,8 @@ TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_with_group_down_and_no_h
"distributor:6 .2.s:d .3.s:d storage:6"));
}
+namespace {
+
void
parseInputData(const std::string& data,
uint64_t timestamp,
@@ -1656,6 +1665,8 @@ struct BucketDumper : public BucketDatabase::EntryProcessor
}
};
+}
+
std::string
LegacyBucketDBUpdaterTest::mergeBucketLists(
const lib::ClusterState& oldState,
@@ -1724,6 +1735,7 @@ LegacyBucketDBUpdaterTest::mergeBucketLists(const std::string& existingData,
includeBucketInfo);
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_merge) {
// Simple initializing case - ask all nodes for info
EXPECT_EQ(
@@ -1763,6 +1775,7 @@ TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_merge) {
mergeBucketLists("", "0:5/0/0/0|1:5/2/3/4", true));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_merge_replica_changed) {
// Node went from initializing to up and non-invalid bucket changed.
EXPECT_EQ(
@@ -1775,6 +1788,7 @@ TEST_F(LegacyBucketDBUpdaterTest, pending_cluster_state_merge_replica_changed) {
true));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, no_db_resurrection_for_bucket_not_owned_in_current_state) {
document::BucketId bucket(16, 3);
lib::ClusterState stateBefore("distributor:1 storage:1");
@@ -1804,6 +1818,7 @@ TEST_F(LegacyBucketDBUpdaterTest, no_db_resurrection_for_bucket_not_owned_in_cur
EXPECT_EQ(std::string("NONEXISTING"), dumpBucket(bucket));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, no_db_resurrection_for_bucket_not_owned_in_pending_state) {
document::BucketId bucket(16, 3);
lib::ClusterState stateBefore("distributor:1 storage:1");
@@ -1831,6 +1846,7 @@ TEST_F(LegacyBucketDBUpdaterTest, no_db_resurrection_for_bucket_not_owned_in_pen
EXPECT_EQ(std::string("NONEXISTING"), dumpBucket(bucket));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
/*
* If we get a distribution config change, it's important that cluster states that
* arrive after this--but _before_ the pending cluster state has finished--must trigger
@@ -1880,6 +1896,7 @@ TEST_F(LegacyBucketDBUpdaterTest, cluster_state_always_sends_full_fetch_when_dis
EXPECT_EQ(size_t(0), _sender.commands().size());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, changed_distribution_config_triggers_recovery_mode) {
ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), messageCount(6), 20));
_sender.clear();
@@ -1929,6 +1946,7 @@ std::unique_ptr<BucketDatabase::EntryProcessor> func_processor(Func&& f) {
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, changed_distribution_config_does_not_elide_bucket_db_pruning) {
setDistribution(getDistConfig3Nodes1Group());
@@ -1948,6 +1966,7 @@ TEST_F(LegacyBucketDBUpdaterTest, changed_distribution_config_does_not_elide_buc
}));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, newly_added_buckets_have_current_time_as_gc_timestamp) {
getClock().setAbsoluteTimeInSeconds(101234);
lib::ClusterState stateBefore("distributor:1 storage:1");
@@ -1963,6 +1982,7 @@ TEST_F(LegacyBucketDBUpdaterTest, newly_added_buckets_have_current_time_as_gc_ti
EXPECT_EQ(uint32_t(101234), e->getLastGarbageCollectionTime());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, newer_mutations_not_overwritten_by_earlier_bucket_fetch) {
{
lib::ClusterState stateBefore("distributor:1 storage:1 .0.s:i");
@@ -2051,6 +2071,7 @@ LegacyBucketDBUpdaterTest::getSentNodesWithPreemption(
using nodeVec = std::vector<uint16_t>;
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
/*
* If we don't carry over the set of nodes that we need to fetch from,
* a naive comparison between the active state and the new state will
@@ -2067,6 +2088,7 @@ TEST_F(LegacyBucketDBUpdaterTest, preempted_distributor_change_carries_node_set_
"version:3 distributor:6 storage:6"));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, preempted_storage_change_carries_node_set_over_to_next_state_fetch) {
EXPECT_EQ(
expandNodeVec({2, 3}),
@@ -2077,6 +2099,7 @@ TEST_F(LegacyBucketDBUpdaterTest, preempted_storage_change_carries_node_set_over
"version:3 distributor:6 storage:6"));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, preempted_storage_node_down_must_be_re_fetched) {
EXPECT_EQ(
expandNodeVec({2}),
@@ -2087,6 +2110,7 @@ TEST_F(LegacyBucketDBUpdaterTest, preempted_storage_node_down_must_be_re_fetched
"version:3 distributor:6 storage:6"));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, do_not_send_to_preempted_node_now_in_down_state) {
EXPECT_EQ(
nodeVec{},
@@ -2097,6 +2121,7 @@ TEST_F(LegacyBucketDBUpdaterTest, do_not_send_to_preempted_node_now_in_down_stat
"version:3 distributor:6 storage:6 .2.s:d")); // 2 down again.
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, doNotSendToPreemptedNodeNotPartOfNewState) {
// Even though 100 nodes are preempted, not all of these should be part
// of the request afterwards when only 6 are part of the state.
@@ -2109,6 +2134,7 @@ TEST_F(LegacyBucketDBUpdaterTest, doNotSendToPreemptedNodeNotPartOfNewState) {
"version:3 distributor:6 storage:6"));
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
TEST_F(LegacyBucketDBUpdaterTest, outdated_node_set_cleared_after_successful_state_completion) {
lib::ClusterState stateBefore(
"version:1 distributor:6 storage:6 .1.t:1234");
@@ -2123,6 +2149,7 @@ TEST_F(LegacyBucketDBUpdaterTest, outdated_node_set_cleared_after_successful_sta
EXPECT_EQ(size_t(0), _sender.commands().size());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest (despite being disabled)
// XXX test currently disabled since distribution config currently isn't used
// at all in order to deduce the set of nodes to send to. This might not matter
// in practice since it is assumed that the cluster state matching the new
@@ -2144,6 +2171,7 @@ TEST_F(LegacyBucketDBUpdaterTest, DISABLED_cluster_config_downsize_only_sends_to
EXPECT_EQ((nodeVec{0, 1, 2}), getSendSet());
}
+// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest
/**
* Test scenario where a cluster is downsized by removing a subset of the nodes
* from the distribution configuration. The system must be able to deal with
diff --git a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
index 70e5afaed43..1a0ba8352b7 100644
--- a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
+++ b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
@@ -38,6 +38,8 @@ class TopLevelBucketDBUpdaterTest : public Test,
public TopLevelDistributorTestUtil
{
public:
+ using OutdatedNodesMap = dbtransition::OutdatedNodesMap;
+
TopLevelBucketDBUpdaterTest();
~TopLevelBucketDBUpdaterTest() override;
@@ -117,6 +119,18 @@ public:
invalid_bucket_count));
}
+ void send_fake_reply_for_single_bucket_request(
+ const api::RequestBucketInfoCommand& rbi)
+ {
+ ASSERT_EQ(size_t(1), rbi.getBuckets().size());
+ const document::BucketId& bucket(rbi.getBuckets()[0]);
+
+ auto reply = std::make_shared<api::RequestBucketInfoReply>(rbi);
+ reply->getBucketInfo().push_back(
+ api::RequestBucketInfoReply::Entry(bucket, api::BucketInfo(20, 10, 12, 50, 60, true, true)));
+ stripe_of_bucket(bucket).bucket_db_updater().onRequestBucketInfoReply(reply);
+ }
+
std::string verifyBucket(document::BucketId id, const lib::ClusterState& state) {
BucketDatabase::Entry entry = get_bucket(id);
if (!entry.valid()) {
@@ -290,6 +304,87 @@ public:
ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(state, expected_msgs, n_buckets));
}
+ ClusterInformation::CSP create_cluster_info(const std::string& clusterStateString) {
+ lib::ClusterState baseline_cluster_state(clusterStateString);
+ lib::ClusterStateBundle cluster_state_bundle(baseline_cluster_state);
+ auto cluster_info = std::make_shared<SimpleClusterInformation>(
+ _distributor->node_identity().node_index(),
+ cluster_state_bundle,
+ "ui");
+ enable_distributor_cluster_state(clusterStateString);
+ return cluster_info;
+ }
+
+ struct PendingClusterStateFixture {
+ DistributorMessageSenderStub sender;
+ framework::defaultimplementation::FakeClock clock;
+ std::unique_ptr<PendingClusterState> state;
+
+ PendingClusterStateFixture(
+ TopLevelBucketDBUpdaterTest& owner,
+ const std::string& old_cluster_state,
+ const std::string& new_cluster_state)
+ {
+ auto cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(new_cluster_state));
+ auto cluster_info = owner.create_cluster_info(old_cluster_state);
+ OutdatedNodesMap outdated_nodes_map;
+ state = PendingClusterState::createForClusterStateChange(
+ clock, cluster_info, sender,
+ owner.top_level_bucket_space_repo(),
+ cmd, outdated_nodes_map, api::Timestamp(1));
+ }
+
+ PendingClusterStateFixture(
+ TopLevelBucketDBUpdaterTest& owner,
+ const std::string& old_cluster_state)
+ {
+ auto cluster_info = owner.create_cluster_info(old_cluster_state);
+ state = PendingClusterState::createForDistributionChange(
+ clock, cluster_info, sender, owner.top_level_bucket_space_repo(), api::Timestamp(1));
+ }
+ };
+
+ std::unique_ptr<PendingClusterStateFixture> create_pending_state_fixture_for_state_change(
+ const std::string& oldClusterState,
+ const std::string& newClusterState)
+ {
+ return std::make_unique<PendingClusterStateFixture>(*this, oldClusterState, newClusterState);
+ }
+
+ std::unique_ptr<PendingClusterStateFixture> create_pending_state_fixture_for_distribution_change(
+ const std::string& oldClusterState)
+ {
+ return std::make_unique<PendingClusterStateFixture>(*this, oldClusterState);
+ }
+
+ std::string get_sent_nodes(const std::string& old_cluster_state,
+ const std::string& new_cluster_state);
+
+ std::string get_sent_nodes_distribution_changed(const std::string& old_cluster_state);
+
+ std::string get_node_list(const std::vector<uint16_t>& nodes, size_t count);
+ std::string get_node_list(const std::vector<uint16_t>& nodes);
+
+ std::string merge_bucket_lists(const lib::ClusterState& old_state,
+ const std::string& existing_data,
+ const lib::ClusterState& new_state,
+ const std::string& new_data,
+ bool include_bucket_info = false);
+
+ std::string merge_bucket_lists(const std::string& existingData,
+ const std::string& newData,
+ bool includeBucketInfo = false);
+
+ std::vector<uint16_t> get_send_set() const;
+
+ std::vector<uint16_t> get_sent_nodes_with_preemption(
+ const std::string& old_cluster_state,
+ uint32_t expected_old_state_messages,
+ const std::string& preempted_cluster_state,
+ const std::string& new_cluster_state);
+
+ std::vector<uint16_t> expand_node_vec(const std::vector<uint16_t>& nodes);
+
};
TopLevelBucketDBUpdaterTest::TopLevelBucketDBUpdaterTest()
@@ -347,6 +442,21 @@ std::string dist_config_6_nodes_across_4_groups() {
"group[3].nodes[1].index 5\n");
}
+std::string dist_config_3_nodes_in_1_group() {
+ return ("redundancy 2\n"
+ "group[2]\n"
+ "group[0].name \"invalid\"\n"
+ "group[0].index \"invalid\"\n"
+ "group[0].partitions 1|*\n"
+ "group[0].nodes[0]\n"
+ "group[1].name rack0\n"
+ "group[1].index 0\n"
+ "group[1].nodes[3]\n"
+ "group[1].nodes[0].index 0\n"
+ "group[1].nodes[1].index 1\n"
+ "group[1].nodes[2].index 2\n");
+}
+
std::string
make_string_list(std::string s, uint32_t count)
{
@@ -368,6 +478,30 @@ make_request_bucket_info_strings(uint32_t count)
}
+
+std::string
+TopLevelBucketDBUpdaterTest::get_node_list(const std::vector<uint16_t>& nodes, size_t count)
+{
+ std::ostringstream ost;
+ bool first = true;
+ for (const auto node : nodes) {
+ for (uint32_t i = 0; i < count; ++i) {
+ if (!first) {
+ ost << ",";
+ }
+ ost << node;
+ first = false;
+ }
+ }
+ return ost.str();
+}
+
+std::string
+TopLevelBucketDBUpdaterTest::get_node_list(const std::vector<uint16_t>& nodes)
+{
+ return get_node_list(nodes, _bucket_spaces.size());
+}
+
TEST_F(TopLevelBucketDBUpdaterTest, normal_usage) {
set_cluster_state(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"));
@@ -525,13 +659,13 @@ TEST_F(TopLevelBucketDBUpdaterTest, failed_request_bucket_info) {
}
for (int i=0; i<10; i++) {
- EXPECT_EQ(std::string(""),
+ EXPECT_EQ("",
verifyBucket(document::BucketId(16, i),
lib::ClusterState("distributor:1 storage:1")));
}
// Set system state should now be passed on
- EXPECT_EQ(std::string("Set system state"), _sender_down.getCommands());
+ EXPECT_EQ("Set system state", _sender_down.getCommands());
}
TEST_F(TopLevelBucketDBUpdaterTest, down_while_init) {
@@ -588,9 +722,9 @@ TEST_F(TopLevelBucketDBUpdaterTest, node_down_copies_get_in_sync) {
set_cluster_state("distributor:1 storage:3 .1.s:d");
- EXPECT_EQ(std::string("BucketId(0x4000000000000001) : "
- "node(idx=0,crc=0x3,docs=3/3,bytes=3/3,trusted=true,active=false,ready=false), "
- "node(idx=2,crc=0x3,docs=3/3,bytes=3/3,trusted=true,active=false,ready=false)"),
+ EXPECT_EQ("BucketId(0x4000000000000001) : "
+ "node(idx=0,crc=0x3,docs=3/3,bytes=3/3,trusted=true,active=false,ready=false), "
+ "node(idx=2,crc=0x3,docs=3/3,bytes=3/3,trusted=true,active=false,ready=false)",
dump_bucket(bid));
}
@@ -651,11 +785,11 @@ TEST_F(TopLevelBucketDBUpdaterTest, bit_change) {
}
}
- EXPECT_EQ(std::string("BucketId(0x4000000000000001) : "
- "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"),
+ EXPECT_EQ("BucketId(0x4000000000000001) : "
+ "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)",
dump_bucket(bucketlist[0]));
- EXPECT_EQ(std::string("BucketId(0x4000000000000002) : "
- "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"),
+ EXPECT_EQ("BucketId(0x4000000000000002) : "
+ "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)",
dump_bucket(bucketlist[1]));
{
@@ -688,17 +822,17 @@ TEST_F(TopLevelBucketDBUpdaterTest, bit_change) {
}
}
- EXPECT_EQ(std::string("BucketId(0x4000000000000000) : "
- "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"),
+ EXPECT_EQ("BucketId(0x4000000000000000) : "
+ "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)",
dump_bucket(document::BucketId(16, 0)));
- EXPECT_EQ(std::string("BucketId(0x4000000000000001) : "
- "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"),
+ EXPECT_EQ("BucketId(0x4000000000000001) : "
+ "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)",
dump_bucket(document::BucketId(16, 1)));
- EXPECT_EQ(std::string("BucketId(0x4000000000000002) : "
- "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"),
+ EXPECT_EQ("BucketId(0x4000000000000002) : "
+ "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)",
dump_bucket(document::BucketId(16, 2)));
- EXPECT_EQ(std::string("BucketId(0x4000000000000004) : "
- "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"),
+ EXPECT_EQ("BucketId(0x4000000000000004) : "
+ "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)",
dump_bucket(document::BucketId(16, 4)));
_sender.clear();
@@ -822,9 +956,9 @@ TEST_F(TopLevelBucketDBUpdaterTest, notify_bucket_change) {
}
// No database update until request bucket info replies have been received.
- EXPECT_EQ(std::string("BucketId(0x4000000000000001) : "
- "node(idx=0,crc=0x4d2,docs=1234/1234,bytes=1234/1234,"
- "trusted=false,active=false,ready=false)"),
+ EXPECT_EQ("BucketId(0x4000000000000001) : "
+ "node(idx=0,crc=0x4d2,docs=1234/1234,bytes=1234/1234,"
+ "trusted=false,active=false,ready=false)",
dump_bucket(document::BucketId(16, 1)));
EXPECT_EQ(std::string("NONEXISTING"), dump_bucket(document::BucketId(16, 2)));
@@ -845,11 +979,11 @@ TEST_F(TopLevelBucketDBUpdaterTest, notify_bucket_change) {
stripe_of_bucket(bucket_id).bucket_db_updater().onRequestBucketInfoReply(reply);
}
- EXPECT_EQ(std::string("BucketId(0x4000000000000001) : "
- "node(idx=0,crc=0x11d7,docs=200/400,bytes=2000/4000,trusted=true,active=true,ready=true)"),
+ EXPECT_EQ("BucketId(0x4000000000000001) : "
+ "node(idx=0,crc=0x11d7,docs=200/400,bytes=2000/4000,trusted=true,active=true,ready=true)",
dump_bucket(document::BucketId(16, 1)));
- EXPECT_EQ(std::string("BucketId(0x4000000000000002) : "
- "node(idx=0,crc=0x2327,docs=300/500,bytes=3000/5000,trusted=true,active=false,ready=false)"),
+ EXPECT_EQ("BucketId(0x4000000000000002) : "
+ "node(idx=0,crc=0x2327,docs=300/500,bytes=3000/5000,trusted=true,active=false,ready=false)",
dump_bucket(document::BucketId(16, 2)));
}
@@ -947,4 +1081,956 @@ TEST_F(TopLevelBucketDBUpdaterTest, notify_change_with_pending_state_queues_buck
}
}
+TEST_F(TopLevelBucketDBUpdaterTest, merge_reply) {
+ enable_distributor_cluster_state("distributor:1 storage:3");
+
+ document::BucketId bucket_id(16, 1234);
+ add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234");
+
+ std::vector<api::MergeBucketCommand::Node> nodes;
+ nodes.push_back(api::MergeBucketCommand::Node(0));
+ nodes.push_back(api::MergeBucketCommand::Node(1));
+ nodes.push_back(api::MergeBucketCommand::Node(2));
+
+ api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0);
+ auto reply = std::make_shared<api::MergeBucketReply>(cmd);
+
+ _sender.clear();
+ stripe_of_bucket(bucket_id).bucket_db_updater().onMergeBucketReply(reply);
+
+ ASSERT_EQ(size_t(3), _sender.commands().size());
+
+ for (uint32_t i = 0; i < 3; i++) {
+ auto req = std::dynamic_pointer_cast<api::RequestBucketInfoCommand>(_sender.command(i));
+
+ ASSERT_TRUE(req.get() != nullptr);
+ ASSERT_EQ(size_t(1), req->getBuckets().size());
+ EXPECT_EQ(bucket_id, req->getBuckets()[0]);
+
+ auto reqreply = std::make_shared<api::RequestBucketInfoReply>(*req);
+ reqreply->getBucketInfo().push_back(
+ api::RequestBucketInfoReply::Entry(bucket_id,
+ api::BucketInfo(10 * (i + 1), 100 * (i +1), 1000 * (i+1))));
+
+ stripe_of_bucket(bucket_id).bucket_db_updater().onRequestBucketInfoReply(reqreply);
+ }
+
+ EXPECT_EQ("BucketId(0x40000000000004d2) : "
+ "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), "
+ "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false), "
+ "node(idx=2,crc=0x1e,docs=300/300,bytes=3000/3000,trusted=false,active=false,ready=false)",
+ dump_bucket(bucket_id));
+};
+
+TEST_F(TopLevelBucketDBUpdaterTest, merge_reply_node_down) {
+ enable_distributor_cluster_state("distributor:1 storage:3");
+ std::vector<api::MergeBucketCommand::Node> nodes;
+
+ document::BucketId bucket_id(16, 1234);
+ add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234");
+
+ for (uint32_t i = 0; i < 3; ++i) {
+ nodes.push_back(api::MergeBucketCommand::Node(i));
+ }
+
+ api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0);
+ auto reply = std::make_shared<api::MergeBucketReply>(cmd);
+
+ set_cluster_state(lib::ClusterState("distributor:1 storage:2"));
+
+ _sender.clear();
+ stripe_of_bucket(bucket_id).bucket_db_updater().onMergeBucketReply(reply);
+
+ ASSERT_EQ(size_t(2), _sender.commands().size());
+
+ for (uint32_t i = 0; i < 2; i++) {
+ auto req = std::dynamic_pointer_cast<api::RequestBucketInfoCommand>(_sender.command(i));
+
+ ASSERT_TRUE(req.get() != nullptr);
+ ASSERT_EQ(size_t(1), req->getBuckets().size());
+ EXPECT_EQ(bucket_id, req->getBuckets()[0]);
+
+ auto reqreply = std::make_shared<api::RequestBucketInfoReply>(*req);
+ reqreply->getBucketInfo().push_back(
+ api::RequestBucketInfoReply::Entry(
+ bucket_id,
+ api::BucketInfo(10 * (i + 1), 100 * (i +1), 1000 * (i+1))));
+ stripe_of_bucket(bucket_id).bucket_db_updater().onRequestBucketInfoReply(reqreply);
+ }
+
+ EXPECT_EQ("BucketId(0x40000000000004d2) : "
+ "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), "
+ "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false)",
+ dump_bucket(bucket_id));
+};
+
+TEST_F(TopLevelBucketDBUpdaterTest, merge_reply_node_down_after_request_sent) {
+ enable_distributor_cluster_state("distributor:1 storage:3");
+ std::vector<api::MergeBucketCommand::Node> nodes;
+
+ document::BucketId bucket_id(16, 1234);
+ add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234");
+
+ for (uint32_t i = 0; i < 3; ++i) {
+ nodes.push_back(api::MergeBucketCommand::Node(i));
+ }
+
+ api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0);
+ auto reply = std::make_shared<api::MergeBucketReply>(cmd);
+
+ _sender.clear();
+ stripe_of_bucket(bucket_id).bucket_db_updater().onMergeBucketReply(reply);
+
+ ASSERT_EQ(size_t(3), _sender.commands().size());
+
+ set_cluster_state(lib::ClusterState("distributor:1 storage:2"));
+
+ for (uint32_t i = 0; i < 3; i++) {
+ auto req = std::dynamic_pointer_cast<api::RequestBucketInfoCommand>(_sender.command(i));
+
+ ASSERT_TRUE(req.get() != nullptr);
+ ASSERT_EQ(size_t(1), req->getBuckets().size());
+ EXPECT_EQ(bucket_id, req->getBuckets()[0]);
+
+ auto reqreply = std::make_shared<api::RequestBucketInfoReply>(*req);
+ reqreply->getBucketInfo().push_back(
+ api::RequestBucketInfoReply::Entry(
+ bucket_id,
+ api::BucketInfo(10 * (i + 1), 100 * (i +1), 1000 * (i+1))));
+ stripe_of_bucket(bucket_id).bucket_db_updater().onRequestBucketInfoReply(reqreply);
+ }
+
+ EXPECT_EQ("BucketId(0x40000000000004d2) : "
+ "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), "
+ "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false)",
+ dump_bucket(bucket_id));
+};
+
+TEST_F(TopLevelBucketDBUpdaterTest, flush) {
+ enable_distributor_cluster_state("distributor:1 storage:3");
+ _sender.clear();
+
+ document::BucketId bucket_id(16, 1234);
+ add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234");
+
+ std::vector<api::MergeBucketCommand::Node> nodes;
+ for (uint32_t i = 0; i < 3; ++i) {
+ nodes.push_back(api::MergeBucketCommand::Node(i));
+ }
+
+ api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0);
+ auto reply = std::make_shared<api::MergeBucketReply>(cmd);
+
+ _sender.clear();
+ stripe_of_bucket(bucket_id).bucket_db_updater().onMergeBucketReply(reply);
+
+ ASSERT_EQ(size_t(3), _sender.commands().size());
+ ASSERT_EQ(size_t(0), _sender_down.replies().size());
+
+ stripe_of_bucket(bucket_id).bucket_db_updater().flush();
+ // Flushing should drop all merge bucket replies
+ EXPECT_EQ(size_t(0), _sender_down.commands().size());
+}
+
+std::string
+TopLevelBucketDBUpdaterTest::get_sent_nodes(const std::string& old_cluster_state,
+ const std::string& new_cluster_state)
+{
+ auto fixture = create_pending_state_fixture_for_state_change(old_cluster_state, new_cluster_state);
+ sort_sent_messages_by_index(fixture->sender);
+
+ std::ostringstream ost;
+ for (uint32_t i = 0; i < fixture->sender.commands().size(); i++) {
+ auto& req = dynamic_cast<RequestBucketInfoCommand&>(*fixture->sender.command(i));
+
+ if (i > 0) {
+ ost << ",";
+ }
+
+ ost << req.getAddress()->getIndex();
+ }
+
+ return ost.str();
+}
+
+std::string
+TopLevelBucketDBUpdaterTest::get_sent_nodes_distribution_changed(const std::string& old_cluster_state)
+{
+ DistributorMessageSenderStub sender;
+
+ framework::defaultimplementation::FakeClock clock;
+ auto cluster_info = create_cluster_info(old_cluster_state);
+ std::unique_ptr<PendingClusterState> state(
+ PendingClusterState::createForDistributionChange(
+ clock, cluster_info, sender, top_level_bucket_space_repo(), api::Timestamp(1)));
+
+ sort_sent_messages_by_index(sender);
+
+ std::ostringstream ost;
+ for (uint32_t i = 0; i < sender.commands().size(); i++) {
+ auto& req = dynamic_cast<RequestBucketInfoCommand&>(*sender.command(i));
+
+ if (i > 0) {
+ ost << ",";
+ }
+
+ ost << req.getAddress()->getIndex();
+ }
+
+ return ost.str();
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_send_messages) {
+ EXPECT_EQ(get_node_list({0, 1, 2}),
+ get_sent_nodes("cluster:d",
+ "distributor:1 storage:3"));
+
+ EXPECT_EQ(get_node_list({0, 1}),
+ get_sent_nodes("cluster:d",
+ "distributor:1 storage:3 .2.s:m"));
+
+ EXPECT_EQ(get_node_list({2}),
+ get_sent_nodes("distributor:1 storage:2",
+ "distributor:1 storage:3"));
+
+ EXPECT_EQ(get_node_list({2, 3, 4, 5}),
+ get_sent_nodes("distributor:1 storage:2",
+ "distributor:1 storage:6"));
+
+ EXPECT_EQ(get_node_list({0, 1, 2}),
+ get_sent_nodes("distributor:4 storage:3",
+ "distributor:3 storage:3"));
+
+ EXPECT_EQ(get_node_list({0, 1, 2, 3}),
+ get_sent_nodes("distributor:4 storage:3",
+ "distributor:4 .2.s:d storage:4"));
+
+ EXPECT_EQ("",
+ get_sent_nodes("distributor:4 storage:3",
+ "distributor:4 .0.s:d storage:4"));
+
+ EXPECT_EQ("",
+ get_sent_nodes("distributor:3 storage:3",
+ "distributor:4 storage:3"));
+
+ EXPECT_EQ(get_node_list({2}),
+ get_sent_nodes("distributor:3 storage:3 .2.s:i",
+ "distributor:3 storage:3"));
+
+ EXPECT_EQ(get_node_list({1}),
+ get_sent_nodes("distributor:3 storage:3 .1.s:d",
+ "distributor:3 storage:3"));
+
+ EXPECT_EQ(get_node_list({1, 2, 4}),
+ get_sent_nodes("distributor:3 storage:4 .1.s:d .2.s:i",
+ "distributor:3 storage:5"));
+
+ EXPECT_EQ("",
+ get_sent_nodes("distributor:1 storage:3",
+ "cluster:d"));
+
+ EXPECT_EQ("",
+ get_sent_nodes("distributor:1 storage:3",
+ "distributor:1 storage:3"));
+
+ EXPECT_EQ("",
+ get_sent_nodes("distributor:1 storage:3",
+ "cluster:d distributor:1 storage:6"));
+
+ EXPECT_EQ("",
+ get_sent_nodes("distributor:3 storage:3",
+ "distributor:3 .2.s:m storage:3"));
+
+ EXPECT_EQ(get_node_list({0, 1, 2}),
+ get_sent_nodes("distributor:3 .2.s:m storage:3",
+ "distributor:3 .2.s:d storage:3"));
+
+ EXPECT_EQ("",
+ get_sent_nodes("distributor:3 .2.s:m storage:3",
+ "distributor:3 storage:3"));
+
+ EXPECT_EQ(get_node_list({0, 1, 2}),
+ get_sent_nodes_distribution_changed("distributor:3 storage:3"));
+
+ EXPECT_EQ(get_node_list({0, 1}),
+ get_sent_nodes("distributor:10 storage:2",
+ "distributor:10 .1.s:d storage:2"));
+
+ EXPECT_EQ("",
+ get_sent_nodes("distributor:2 storage:2",
+ "distributor:3 .2.s:i storage:2"));
+
+ EXPECT_EQ(get_node_list({0, 1, 2}),
+ get_sent_nodes("distributor:3 storage:3",
+ "distributor:3 .2.s:s storage:3"));
+
+ EXPECT_EQ("",
+ get_sent_nodes("distributor:3 .2.s:s storage:3",
+ "distributor:3 .2.s:d storage:3"));
+
+ EXPECT_EQ(get_node_list({1}),
+ get_sent_nodes("distributor:3 storage:3 .1.s:m",
+ "distributor:3 storage:3"));
+
+ EXPECT_EQ("",
+ get_sent_nodes("distributor:3 storage:3",
+ "distributor:3 storage:3 .1.s:m"));
+};
+
+TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_receive) {
+ DistributorMessageSenderStub sender;
+
+ auto cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState("distributor:1 storage:3"));
+
+ framework::defaultimplementation::FakeClock clock;
+ auto cluster_info = create_cluster_info("cluster:d");
+ OutdatedNodesMap outdated_nodes_map;
+ std::unique_ptr<PendingClusterState> state(
+ PendingClusterState::createForClusterStateChange(
+ clock, cluster_info, sender, top_level_bucket_space_repo(),
+ cmd, outdated_nodes_map, api::Timestamp(1)));
+
+ ASSERT_EQ(message_count(3), sender.commands().size());
+
+ sort_sent_messages_by_index(sender);
+
+ std::ostringstream ost;
+ for (uint32_t i = 0; i < sender.commands().size(); i++) {
+ auto* req = dynamic_cast<RequestBucketInfoCommand*>(sender.command(i).get());
+ ASSERT_TRUE(req != nullptr);
+
+ auto rep = std::make_shared<RequestBucketInfoReply>(*req);
+
+ rep->getBucketInfo().push_back(
+ RequestBucketInfoReply::Entry(
+ document::BucketId(16, i),
+ api::BucketInfo(i, i, i, i, i)));
+
+ ASSERT_TRUE(state->onRequestBucketInfoReply(rep));
+ ASSERT_EQ((i == (sender.commands().size() - 1)), state->done());
+ }
+
+ auto& pending_transition = state->getPendingBucketSpaceDbTransition(makeBucketSpace());
+ EXPECT_EQ(3u, pending_transition.results().size());
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_with_group_down) {
+ std::string config = dist_config_6_nodes_across_4_groups();
+ config += "distributor_auto_ownership_transfer_on_whole_group_down true\n";
+ set_distribution(config);
+
+ // Group config has nodes {0, 1}, {2, 3}, {4, 5}
+ // We're node index 0.
+
+ // Entire group 1 goes down. Must refetch from all nodes.
+ EXPECT_EQ(get_node_list({0, 1, 2, 3, 4, 5}),
+ get_sent_nodes("distributor:6 storage:6",
+ "distributor:6 .2.s:d .3.s:d storage:6"));
+
+ // But don't fetch if not the entire group is down.
+ EXPECT_EQ("",
+ get_sent_nodes("distributor:6 storage:6",
+ "distributor:6 .2.s:d storage:6"));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_with_group_down_and_no_handover) {
+ std::string config = dist_config_6_nodes_across_4_groups();
+ config += "distributor_auto_ownership_transfer_on_whole_group_down false\n";
+ set_distribution(config);
+
+ // Group is down, but config says to not do anything about it.
+ EXPECT_EQ(get_node_list({0, 1, 2, 3, 4, 5}, _bucket_spaces.size() - 1),
+ get_sent_nodes("distributor:6 storage:6",
+ "distributor:6 .2.s:d .3.s:d storage:6"));
+}
+
+
+namespace {
+
+void
+parse_input_data(const std::string& data,
+ uint64_t timestamp,
+ PendingClusterState& state,
+ bool include_bucket_info)
+{
+ vespalib::StringTokenizer tokenizer(data, "|");
+ for (uint32_t i = 0; i < tokenizer.size(); i++) {
+ vespalib::StringTokenizer tok2(tokenizer[i], ":");
+
+ uint16_t node = atoi(tok2[0].data());
+
+ state.setNodeReplied(node);
+ auto& pending_transition = state.getPendingBucketSpaceDbTransition(makeBucketSpace());
+
+ vespalib::StringTokenizer tok3(tok2[1], ",");
+ for (uint32_t j = 0; j < tok3.size(); j++) {
+ if (include_bucket_info) {
+ vespalib::StringTokenizer tok4(tok3[j], "/");
+
+ pending_transition.addNodeInfo(
+ document::BucketId(16, atoi(tok4[0].data())),
+ BucketCopy(
+ timestamp,
+ node,
+ api::BucketInfo(
+ atoi(tok4[1].data()),
+ atoi(tok4[2].data()),
+ atoi(tok4[3].data()),
+ atoi(tok4[2].data()),
+ atoi(tok4[3].data()))));
+ } else {
+ pending_transition.addNodeInfo(
+ document::BucketId(16, atoi(tok3[j].data())),
+ BucketCopy(timestamp,
+ node,
+ api::BucketInfo(3, 3, 3, 3, 3)));
+ }
+ }
+ }
+}
+
+struct BucketDumper : public BucketDatabase::EntryProcessor
+{
+ std::ostringstream ost;
+ bool _include_bucket_info;
+
+ explicit BucketDumper(bool include_bucket_info)
+ : _include_bucket_info(include_bucket_info)
+ {
+ }
+
+ bool process(const BucketDatabase::ConstEntryRef& e) override {
+ document::BucketId bucket_id(e.getBucketId());
+
+ ost << uint32_t(bucket_id.getRawId()) << ":";
+ for (uint32_t i = 0; i < e->getNodeCount(); ++i) {
+ if (i > 0) {
+ ost << ",";
+ }
+ const BucketCopy& copy(e->getNodeRef(i));
+ ost << copy.getNode();
+ if (_include_bucket_info) {
+ ost << "/" << copy.getChecksum()
+ << "/" << copy.getDocumentCount()
+ << "/" << copy.getTotalDocumentSize()
+ << "/" << (copy.trusted() ? "t" : "u");
+ }
+ }
+ ost << "|";
+ return true;
+ }
+};
+
+}
+
+std::string
+TopLevelBucketDBUpdaterTest::merge_bucket_lists(
+ const lib::ClusterState& old_state,
+ const std::string& existing_data,
+ const lib::ClusterState& new_state,
+ const std::string& new_data,
+ bool include_bucket_info)
+{
+ framework::defaultimplementation::FakeClock clock;
+ framework::MilliSecTimer timer(clock);
+
+ DistributorMessageSenderStub sender;
+ OutdatedNodesMap outdated_nodes_map;
+
+ {
+ auto cmd = std::make_shared<api::SetSystemStateCommand>(old_state);
+ api::Timestamp before_time(1);
+ auto cluster_info = create_cluster_info("cluster:d");
+
+ auto state = PendingClusterState::createForClusterStateChange(
+ clock, cluster_info, sender, top_level_bucket_space_repo(),
+ cmd, outdated_nodes_map, before_time);
+
+ parse_input_data(existing_data, before_time, *state, include_bucket_info);
+ auto guard = acquire_stripe_guard();
+ state->merge_into_bucket_databases(*guard);
+ }
+
+ BucketDumper dumper_tmp(true);
+ for (auto* s : distributor_stripes()) {
+ auto& db = s->getBucketSpaceRepo().get(document::FixedBucketSpaces::default_space()).getBucketDatabase();
+ db.forEach(dumper_tmp);
+ }
+
+ {
+ auto cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(new_state));
+ api::Timestamp after_time(2);
+ auto cluster_info = create_cluster_info(old_state.toString());
+
+ auto state = PendingClusterState::createForClusterStateChange(
+ clock, cluster_info, sender, top_level_bucket_space_repo(),
+ cmd, outdated_nodes_map, after_time);
+
+ parse_input_data(new_data, after_time, *state, include_bucket_info);
+ auto guard = acquire_stripe_guard();
+ state->merge_into_bucket_databases(*guard);
+ }
+
+ BucketDumper dumper(include_bucket_info);
+ for (auto* s : distributor_stripes()) {
+ auto& db = s->getBucketSpaceRepo().get(document::FixedBucketSpaces::default_space()).getBucketDatabase();
+ db.forEach(dumper);
+ db.clear();
+ }
+ return dumper.ost.str();
+}
+
+std::string
+TopLevelBucketDBUpdaterTest::merge_bucket_lists(const std::string& existing_data,
+ const std::string& new_data,
+ bool include_bucket_info)
+{
+ return merge_bucket_lists(
+ lib::ClusterState("distributor:1 storage:3"),
+ existing_data,
+ lib::ClusterState("distributor:1 storage:3"),
+ new_data,
+ include_bucket_info);
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_merge) {
+    // Result is of the form: [bucket w/o count bits]:[node indexes]|..
+    // Input is of the form: [node]:[bucket w/o count bits]|...
+
+ // Simple initializing case - ask all nodes for info
+ EXPECT_EQ("4:0,1|2:0,1|6:1,2|1:0,2|5:2,0|3:2,1|",
+ merge_bucket_lists(
+ "",
+ "0:1,2,4,5|1:2,3,4,6|2:1,3,5,6"));
+
+ // New node came up
+ EXPECT_EQ("4:0,1|2:0,1|6:1,2,3|1:0,2,3|5:2,0,3|3:2,1,3|",
+ merge_bucket_lists(
+ "0:1,2,4,5|1:2,3,4,6|2:1,3,5,6",
+ "3:1,3,5,6"));
+
+    // Node came up with some buckets removed and some added.
+    // The removed buckets should not be pruned from the DB, as the node
+    // didn't lose a disk.
+ EXPECT_EQ("8:0|4:0,1|2:0,1|6:1,0,2|1:0,2|5:2,0|3:2,1|",
+ merge_bucket_lists(
+ "0:1,2,4,5|1:2,3,4,6|2:1,3,5,6",
+ "0:1,2,6,8"));
+
+ // Bucket info format is "bucketid/checksum/count/size"
+ // Node went from initializing to up and invalid bucket went to empty.
+ EXPECT_EQ("2:0/0/0/0/t|",
+ merge_bucket_lists(
+ "0:2/0/0/1",
+ "0:2/0/0/0",
+ true));
+
+ EXPECT_EQ("5:1/2/3/4/u,0/0/0/0/u|",
+ merge_bucket_lists("", "0:5/0/0/0|1:5/2/3/4", true));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_merge_replica_changed) {
+ // Node went from initializing to up and non-invalid bucket changed.
+ EXPECT_EQ("2:0/2/3/4/t|3:0/2/4/6/t|",
+ merge_bucket_lists(
+ lib::ClusterState("distributor:1 storage:1 .0.s:i"),
+ "0:2/1/2/3,3/2/4/6",
+ lib::ClusterState("distributor:1 storage:1"),
+ "0:2/2/3/4,3/2/4/6",
+ true));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, no_db_resurrection_for_bucket_not_owned_in_current_state) {
+ document::BucketId bucket(16, 3);
+ lib::ClusterState state_before("distributor:1 storage:1");
+ {
+ uint32_t expected_msgs = _bucket_spaces.size(), dummy_buckets_to_return = 1;
+ ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state_before, expected_msgs, dummy_buckets_to_return));
+ }
+ _sender.clear();
+
+ stripe_of_bucket(bucket).bucket_db_updater().recheckBucketInfo(0, makeDocumentBucket(bucket));
+
+ ASSERT_EQ(size_t(1), _sender.commands().size());
+ auto rbi = std::dynamic_pointer_cast<RequestBucketInfoCommand>(_sender.command(0));
+
+ lib::ClusterState state_after("distributor:3 storage:3");
+
+ {
+ uint32_t expected_msgs = message_count(2), dummy_buckets_to_return = 1;
+ ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state_after, expected_msgs, dummy_buckets_to_return));
+ }
+ EXPECT_FALSE(distributor_bucket_space(bucket).get_bucket_ownership_flags(bucket).owned_in_current_state());
+
+ ASSERT_NO_FATAL_FAILURE(send_fake_reply_for_single_bucket_request(*rbi));
+
+ EXPECT_EQ("NONEXISTING", dump_bucket(bucket));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, no_db_resurrection_for_bucket_not_owned_in_pending_state) {
+ document::BucketId bucket(16, 3);
+ lib::ClusterState state_before("distributor:1 storage:1");
+ {
+ uint32_t expected_msgs = _bucket_spaces.size(), dummy_buckets_to_return = 1;
+ ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state_before, expected_msgs, dummy_buckets_to_return));
+ }
+ _sender.clear();
+
+ stripe_of_bucket(bucket).bucket_db_updater().recheckBucketInfo(0, makeDocumentBucket(bucket));
+
+ ASSERT_EQ(size_t(1), _sender.commands().size());
+ auto rbi = std::dynamic_pointer_cast<RequestBucketInfoCommand>(_sender.command(0));
+
+ lib::ClusterState state_after("distributor:3 storage:3");
+ // Set, but _don't_ enable cluster state. We want it to be pending.
+ set_cluster_state(state_after);
+ EXPECT_TRUE(distributor_bucket_space(bucket).get_bucket_ownership_flags(bucket).owned_in_current_state());
+ EXPECT_FALSE(distributor_bucket_space(bucket).get_bucket_ownership_flags(bucket).owned_in_pending_state());
+
+ ASSERT_NO_FATAL_FAILURE(send_fake_reply_for_single_bucket_request(*rbi));
+
+ EXPECT_EQ("NONEXISTING", dump_bucket(bucket));
+}
+
+/*
+ * If we get a distribution config change, it's important that cluster states that
+ * arrive after this--but _before_ the pending cluster state has finished--must trigger
+ * a full bucket info fetch no matter what the cluster state change was! Otherwise, we
+ * will with a high likelihood end up not getting the complete view of the buckets in
+ * the cluster.
+ */
+TEST_F(TopLevelBucketDBUpdaterTest, cluster_state_always_sends_full_fetch_when_distribution_change_pending) {
+ lib::ClusterState state_before("distributor:6 storage:6");
+ {
+ uint32_t expected_msgs = message_count(6), dummy_buckets_to_return = 1;
+ ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state_before, expected_msgs, dummy_buckets_to_return));
+ }
+ _sender.clear();
+ std::string distConfig(dist_config_6_nodes_across_2_groups());
+ set_distribution(distConfig);
+
+ sort_sent_messages_by_index(_sender);
+ ASSERT_EQ(message_count(6), _sender.commands().size());
+ // Suddenly, a wild cluster state change appears! Even though this state
+ // does not in itself imply any bucket changes, it will still overwrite the
+ // pending cluster state and thus its state of pending bucket info requests.
+ set_cluster_state("distributor:6 .2.t:12345 storage:6");
+
+ ASSERT_EQ(message_count(12), _sender.commands().size());
+
+    // Send replies for first message_count(6) (outdated requests).
+ int num_buckets = 10;
+ for (uint32_t i = 0; i < message_count(6); ++i) {
+ ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(lib::ClusterState("distributor:6 storage:6"),
+ *_sender.command(i), num_buckets));
+ }
+ // No change from these.
+ ASSERT_NO_FATAL_FAILURE(assert_correct_buckets(1, "distributor:6 storage:6"));
+
+ // Send for current pending.
+ for (uint32_t i = 0; i < message_count(6); ++i) {
+ ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(lib::ClusterState("distributor:6 .2.t:12345 storage:6"),
+ *_sender.command(i + message_count(6)),
+ num_buckets));
+ }
+ ASSERT_NO_FATAL_FAILURE(assert_correct_buckets(num_buckets, "distributor:6 storage:6"));
+ _sender.clear();
+
+ // No more pending global fetch; this should be a no-op state.
+ set_cluster_state("distributor:6 .3.t:12345 storage:6");
+ EXPECT_EQ(size_t(0), _sender.commands().size());
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, changed_distribution_config_triggers_recovery_mode) {
+ uint32_t num_buckets = 20;
+ ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(lib::ClusterState("distributor:6 storage:6"),
+ message_count(6), num_buckets));
+ _sender.clear();
+ EXPECT_TRUE(all_distributor_stripes_are_in_recovery_mode());
+ complete_recovery_mode_on_all_stripes();
+ EXPECT_FALSE(all_distributor_stripes_are_in_recovery_mode());
+
+ set_distribution(dist_config_6_nodes_across_4_groups());
+ sort_sent_messages_by_index(_sender);
+ // No replies received yet, still no recovery mode.
+ EXPECT_FALSE(all_distributor_stripes_are_in_recovery_mode());
+
+ ASSERT_EQ(message_count(6), _sender.commands().size());
+ num_buckets = 10;
+ for (uint32_t i = 0; i < message_count(6); ++i) {
+ ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(lib::ClusterState("distributor:6 storage:6"),
+ *_sender.command(i), num_buckets));
+ }
+
+ // Pending cluster state (i.e. distribution) has been enabled, which should
+ // cause recovery mode to be entered.
+ EXPECT_TRUE(all_distributor_stripes_are_in_recovery_mode());
+ complete_recovery_mode_on_all_stripes();
+ EXPECT_FALSE(all_distributor_stripes_are_in_recovery_mode());
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, changed_distribution_config_does_not_elide_bucket_db_pruning) {
+ set_distribution(dist_config_3_nodes_in_1_group());
+
+ constexpr uint32_t n_buckets = 100;
+ ASSERT_NO_FATAL_FAILURE(
+ set_and_enable_cluster_state(lib::ClusterState("distributor:6 storage:6"), message_count(6), n_buckets));
+ _sender.clear();
+
+ // Config implies a different node set than the current cluster state, so it's crucial that
+ // DB pruning is _not_ elided. Yes, this is inherently racing with cluster state changes and
+ // should be changed to be atomic and controlled by the cluster controller instead of config.
+ // But this is where we currently are.
+ set_distribution(dist_config_6_nodes_across_2_groups());
+ for (auto* s : distributor_stripes()) {
+ const auto& db = s->getBucketSpaceRepo().get(document::FixedBucketSpaces::default_space()).getBucketDatabase();
+ db.acquire_read_guard()->for_each([&]([[maybe_unused]] uint64_t key, const auto& e) {
+ auto id = e.getBucketId();
+ EXPECT_TRUE(distributor_bucket_space(id).get_bucket_ownership_flags(id).owned_in_pending_state());
+ });
+ }
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, newly_added_buckets_have_current_time_as_gc_timestamp) {
+ fake_clock().setAbsoluteTimeInSeconds(101234);
+ lib::ClusterState state_before("distributor:1 storage:1");
+ {
+ uint32_t expected_msgs = _bucket_spaces.size(), dummy_buckets_to_return = 1;
+ ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state_before, expected_msgs, dummy_buckets_to_return));
+ }
+    // set_and_enable_cluster_state() adds n buckets with id (16, i)
+ document::BucketId bucket(16, 0);
+ BucketDatabase::Entry e = get_bucket(bucket);
+ ASSERT_TRUE(e.valid());
+ EXPECT_EQ(uint32_t(101234), e->getLastGarbageCollectionTime());
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, newer_mutations_not_overwritten_by_earlier_bucket_fetch) {
+ {
+ lib::ClusterState state_before("distributor:1 storage:1 .0.s:i");
+ uint32_t expected_msgs = _bucket_spaces.size(), dummy_buckets_to_return = 0;
+ // This step is required to make the distributor ready for accepting
+ // the below explicit database insertion towards node 0.
+ ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state_before, expected_msgs, dummy_buckets_to_return));
+ }
+ _sender.clear();
+ fake_clock().setAbsoluteTimeInSeconds(1000);
+ lib::ClusterState state("distributor:1 storage:1");
+ set_cluster_state(state);
+ ASSERT_EQ(_bucket_spaces.size(), _sender.commands().size());
+
+ // Before replying with the bucket info, simulate the arrival of a mutation
+ // reply that alters the state of the bucket with information that will be
+    // more recent than what is returned by the bucket info. This information
+ // must not be lost when the bucket info is later merged into the database.
+ document::BucketId bucket(16, 1);
+ constexpr uint64_t insertion_timestamp = 1001ULL * 1000000;
+ api::BucketInfo wanted_info(5, 6, 7);
+ stripe_of_bucket(bucket).bucket_db_updater().operation_context().update_bucket_database(
+ makeDocumentBucket(bucket),
+ BucketCopy(insertion_timestamp, 0, wanted_info),
+ DatabaseUpdate::CREATE_IF_NONEXISTING);
+
+ fake_clock().setAbsoluteTimeInSeconds(1002);
+ constexpr uint32_t buckets_returned = 10; // Buckets (16, 0) ... (16, 9)
+ // Return bucket information which on the timeline might originate from
+ // anywhere between [1000, 1002]. Our assumption is that any mutations
+ // taking place after t=1000 must have its reply received and processed
+ // by this distributor and timestamped strictly higher than t=1000 (modulo
+ // clock skew, of course, but that is outside the scope of this). A mutation
+ // happening before t=1000 but receiving a reply at t>1000 does not affect
+ // correctness, as this should contain the same bucket info as that
+ // contained in the full bucket reply and the DB update is thus idempotent.
+ for (uint32_t i = 0; i < _bucket_spaces.size(); ++i) {
+ ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(state, *_sender.command(i), buckets_returned));
+ }
+
+ BucketDatabase::Entry e = get_bucket(bucket);
+ ASSERT_EQ(uint32_t(1), e->getNodeCount());
+ EXPECT_EQ(wanted_info, e->getNodeRef(0).getBucketInfo());
+}
+
+std::vector<uint16_t>
+TopLevelBucketDBUpdaterTest::get_send_set() const
+{
+ std::vector<uint16_t> nodes;
+ std::transform(_sender.commands().begin(),
+ _sender.commands().end(),
+ std::back_inserter(nodes),
+ [](auto& cmd)
+ {
+ auto& req(dynamic_cast<const api::RequestBucketInfoCommand&>(*cmd));
+ return req.getAddress()->getIndex();
+ });
+ return nodes;
+}
+
+std::vector<uint16_t>
+TopLevelBucketDBUpdaterTest::get_sent_nodes_with_preemption(
+ const std::string& old_cluster_state,
+ uint32_t expected_old_state_messages,
+ const std::string& preempted_cluster_state,
+ const std::string& new_cluster_state)
+{
+ uint32_t dummy_buckets_to_return = 10;
+ // FIXME cannot chain assertion checks in non-void function
+ set_and_enable_cluster_state(lib::ClusterState(old_cluster_state),
+ expected_old_state_messages,
+ dummy_buckets_to_return);
+
+ _sender.clear();
+
+ set_cluster_state(preempted_cluster_state);
+ _sender.clear();
+ // Do not allow the pending state to become the active state; trigger a
+ // new transition without ACKing the info requests first. This will
+ // overwrite the pending state entirely.
+ set_cluster_state(lib::ClusterState(new_cluster_state));
+ return get_send_set();
+}
+
+std::vector<uint16_t>
+TopLevelBucketDBUpdaterTest::expand_node_vec(const std::vector<uint16_t>& nodes)
+{
+ std::vector<uint16_t> res;
+ size_t count = _bucket_spaces.size();
+ for (const auto &node : nodes) {
+ for (uint32_t i = 0; i < count; ++i) {
+ res.push_back(node);
+ }
+ }
+ return res;
+}
+
+/*
+ * If we don't carry over the set of nodes that we need to fetch from,
+ * a naive comparison between the active state and the new state will
+ * make it appear to the distributor that nothing has changed, as any
+ * database modifications caused by intermediate states will not be
+ * accounted for (basically the ABA problem in a distributed setting).
+ */
+TEST_F(TopLevelBucketDBUpdaterTest, preempted_distributor_change_carries_node_set_over_to_next_state_fetch) {
+ EXPECT_EQ(expand_node_vec({0, 1, 2, 3, 4, 5}),
+ get_sent_nodes_with_preemption("version:1 distributor:6 storage:6",
+ message_count(6),
+ "version:2 distributor:6 .5.s:d storage:6",
+ "version:3 distributor:6 storage:6"));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, preempted_storage_change_carries_node_set_over_to_next_state_fetch) {
+ EXPECT_EQ(expand_node_vec({2, 3}),
+ get_sent_nodes_with_preemption(
+ "version:1 distributor:6 storage:6 .2.s:d",
+ message_count(5),
+ "version:2 distributor:6 storage:6 .2.s:d .3.s:d",
+ "version:3 distributor:6 storage:6"));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, preempted_storage_node_down_must_be_re_fetched) {
+ EXPECT_EQ(expand_node_vec({2}),
+ get_sent_nodes_with_preemption(
+ "version:1 distributor:6 storage:6",
+ message_count(6),
+ "version:2 distributor:6 storage:6 .2.s:d",
+ "version:3 distributor:6 storage:6"));
+}
+
+using NodeVec = std::vector<uint16_t>;
+
+TEST_F(TopLevelBucketDBUpdaterTest, do_not_send_to_preempted_node_now_in_down_state) {
+ EXPECT_EQ(NodeVec{},
+ get_sent_nodes_with_preemption(
+ "version:1 distributor:6 storage:6 .2.s:d",
+ message_count(5),
+ "version:2 distributor:6 storage:6", // Sends to 2.
+ "version:3 distributor:6 storage:6 .2.s:d")); // 2 down again.
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, do_not_send_to_preempted_node_not_part_of_new_state) {
+ // Even though 100 nodes are preempted, not all of these should be part
+ // of the request afterwards when only 6 are part of the state.
+ EXPECT_EQ(expand_node_vec({0, 1, 2, 3, 4, 5}),
+ get_sent_nodes_with_preemption(
+ "version:1 distributor:6 storage:100",
+ message_count(100),
+ "version:2 distributor:5 .4.s:d storage:100",
+ "version:3 distributor:6 storage:6"));
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, outdated_node_set_cleared_after_successful_state_completion) {
+ lib::ClusterState state_before("version:1 distributor:6 storage:6 .1.t:1234");
+ uint32_t expected_msgs = message_count(6), dummy_buckets_to_return = 10;
+ ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state_before, expected_msgs, dummy_buckets_to_return));
+ _sender.clear();
+ // New cluster state that should not by itself trigger any new fetches,
+ // unless outdated node set is somehow not cleared after an enabled
+ // (completed) cluster state has been set.
+ set_cluster_state("version:3 distributor:6 storage:6");
+ EXPECT_EQ(size_t(0), _sender.commands().size());
+}
+
+// XXX test currently disabled since distribution config currently isn't used
+// at all in order to deduce the set of nodes to send to. This might not matter
+// in practice since it is assumed that the cluster state matching the new
+// distribution config will follow very shortly after the config has been
+// applied to the node. The new cluster state will then send out requests to
+// the correct node set.
+TEST_F(TopLevelBucketDBUpdaterTest, DISABLED_cluster_config_downsize_only_sends_to_available_nodes) {
+ uint32_t expected_msgs = 6, dummy_buckets_to_return = 20;
+ ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(lib::ClusterState("distributor:6 storage:6"),
+ expected_msgs, dummy_buckets_to_return));
+ _sender.clear();
+
+ // Intentionally trigger a racing config change which arrives before the
+ // new cluster state representing it.
+ set_distribution(dist_config_3_nodes_in_1_group());
+ sort_sent_messages_by_index(_sender);
+
+ EXPECT_EQ((NodeVec{0, 1, 2}), get_send_set());
+}
+
+/**
+ * Test scenario where a cluster is downsized by removing a subset of the nodes
+ * from the distribution configuration. The system must be able to deal with
+ * a scenario where the set of nodes between two cluster states across a config
+ * change may differ.
+ *
+ * See VESPA-790 for details.
+ */
+TEST_F(TopLevelBucketDBUpdaterTest, node_missing_from_config_is_treated_as_needing_ownership_transfer) {
+ uint32_t expected_msgs = message_count(3), dummy_buckets_to_return = 1;
+ ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(lib::ClusterState("distributor:3 storage:3"),
+ expected_msgs, dummy_buckets_to_return));
+ _sender.clear();
+
+ // Cluster goes from {0, 1, 2} -> {0, 1}. This leaves us with a config
+ // that does not contain node 2 while the _active_ cluster state still
+ // contains this node.
+ const char* downsize_cfg =
+ "redundancy 2\n"
+ "distributor_auto_ownership_transfer_on_whole_group_down true\n"
+ "group[2]\n"
+ "group[0].name \"invalid\"\n"
+ "group[0].index \"invalid\"\n"
+ "group[0].partitions 1|*\n"
+ "group[0].nodes[0]\n"
+ "group[1].name rack0\n"
+ "group[1].index 0\n"
+ "group[1].nodes[2]\n"
+ "group[1].nodes[0].index 0\n"
+ "group[1].nodes[1].index 1\n";
+
+ set_distribution(downsize_cfg);
+ sort_sent_messages_by_index(_sender);
+ _sender.clear();
+
+ // Attempt to apply state with {0, 1} set. This will compare the new state
+ // with the previous state, which still has node 2.
+ expected_msgs = message_count(2);
+ ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(lib::ClusterState("distributor:2 storage:2"),
+ expected_msgs, dummy_buckets_to_return));
+
+ EXPECT_EQ(expand_node_vec({0, 1}), get_send_set());
+}
+
}
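A reading aid for the merge tests above: merge_bucket_lists() round-trips string-encoded bucket lists through two PendingClusterState instances and dumps the resulting database. The encoding, as can be inferred from the comments and assertions in pending_cluster_state_merge (the trailing t/u flag marks trusted/untrusted):

    // Input,  per node:   "[node]:[bucket]/[checksum]/[doc count]/[total size]|..."
    // Output, per bucket: "[bucket]:[node](/[checksum]/[count]/[size]/[t|u]),...|..."
    // Example from the test: merging existing "0:2/0/0/1" with new "0:2/0/0/0"
    // under include_bucket_info=true yields "2:0/0/0/0/t|".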
diff --git a/storage/src/tests/distributor/top_level_distributor_test.cpp b/storage/src/tests/distributor/top_level_distributor_test.cpp
index d8df36e53b2..28831c8a661 100644
--- a/storage/src/tests/distributor/top_level_distributor_test.cpp
+++ b/storage/src/tests/distributor/top_level_distributor_test.cpp
@@ -70,18 +70,6 @@ struct TopLevelDistributorTest : Test, TopLevelDistributorTestUtil {
return posted_msgs.str();
}
- void tick_distributor_and_stripes_n_times(uint32_t n) {
- for (uint32_t i = 0; i < n; ++i) {
- tick(false);
- }
- }
-
- void tick_top_level_distributor_n_times(uint32_t n) {
- for (uint32_t i = 0; i < n; ++i) {
- tick(true);
- }
- }
-
StatusReporterDelegate& distributor_status_delegate() {
return _distributor->_distributorStatusDelegate;
}
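(The two tick helpers removed here are not lost: the test-util diff below re-adds them verbatim to TopLevelDistributorTestUtil, so the ported bucket DB updater tests can use them as well.)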
diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.cpp b/storage/src/tests/distributor/top_level_distributor_test_util.cpp
index c4173f5e8ff..0db345636ee 100644
--- a/storage/src/tests/distributor/top_level_distributor_test_util.cpp
+++ b/storage/src/tests/distributor/top_level_distributor_test_util.cpp
@@ -265,6 +265,25 @@ TopLevelDistributorTestUtil::get_bucket(const document::BucketId& bId) const
return stripe_bucket_database(stripe_index_of_bucket(bId)).get(bId);
}
+DistributorBucketSpaceRepo&
+TopLevelDistributorTestUtil::top_level_bucket_space_repo() noexcept
+{
+ return _distributor->_component.bucket_space_repo();
+}
+
+const DistributorBucketSpaceRepo&
+TopLevelDistributorTestUtil::top_level_bucket_space_repo() const noexcept
+{
+ return _distributor->_component.bucket_space_repo();
+}
+
+std::unique_ptr<StripeAccessGuard>
+TopLevelDistributorTestUtil::acquire_stripe_guard()
+{
+ // Note: this won't actually interact with any threads, as the pool is running in single-threaded test mode.
+ return _distributor->_stripe_accessor->rendezvous_and_hold_all();
+}
+
TopLevelBucketDBUpdater&
TopLevelDistributorTestUtil::bucket_db_updater() {
return *_distributor->_bucket_db_updater;
@@ -430,4 +449,28 @@ TopLevelDistributorTestUtil::trigger_distribution_change(std::shared_ptr<lib::Di
_distributor->enableNextDistribution();
}
+void
+TopLevelDistributorTestUtil::tick_distributor_and_stripes_n_times(uint32_t n)
+{
+ for (uint32_t i = 0; i < n; ++i) {
+ tick(false);
+ }
+}
+
+void
+TopLevelDistributorTestUtil::tick_top_level_distributor_n_times(uint32_t n)
+{
+ for (uint32_t i = 0; i < n; ++i) {
+ tick(true);
+ }
+}
+
+void
+TopLevelDistributorTestUtil::complete_recovery_mode_on_all_stripes()
+{
+ for (auto* s : distributor_stripes()) {
+ s->scanAllBuckets();
+ }
+}
+
}
diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.h b/storage/src/tests/distributor/top_level_distributor_test_util.h
index 9048160b652..1d9d1613920 100644
--- a/storage/src/tests/distributor/top_level_distributor_test_util.h
+++ b/storage/src/tests/distributor/top_level_distributor_test_util.h
@@ -19,12 +19,14 @@ namespace distributor {
class TopLevelDistributor;
class DistributorBucketSpace;
+class DistributorBucketSpaceRepo;
class DistributorMetricSet;
class DistributorNodeContext;
class DistributorStripe;
class DistributorStripeComponent;
class DistributorStripeOperationContext;
class DistributorStripePool;
+class StripeAccessGuard;
class IdealStateMetricSet;
class Operation;
class TopLevelBucketDBUpdater;
@@ -58,6 +60,12 @@ public:
// As the above, but always inserts into default bucket space
void add_nodes_to_stripe_bucket_db(const document::BucketId& id, const std::string& nodeStr);
+ // TODO STRIPE replace with BucketSpaceStateMap once legacy is gone
+ DistributorBucketSpaceRepo& top_level_bucket_space_repo() noexcept;
+ const DistributorBucketSpaceRepo& top_level_bucket_space_repo() const noexcept;
+
+ std::unique_ptr<StripeAccessGuard> acquire_stripe_guard();
+
TopLevelBucketDBUpdater& bucket_db_updater();
const IdealStateMetricSet& total_ideal_state_metrics() const;
const DistributorMetricSet& total_distributor_metrics() const;
@@ -83,6 +91,11 @@ public:
const BucketDatabase& stripe_bucket_database(uint16_t stripe_idx, document::BucketSpace space) const;
[[nodiscard]] bool all_distributor_stripes_are_in_recovery_mode() const;
+ void tick_distributor_and_stripes_n_times(uint32_t n);
+ void tick_top_level_distributor_n_times(uint32_t n);
+
+ void complete_recovery_mode_on_all_stripes();
+
void setup_distributor(int redundancy,
int node_count,
const std::string& systemState,
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h
index 5426d311558..b1b20cf445a 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.h
@@ -198,6 +198,7 @@ private:
friend class TopLevelDistributor;
friend class DistributorStripeTestUtil;
friend class DistributorTestUtil;
+ friend class TopLevelDistributorTestUtil;
friend class LegacyBucketDBUpdaterTest;
friend class MetricUpdateHook;
friend class MultiThreadedStripeAccessGuard;
diff --git a/storage/src/vespa/storage/distributor/top_level_distributor.cpp b/storage/src/vespa/storage/distributor/top_level_distributor.cpp
index 68a52e27d84..bbef20b2a23 100644
--- a/storage/src/vespa/storage/distributor/top_level_distributor.cpp
+++ b/storage/src/vespa/storage/distributor/top_level_distributor.cpp
@@ -58,6 +58,7 @@ TopLevelDistributor::TopLevelDistributor(DistributorComponentRegister& compReg,
ChainedMessageSender* messageSender)
: StorageLink("distributor"),
framework::StatusReporter("distributor", "Distributor"),
+ _node_identity(node_identity),
_comp_reg(compReg),
_use_legacy_mode(num_distributor_stripes == 0),
_metrics(std::make_shared<DistributorMetricSet>()),
diff --git a/storage/src/vespa/storage/distributor/top_level_distributor.h b/storage/src/vespa/storage/distributor/top_level_distributor.h
index 57ff5268323..5de4b9c1aaa 100644
--- a/storage/src/vespa/storage/distributor/top_level_distributor.h
+++ b/storage/src/vespa/storage/distributor/top_level_distributor.h
@@ -20,6 +20,7 @@
#include <vespa/storage/common/distributorcomponent.h>
#include <vespa/storage/common/doneinitializehandler.h>
#include <vespa/storage/common/messagesender.h>
+#include <vespa/storage/common/node_identity.h>
#include <vespa/storage/distributor/bucketdb/bucketdbmetricupdater.h>
#include <vespa/storage/distributor/maintenance/maintenancescheduler.h>
#include <vespa/storageapi/message/state.h>
@@ -33,7 +34,6 @@
namespace storage {
struct DoneInitializeHandler;
class HostInfo;
- class NodeIdentity;
}
namespace storage::distributor {
@@ -84,6 +84,8 @@ public:
DistributorMetricSet& getMetrics();
+ const NodeIdentity& node_identity() const noexcept { return _node_identity; }
+
// Implements DistributorInterface and DistributorMessageSender.
DistributorMetricSet& metrics() override { return getMetrics(); }
const DistributorConfiguration& config() const override;
@@ -205,6 +207,7 @@ private:
using MessageQueue = std::vector<std::shared_ptr<api::StorageMessage>>;
+ const NodeIdentity _node_identity;
DistributorComponentRegister& _comp_reg;
const bool _use_legacy_mode;
std::shared_ptr<DistributorMetricSet> _metrics;
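Taken together, the header changes make the distributor's identity available to the ported tests: node_identity.h is now included outright (replacing the NodeIdentity forward declaration), the identity is stored by value as _node_identity, and the new node_identity() accessor is what the test fixture's create_cluster_info() relies on. A minimal sketch of that call path, using the names from the test diff above:

    // Derive the local node index when building cluster info in a test.
    auto cluster_info = std::make_shared<SimpleClusterInformation>(
            _distributor->node_identity().node_index(),
            cluster_state_bundle,
            "ui");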