diff options
Diffstat (limited to 'storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp')
-rw-r--r-- | storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp | 1134 |
1 files changed, 1110 insertions, 24 deletions
diff --git a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp index 70e5afaed43..1a0ba8352b7 100644 --- a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp +++ b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp @@ -38,6 +38,8 @@ class TopLevelBucketDBUpdaterTest : public Test, public TopLevelDistributorTestUtil { public: + using OutdatedNodesMap = dbtransition::OutdatedNodesMap; + TopLevelBucketDBUpdaterTest(); ~TopLevelBucketDBUpdaterTest() override; @@ -117,6 +119,18 @@ public: invalid_bucket_count)); } + void send_fake_reply_for_single_bucket_request( + const api::RequestBucketInfoCommand& rbi) + { + ASSERT_EQ(size_t(1), rbi.getBuckets().size()); + const document::BucketId& bucket(rbi.getBuckets()[0]); + + auto reply = std::make_shared<api::RequestBucketInfoReply>(rbi); + reply->getBucketInfo().push_back( + api::RequestBucketInfoReply::Entry(bucket, api::BucketInfo(20, 10, 12, 50, 60, true, true))); + stripe_of_bucket(bucket).bucket_db_updater().onRequestBucketInfoReply(reply); + } + std::string verifyBucket(document::BucketId id, const lib::ClusterState& state) { BucketDatabase::Entry entry = get_bucket(id); if (!entry.valid()) { @@ -290,6 +304,87 @@ public: ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(state, expected_msgs, n_buckets)); } + ClusterInformation::CSP create_cluster_info(const std::string& clusterStateString) { + lib::ClusterState baseline_cluster_state(clusterStateString); + lib::ClusterStateBundle cluster_state_bundle(baseline_cluster_state); + auto cluster_info = std::make_shared<SimpleClusterInformation>( + _distributor->node_identity().node_index(), + cluster_state_bundle, + "ui"); + enable_distributor_cluster_state(clusterStateString); + return cluster_info; + } + + struct PendingClusterStateFixture { + DistributorMessageSenderStub sender; + framework::defaultimplementation::FakeClock clock; + std::unique_ptr<PendingClusterState> state; + + PendingClusterStateFixture( + TopLevelBucketDBUpdaterTest& owner, + const std::string& old_cluster_state, + const std::string& new_cluster_state) + { + auto cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(new_cluster_state)); + auto cluster_info = owner.create_cluster_info(old_cluster_state); + OutdatedNodesMap outdated_nodes_map; + state = PendingClusterState::createForClusterStateChange( + clock, cluster_info, sender, + owner.top_level_bucket_space_repo(), + cmd, outdated_nodes_map, api::Timestamp(1)); + } + + PendingClusterStateFixture( + TopLevelBucketDBUpdaterTest& owner, + const std::string& old_cluster_state) + { + auto cluster_info = owner.create_cluster_info(old_cluster_state); + state = PendingClusterState::createForDistributionChange( + clock, cluster_info, sender, owner.top_level_bucket_space_repo(), api::Timestamp(1)); + } + }; + + std::unique_ptr<PendingClusterStateFixture> create_pending_state_fixture_for_state_change( + const std::string& oldClusterState, + const std::string& newClusterState) + { + return std::make_unique<PendingClusterStateFixture>(*this, oldClusterState, newClusterState); + } + + std::unique_ptr<PendingClusterStateFixture> create_pending_state_fixture_for_distribution_change( + const std::string& oldClusterState) + { + return std::make_unique<PendingClusterStateFixture>(*this, oldClusterState); + } + + std::string get_sent_nodes(const std::string& old_cluster_state, + const std::string& new_cluster_state); + + std::string get_sent_nodes_distribution_changed(const std::string& old_cluster_state); + + std::string get_node_list(const std::vector<uint16_t>& nodes, size_t count); + std::string get_node_list(const std::vector<uint16_t>& nodes); + + std::string merge_bucket_lists(const lib::ClusterState& old_state, + const std::string& existing_data, + const lib::ClusterState& new_state, + const std::string& new_data, + bool include_bucket_info = false); + + std::string merge_bucket_lists(const std::string& existingData, + const std::string& newData, + bool includeBucketInfo = false); + + std::vector<uint16_t> get_send_set() const; + + std::vector<uint16_t> get_sent_nodes_with_preemption( + const std::string& old_cluster_state, + uint32_t expected_old_state_messages, + const std::string& preempted_cluster_state, + const std::string& new_cluster_state); + + std::vector<uint16_t> expand_node_vec(const std::vector<uint16_t>& nodes); + }; TopLevelBucketDBUpdaterTest::TopLevelBucketDBUpdaterTest() @@ -347,6 +442,21 @@ std::string dist_config_6_nodes_across_4_groups() { "group[3].nodes[1].index 5\n"); } +std::string dist_config_3_nodes_in_1_group() { + return ("redundancy 2\n" + "group[2]\n" + "group[0].name \"invalid\"\n" + "group[0].index \"invalid\"\n" + "group[0].partitions 1|*\n" + "group[0].nodes[0]\n" + "group[1].name rack0\n" + "group[1].index 0\n" + "group[1].nodes[3]\n" + "group[1].nodes[0].index 0\n" + "group[1].nodes[1].index 1\n" + "group[1].nodes[2].index 2\n"); +} + std::string make_string_list(std::string s, uint32_t count) { @@ -368,6 +478,30 @@ make_request_bucket_info_strings(uint32_t count) } + +std::string +TopLevelBucketDBUpdaterTest::get_node_list(const std::vector<uint16_t>& nodes, size_t count) +{ + std::ostringstream ost; + bool first = true; + for (const auto node : nodes) { + for (uint32_t i = 0; i < count; ++i) { + if (!first) { + ost << ","; + } + ost << node; + first = false; + } + } + return ost.str(); +} + +std::string +TopLevelBucketDBUpdaterTest::get_node_list(const std::vector<uint16_t>& nodes) +{ + return get_node_list(nodes, _bucket_spaces.size()); +} + TEST_F(TopLevelBucketDBUpdaterTest, normal_usage) { set_cluster_state(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3")); @@ -525,13 +659,13 @@ TEST_F(TopLevelBucketDBUpdaterTest, failed_request_bucket_info) { } for (int i=0; i<10; i++) { - EXPECT_EQ(std::string(""), + EXPECT_EQ("", verifyBucket(document::BucketId(16, i), lib::ClusterState("distributor:1 storage:1"))); } // Set system state should now be passed on - EXPECT_EQ(std::string("Set system state"), _sender_down.getCommands()); + EXPECT_EQ("Set system state", _sender_down.getCommands()); } TEST_F(TopLevelBucketDBUpdaterTest, down_while_init) { @@ -588,9 +722,9 @@ TEST_F(TopLevelBucketDBUpdaterTest, node_down_copies_get_in_sync) { set_cluster_state("distributor:1 storage:3 .1.s:d"); - EXPECT_EQ(std::string("BucketId(0x4000000000000001) : " - "node(idx=0,crc=0x3,docs=3/3,bytes=3/3,trusted=true,active=false,ready=false), " - "node(idx=2,crc=0x3,docs=3/3,bytes=3/3,trusted=true,active=false,ready=false)"), + EXPECT_EQ("BucketId(0x4000000000000001) : " + "node(idx=0,crc=0x3,docs=3/3,bytes=3/3,trusted=true,active=false,ready=false), " + "node(idx=2,crc=0x3,docs=3/3,bytes=3/3,trusted=true,active=false,ready=false)", dump_bucket(bid)); } @@ -651,11 +785,11 @@ TEST_F(TopLevelBucketDBUpdaterTest, bit_change) { } } - EXPECT_EQ(std::string("BucketId(0x4000000000000001) : " - "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), + EXPECT_EQ("BucketId(0x4000000000000001) : " + "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)", dump_bucket(bucketlist[0])); - EXPECT_EQ(std::string("BucketId(0x4000000000000002) : " - "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), + EXPECT_EQ("BucketId(0x4000000000000002) : " + "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)", dump_bucket(bucketlist[1])); { @@ -688,17 +822,17 @@ TEST_F(TopLevelBucketDBUpdaterTest, bit_change) { } } - EXPECT_EQ(std::string("BucketId(0x4000000000000000) : " - "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), + EXPECT_EQ("BucketId(0x4000000000000000) : " + "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)", dump_bucket(document::BucketId(16, 0))); - EXPECT_EQ(std::string("BucketId(0x4000000000000001) : " - "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), + EXPECT_EQ("BucketId(0x4000000000000001) : " + "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)", dump_bucket(document::BucketId(16, 1))); - EXPECT_EQ(std::string("BucketId(0x4000000000000002) : " - "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), + EXPECT_EQ("BucketId(0x4000000000000002) : " + "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)", dump_bucket(document::BucketId(16, 2))); - EXPECT_EQ(std::string("BucketId(0x4000000000000004) : " - "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), + EXPECT_EQ("BucketId(0x4000000000000004) : " + "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)", dump_bucket(document::BucketId(16, 4))); _sender.clear(); @@ -822,9 +956,9 @@ TEST_F(TopLevelBucketDBUpdaterTest, notify_bucket_change) { } // No database update until request bucket info replies have been received. - EXPECT_EQ(std::string("BucketId(0x4000000000000001) : " - "node(idx=0,crc=0x4d2,docs=1234/1234,bytes=1234/1234," - "trusted=false,active=false,ready=false)"), + EXPECT_EQ("BucketId(0x4000000000000001) : " + "node(idx=0,crc=0x4d2,docs=1234/1234,bytes=1234/1234," + "trusted=false,active=false,ready=false)", dump_bucket(document::BucketId(16, 1))); EXPECT_EQ(std::string("NONEXISTING"), dump_bucket(document::BucketId(16, 2))); @@ -845,11 +979,11 @@ TEST_F(TopLevelBucketDBUpdaterTest, notify_bucket_change) { stripe_of_bucket(bucket_id).bucket_db_updater().onRequestBucketInfoReply(reply); } - EXPECT_EQ(std::string("BucketId(0x4000000000000001) : " - "node(idx=0,crc=0x11d7,docs=200/400,bytes=2000/4000,trusted=true,active=true,ready=true)"), + EXPECT_EQ("BucketId(0x4000000000000001) : " + "node(idx=0,crc=0x11d7,docs=200/400,bytes=2000/4000,trusted=true,active=true,ready=true)", dump_bucket(document::BucketId(16, 1))); - EXPECT_EQ(std::string("BucketId(0x4000000000000002) : " - "node(idx=0,crc=0x2327,docs=300/500,bytes=3000/5000,trusted=true,active=false,ready=false)"), + EXPECT_EQ("BucketId(0x4000000000000002) : " + "node(idx=0,crc=0x2327,docs=300/500,bytes=3000/5000,trusted=true,active=false,ready=false)", dump_bucket(document::BucketId(16, 2))); } @@ -947,4 +1081,956 @@ TEST_F(TopLevelBucketDBUpdaterTest, notify_change_with_pending_state_queues_buck } } +TEST_F(TopLevelBucketDBUpdaterTest, merge_reply) { + enable_distributor_cluster_state("distributor:1 storage:3"); + + document::BucketId bucket_id(16, 1234); + add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234"); + + std::vector<api::MergeBucketCommand::Node> nodes; + nodes.push_back(api::MergeBucketCommand::Node(0)); + nodes.push_back(api::MergeBucketCommand::Node(1)); + nodes.push_back(api::MergeBucketCommand::Node(2)); + + api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0); + auto reply = std::make_shared<api::MergeBucketReply>(cmd); + + _sender.clear(); + stripe_of_bucket(bucket_id).bucket_db_updater().onMergeBucketReply(reply); + + ASSERT_EQ(size_t(3), _sender.commands().size()); + + for (uint32_t i = 0; i < 3; i++) { + auto req = std::dynamic_pointer_cast<api::RequestBucketInfoCommand>(_sender.command(i)); + + ASSERT_TRUE(req.get() != nullptr); + ASSERT_EQ(size_t(1), req->getBuckets().size()); + EXPECT_EQ(bucket_id, req->getBuckets()[0]); + + auto reqreply = std::make_shared<api::RequestBucketInfoReply>(*req); + reqreply->getBucketInfo().push_back( + api::RequestBucketInfoReply::Entry(bucket_id, + api::BucketInfo(10 * (i + 1), 100 * (i +1), 1000 * (i+1)))); + + stripe_of_bucket(bucket_id).bucket_db_updater().onRequestBucketInfoReply(reqreply); + } + + EXPECT_EQ("BucketId(0x40000000000004d2) : " + "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), " + "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false), " + "node(idx=2,crc=0x1e,docs=300/300,bytes=3000/3000,trusted=false,active=false,ready=false)", + dump_bucket(bucket_id)); +}; + +TEST_F(TopLevelBucketDBUpdaterTest, merge_reply_node_down) { + enable_distributor_cluster_state("distributor:1 storage:3"); + std::vector<api::MergeBucketCommand::Node> nodes; + + document::BucketId bucket_id(16, 1234); + add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234"); + + for (uint32_t i = 0; i < 3; ++i) { + nodes.push_back(api::MergeBucketCommand::Node(i)); + } + + api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0); + auto reply = std::make_shared<api::MergeBucketReply>(cmd); + + set_cluster_state(lib::ClusterState("distributor:1 storage:2")); + + _sender.clear(); + stripe_of_bucket(bucket_id).bucket_db_updater().onMergeBucketReply(reply); + + ASSERT_EQ(size_t(2), _sender.commands().size()); + + for (uint32_t i = 0; i < 2; i++) { + auto req = std::dynamic_pointer_cast<api::RequestBucketInfoCommand>(_sender.command(i)); + + ASSERT_TRUE(req.get() != nullptr); + ASSERT_EQ(size_t(1), req->getBuckets().size()); + EXPECT_EQ(bucket_id, req->getBuckets()[0]); + + auto reqreply = std::make_shared<api::RequestBucketInfoReply>(*req); + reqreply->getBucketInfo().push_back( + api::RequestBucketInfoReply::Entry( + bucket_id, + api::BucketInfo(10 * (i + 1), 100 * (i +1), 1000 * (i+1)))); + stripe_of_bucket(bucket_id).bucket_db_updater().onRequestBucketInfoReply(reqreply); + } + + EXPECT_EQ("BucketId(0x40000000000004d2) : " + "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), " + "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false)", + dump_bucket(bucket_id)); +}; + +TEST_F(TopLevelBucketDBUpdaterTest, merge_reply_node_down_after_request_sent) { + enable_distributor_cluster_state("distributor:1 storage:3"); + std::vector<api::MergeBucketCommand::Node> nodes; + + document::BucketId bucket_id(16, 1234); + add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234"); + + for (uint32_t i = 0; i < 3; ++i) { + nodes.push_back(api::MergeBucketCommand::Node(i)); + } + + api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0); + auto reply = std::make_shared<api::MergeBucketReply>(cmd); + + _sender.clear(); + stripe_of_bucket(bucket_id).bucket_db_updater().onMergeBucketReply(reply); + + ASSERT_EQ(size_t(3), _sender.commands().size()); + + set_cluster_state(lib::ClusterState("distributor:1 storage:2")); + + for (uint32_t i = 0; i < 3; i++) { + auto req = std::dynamic_pointer_cast<api::RequestBucketInfoCommand>(_sender.command(i)); + + ASSERT_TRUE(req.get() != nullptr); + ASSERT_EQ(size_t(1), req->getBuckets().size()); + EXPECT_EQ(bucket_id, req->getBuckets()[0]); + + auto reqreply = std::make_shared<api::RequestBucketInfoReply>(*req); + reqreply->getBucketInfo().push_back( + api::RequestBucketInfoReply::Entry( + bucket_id, + api::BucketInfo(10 * (i + 1), 100 * (i +1), 1000 * (i+1)))); + stripe_of_bucket(bucket_id).bucket_db_updater().onRequestBucketInfoReply(reqreply); + } + + EXPECT_EQ("BucketId(0x40000000000004d2) : " + "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), " + "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false)", + dump_bucket(bucket_id)); +}; + +TEST_F(TopLevelBucketDBUpdaterTest, flush) { + enable_distributor_cluster_state("distributor:1 storage:3"); + _sender.clear(); + + document::BucketId bucket_id(16, 1234); + add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234"); + + std::vector<api::MergeBucketCommand::Node> nodes; + for (uint32_t i = 0; i < 3; ++i) { + nodes.push_back(api::MergeBucketCommand::Node(i)); + } + + api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0); + auto reply = std::make_shared<api::MergeBucketReply>(cmd); + + _sender.clear(); + stripe_of_bucket(bucket_id).bucket_db_updater().onMergeBucketReply(reply); + + ASSERT_EQ(size_t(3), _sender.commands().size()); + ASSERT_EQ(size_t(0), _sender_down.replies().size()); + + stripe_of_bucket(bucket_id).bucket_db_updater().flush(); + // Flushing should drop all merge bucket replies + EXPECT_EQ(size_t(0), _sender_down.commands().size()); +} + +std::string +TopLevelBucketDBUpdaterTest::get_sent_nodes(const std::string& old_cluster_state, + const std::string& new_cluster_state) +{ + auto fixture = create_pending_state_fixture_for_state_change(old_cluster_state, new_cluster_state); + sort_sent_messages_by_index(fixture->sender); + + std::ostringstream ost; + for (uint32_t i = 0; i < fixture->sender.commands().size(); i++) { + auto& req = dynamic_cast<RequestBucketInfoCommand&>(*fixture->sender.command(i)); + + if (i > 0) { + ost << ","; + } + + ost << req.getAddress()->getIndex(); + } + + return ost.str(); +} + +std::string +TopLevelBucketDBUpdaterTest::get_sent_nodes_distribution_changed(const std::string& old_cluster_state) +{ + DistributorMessageSenderStub sender; + + framework::defaultimplementation::FakeClock clock; + auto cluster_info = create_cluster_info(old_cluster_state); + std::unique_ptr<PendingClusterState> state( + PendingClusterState::createForDistributionChange( + clock, cluster_info, sender, top_level_bucket_space_repo(), api::Timestamp(1))); + + sort_sent_messages_by_index(sender); + + std::ostringstream ost; + for (uint32_t i = 0; i < sender.commands().size(); i++) { + auto& req = dynamic_cast<RequestBucketInfoCommand&>(*sender.command(i)); + + if (i > 0) { + ost << ","; + } + + ost << req.getAddress()->getIndex(); + } + + return ost.str(); +} + +TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_send_messages) { + EXPECT_EQ(get_node_list({0, 1, 2}), + get_sent_nodes("cluster:d", + "distributor:1 storage:3")); + + EXPECT_EQ(get_node_list({0, 1}), + get_sent_nodes("cluster:d", + "distributor:1 storage:3 .2.s:m")); + + EXPECT_EQ(get_node_list({2}), + get_sent_nodes("distributor:1 storage:2", + "distributor:1 storage:3")); + + EXPECT_EQ(get_node_list({2, 3, 4, 5}), + get_sent_nodes("distributor:1 storage:2", + "distributor:1 storage:6")); + + EXPECT_EQ(get_node_list({0, 1, 2}), + get_sent_nodes("distributor:4 storage:3", + "distributor:3 storage:3")); + + EXPECT_EQ(get_node_list({0, 1, 2, 3}), + get_sent_nodes("distributor:4 storage:3", + "distributor:4 .2.s:d storage:4")); + + EXPECT_EQ("", + get_sent_nodes("distributor:4 storage:3", + "distributor:4 .0.s:d storage:4")); + + EXPECT_EQ("", + get_sent_nodes("distributor:3 storage:3", + "distributor:4 storage:3")); + + EXPECT_EQ(get_node_list({2}), + get_sent_nodes("distributor:3 storage:3 .2.s:i", + "distributor:3 storage:3")); + + EXPECT_EQ(get_node_list({1}), + get_sent_nodes("distributor:3 storage:3 .1.s:d", + "distributor:3 storage:3")); + + EXPECT_EQ(get_node_list({1, 2, 4}), + get_sent_nodes("distributor:3 storage:4 .1.s:d .2.s:i", + "distributor:3 storage:5")); + + EXPECT_EQ("", + get_sent_nodes("distributor:1 storage:3", + "cluster:d")); + + EXPECT_EQ("", + get_sent_nodes("distributor:1 storage:3", + "distributor:1 storage:3")); + + EXPECT_EQ("", + get_sent_nodes("distributor:1 storage:3", + "cluster:d distributor:1 storage:6")); + + EXPECT_EQ("", + get_sent_nodes("distributor:3 storage:3", + "distributor:3 .2.s:m storage:3")); + + EXPECT_EQ(get_node_list({0, 1, 2}), + get_sent_nodes("distributor:3 .2.s:m storage:3", + "distributor:3 .2.s:d storage:3")); + + EXPECT_EQ("", + get_sent_nodes("distributor:3 .2.s:m storage:3", + "distributor:3 storage:3")); + + EXPECT_EQ(get_node_list({0, 1, 2}), + get_sent_nodes_distribution_changed("distributor:3 storage:3")); + + EXPECT_EQ(get_node_list({0, 1}), + get_sent_nodes("distributor:10 storage:2", + "distributor:10 .1.s:d storage:2")); + + EXPECT_EQ("", + get_sent_nodes("distributor:2 storage:2", + "distributor:3 .2.s:i storage:2")); + + EXPECT_EQ(get_node_list({0, 1, 2}), + get_sent_nodes("distributor:3 storage:3", + "distributor:3 .2.s:s storage:3")); + + EXPECT_EQ("", + get_sent_nodes("distributor:3 .2.s:s storage:3", + "distributor:3 .2.s:d storage:3")); + + EXPECT_EQ(get_node_list({1}), + get_sent_nodes("distributor:3 storage:3 .1.s:m", + "distributor:3 storage:3")); + + EXPECT_EQ("", + get_sent_nodes("distributor:3 storage:3", + "distributor:3 storage:3 .1.s:m")); +}; + +TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_receive) { + DistributorMessageSenderStub sender; + + auto cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState("distributor:1 storage:3")); + + framework::defaultimplementation::FakeClock clock; + auto cluster_info = create_cluster_info("cluster:d"); + OutdatedNodesMap outdated_nodes_map; + std::unique_ptr<PendingClusterState> state( + PendingClusterState::createForClusterStateChange( + clock, cluster_info, sender, top_level_bucket_space_repo(), + cmd, outdated_nodes_map, api::Timestamp(1))); + + ASSERT_EQ(message_count(3), sender.commands().size()); + + sort_sent_messages_by_index(sender); + + std::ostringstream ost; + for (uint32_t i = 0; i < sender.commands().size(); i++) { + auto* req = dynamic_cast<RequestBucketInfoCommand*>(sender.command(i).get()); + ASSERT_TRUE(req != nullptr); + + auto rep = std::make_shared<RequestBucketInfoReply>(*req); + + rep->getBucketInfo().push_back( + RequestBucketInfoReply::Entry( + document::BucketId(16, i), + api::BucketInfo(i, i, i, i, i))); + + ASSERT_TRUE(state->onRequestBucketInfoReply(rep)); + ASSERT_EQ((i == (sender.commands().size() - 1)), state->done()); + } + + auto& pending_transition = state->getPendingBucketSpaceDbTransition(makeBucketSpace()); + EXPECT_EQ(3u, pending_transition.results().size()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_with_group_down) { + std::string config = dist_config_6_nodes_across_4_groups(); + config += "distributor_auto_ownership_transfer_on_whole_group_down true\n"; + set_distribution(config); + + // Group config has nodes {0, 1}, {2, 3}, {4, 5} + // We're node index 0. + + // Entire group 1 goes down. Must refetch from all nodes. + EXPECT_EQ(get_node_list({0, 1, 2, 3, 4, 5}), + get_sent_nodes("distributor:6 storage:6", + "distributor:6 .2.s:d .3.s:d storage:6")); + + // But don't fetch if not the entire group is down. + EXPECT_EQ("", + get_sent_nodes("distributor:6 storage:6", + "distributor:6 .2.s:d storage:6")); +} + +TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_with_group_down_and_no_handover) { + std::string config = dist_config_6_nodes_across_4_groups(); + config += "distributor_auto_ownership_transfer_on_whole_group_down false\n"; + set_distribution(config); + + // Group is down, but config says to not do anything about it. + EXPECT_EQ(get_node_list({0, 1, 2, 3, 4, 5}, _bucket_spaces.size() - 1), + get_sent_nodes("distributor:6 storage:6", + "distributor:6 .2.s:d .3.s:d storage:6")); +} + + +namespace { + +void +parse_input_data(const std::string& data, + uint64_t timestamp, + PendingClusterState& state, + bool include_bucket_info) +{ + vespalib::StringTokenizer tokenizer(data, "|"); + for (uint32_t i = 0; i < tokenizer.size(); i++) { + vespalib::StringTokenizer tok2(tokenizer[i], ":"); + + uint16_t node = atoi(tok2[0].data()); + + state.setNodeReplied(node); + auto& pending_transition = state.getPendingBucketSpaceDbTransition(makeBucketSpace()); + + vespalib::StringTokenizer tok3(tok2[1], ","); + for (uint32_t j = 0; j < tok3.size(); j++) { + if (include_bucket_info) { + vespalib::StringTokenizer tok4(tok3[j], "/"); + + pending_transition.addNodeInfo( + document::BucketId(16, atoi(tok4[0].data())), + BucketCopy( + timestamp, + node, + api::BucketInfo( + atoi(tok4[1].data()), + atoi(tok4[2].data()), + atoi(tok4[3].data()), + atoi(tok4[2].data()), + atoi(tok4[3].data())))); + } else { + pending_transition.addNodeInfo( + document::BucketId(16, atoi(tok3[j].data())), + BucketCopy(timestamp, + node, + api::BucketInfo(3, 3, 3, 3, 3))); + } + } + } +} + +struct BucketDumper : public BucketDatabase::EntryProcessor +{ + std::ostringstream ost; + bool _include_bucket_info; + + explicit BucketDumper(bool include_bucket_info) + : _include_bucket_info(include_bucket_info) + { + } + + bool process(const BucketDatabase::ConstEntryRef& e) override { + document::BucketId bucket_id(e.getBucketId()); + + ost << uint32_t(bucket_id.getRawId()) << ":"; + for (uint32_t i = 0; i < e->getNodeCount(); ++i) { + if (i > 0) { + ost << ","; + } + const BucketCopy& copy(e->getNodeRef(i)); + ost << copy.getNode(); + if (_include_bucket_info) { + ost << "/" << copy.getChecksum() + << "/" << copy.getDocumentCount() + << "/" << copy.getTotalDocumentSize() + << "/" << (copy.trusted() ? "t" : "u"); + } + } + ost << "|"; + return true; + } +}; + +} + +std::string +TopLevelBucketDBUpdaterTest::merge_bucket_lists( + const lib::ClusterState& old_state, + const std::string& existing_data, + const lib::ClusterState& new_state, + const std::string& new_data, + bool include_bucket_info) +{ + framework::defaultimplementation::FakeClock clock; + framework::MilliSecTimer timer(clock); + + DistributorMessageSenderStub sender; + OutdatedNodesMap outdated_nodes_map; + + { + auto cmd = std::make_shared<api::SetSystemStateCommand>(old_state); + api::Timestamp before_time(1); + auto cluster_info = create_cluster_info("cluster:d"); + + auto state = PendingClusterState::createForClusterStateChange( + clock, cluster_info, sender, top_level_bucket_space_repo(), + cmd, outdated_nodes_map, before_time); + + parse_input_data(existing_data, before_time, *state, include_bucket_info); + auto guard = acquire_stripe_guard(); + state->merge_into_bucket_databases(*guard); + } + + BucketDumper dumper_tmp(true); + for (auto* s : distributor_stripes()) { + auto& db = s->getBucketSpaceRepo().get(document::FixedBucketSpaces::default_space()).getBucketDatabase(); + db.forEach(dumper_tmp); + } + + { + auto cmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(new_state)); + api::Timestamp after_time(2); + auto cluster_info = create_cluster_info(old_state.toString()); + + auto state = PendingClusterState::createForClusterStateChange( + clock, cluster_info, sender, top_level_bucket_space_repo(), + cmd, outdated_nodes_map, after_time); + + parse_input_data(new_data, after_time, *state, include_bucket_info); + auto guard = acquire_stripe_guard(); + state->merge_into_bucket_databases(*guard); + } + + BucketDumper dumper(include_bucket_info); + for (auto* s : distributor_stripes()) { + auto& db = s->getBucketSpaceRepo().get(document::FixedBucketSpaces::default_space()).getBucketDatabase(); + db.forEach(dumper); + db.clear(); + } + return dumper.ost.str(); +} + +std::string +TopLevelBucketDBUpdaterTest::merge_bucket_lists(const std::string& existing_data, + const std::string& new_data, + bool include_bucket_info) +{ + return merge_bucket_lists( + lib::ClusterState("distributor:1 storage:3"), + existing_data, + lib::ClusterState("distributor:1 storage:3"), + new_data, + include_bucket_info); +} + +TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_merge) { + // Result is on the form: [bucket w/o count bits]:[node indexes]|.. + // Input is on the form: [node]:[bucket w/o count bits]|... + + // Simple initializing case - ask all nodes for info + EXPECT_EQ("4:0,1|2:0,1|6:1,2|1:0,2|5:2,0|3:2,1|", + merge_bucket_lists( + "", + "0:1,2,4,5|1:2,3,4,6|2:1,3,5,6")); + + // New node came up + EXPECT_EQ("4:0,1|2:0,1|6:1,2,3|1:0,2,3|5:2,0,3|3:2,1,3|", + merge_bucket_lists( + "0:1,2,4,5|1:2,3,4,6|2:1,3,5,6", + "3:1,3,5,6")); + + // Node came up with some buckets removed and some added + // Buckets that were removed should not be removed as the node + // didn't lose a disk. + EXPECT_EQ("8:0|4:0,1|2:0,1|6:1,0,2|1:0,2|5:2,0|3:2,1|", + merge_bucket_lists( + "0:1,2,4,5|1:2,3,4,6|2:1,3,5,6", + "0:1,2,6,8")); + + // Bucket info format is "bucketid/checksum/count/size" + // Node went from initializing to up and invalid bucket went to empty. + EXPECT_EQ("2:0/0/0/0/t|", + merge_bucket_lists( + "0:2/0/0/1", + "0:2/0/0/0", + true)); + + EXPECT_EQ("5:1/2/3/4/u,0/0/0/0/u|", + merge_bucket_lists("", "0:5/0/0/0|1:5/2/3/4", true)); +} + +TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_merge_replica_changed) { + // Node went from initializing to up and non-invalid bucket changed. + EXPECT_EQ("2:0/2/3/4/t|3:0/2/4/6/t|", + merge_bucket_lists( + lib::ClusterState("distributor:1 storage:1 .0.s:i"), + "0:2/1/2/3,3/2/4/6", + lib::ClusterState("distributor:1 storage:1"), + "0:2/2/3/4,3/2/4/6", + true)); +} + +TEST_F(TopLevelBucketDBUpdaterTest, no_db_resurrection_for_bucket_not_owned_in_current_state) { + document::BucketId bucket(16, 3); + lib::ClusterState state_before("distributor:1 storage:1"); + { + uint32_t expected_msgs = _bucket_spaces.size(), dummy_buckets_to_return = 1; + ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state_before, expected_msgs, dummy_buckets_to_return)); + } + _sender.clear(); + + stripe_of_bucket(bucket).bucket_db_updater().recheckBucketInfo(0, makeDocumentBucket(bucket)); + + ASSERT_EQ(size_t(1), _sender.commands().size()); + auto rbi = std::dynamic_pointer_cast<RequestBucketInfoCommand>(_sender.command(0)); + + lib::ClusterState state_after("distributor:3 storage:3"); + + { + uint32_t expected_msgs = message_count(2), dummy_buckets_to_return = 1; + ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state_after, expected_msgs, dummy_buckets_to_return)); + } + EXPECT_FALSE(distributor_bucket_space(bucket).get_bucket_ownership_flags(bucket).owned_in_current_state()); + + ASSERT_NO_FATAL_FAILURE(send_fake_reply_for_single_bucket_request(*rbi)); + + EXPECT_EQ("NONEXISTING", dump_bucket(bucket)); +} + +TEST_F(TopLevelBucketDBUpdaterTest, no_db_resurrection_for_bucket_not_owned_in_pending_state) { + document::BucketId bucket(16, 3); + lib::ClusterState state_before("distributor:1 storage:1"); + { + uint32_t expected_msgs = _bucket_spaces.size(), dummy_buckets_to_return = 1; + ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state_before, expected_msgs, dummy_buckets_to_return)); + } + _sender.clear(); + + stripe_of_bucket(bucket).bucket_db_updater().recheckBucketInfo(0, makeDocumentBucket(bucket)); + + ASSERT_EQ(size_t(1), _sender.commands().size()); + auto rbi = std::dynamic_pointer_cast<RequestBucketInfoCommand>(_sender.command(0)); + + lib::ClusterState state_after("distributor:3 storage:3"); + // Set, but _don't_ enable cluster state. We want it to be pending. + set_cluster_state(state_after); + EXPECT_TRUE(distributor_bucket_space(bucket).get_bucket_ownership_flags(bucket).owned_in_current_state()); + EXPECT_FALSE(distributor_bucket_space(bucket).get_bucket_ownership_flags(bucket).owned_in_pending_state()); + + ASSERT_NO_FATAL_FAILURE(send_fake_reply_for_single_bucket_request(*rbi)); + + EXPECT_EQ("NONEXISTING", dump_bucket(bucket)); +} + +/* + * If we get a distribution config change, it's important that cluster states that + * arrive after this--but _before_ the pending cluster state has finished--must trigger + * a full bucket info fetch no matter what the cluster state change was! Otherwise, we + * will with a high likelihood end up not getting the complete view of the buckets in + * the cluster. + */ +TEST_F(TopLevelBucketDBUpdaterTest, cluster_state_always_sends_full_fetch_when_distribution_change_pending) { + lib::ClusterState state_before("distributor:6 storage:6"); + { + uint32_t expected_msgs = message_count(6), dummy_buckets_to_return = 1; + ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state_before, expected_msgs, dummy_buckets_to_return)); + } + _sender.clear(); + std::string distConfig(dist_config_6_nodes_across_2_groups()); + set_distribution(distConfig); + + sort_sent_messages_by_index(_sender); + ASSERT_EQ(message_count(6), _sender.commands().size()); + // Suddenly, a wild cluster state change appears! Even though this state + // does not in itself imply any bucket changes, it will still overwrite the + // pending cluster state and thus its state of pending bucket info requests. + set_cluster_state("distributor:6 .2.t:12345 storage:6"); + + ASSERT_EQ(message_count(12), _sender.commands().size()); + + // Send replies for first messageCount(6) (outdated requests). + int num_buckets = 10; + for (uint32_t i = 0; i < message_count(6); ++i) { + ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(lib::ClusterState("distributor:6 storage:6"), + *_sender.command(i), num_buckets)); + } + // No change from these. + ASSERT_NO_FATAL_FAILURE(assert_correct_buckets(1, "distributor:6 storage:6")); + + // Send for current pending. + for (uint32_t i = 0; i < message_count(6); ++i) { + ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(lib::ClusterState("distributor:6 .2.t:12345 storage:6"), + *_sender.command(i + message_count(6)), + num_buckets)); + } + ASSERT_NO_FATAL_FAILURE(assert_correct_buckets(num_buckets, "distributor:6 storage:6")); + _sender.clear(); + + // No more pending global fetch; this should be a no-op state. + set_cluster_state("distributor:6 .3.t:12345 storage:6"); + EXPECT_EQ(size_t(0), _sender.commands().size()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, changed_distribution_config_triggers_recovery_mode) { + uint32_t num_buckets = 20; + ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(lib::ClusterState("distributor:6 storage:6"), + message_count(6), num_buckets)); + _sender.clear(); + EXPECT_TRUE(all_distributor_stripes_are_in_recovery_mode()); + complete_recovery_mode_on_all_stripes(); + EXPECT_FALSE(all_distributor_stripes_are_in_recovery_mode()); + + set_distribution(dist_config_6_nodes_across_4_groups()); + sort_sent_messages_by_index(_sender); + // No replies received yet, still no recovery mode. + EXPECT_FALSE(all_distributor_stripes_are_in_recovery_mode()); + + ASSERT_EQ(message_count(6), _sender.commands().size()); + num_buckets = 10; + for (uint32_t i = 0; i < message_count(6); ++i) { + ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(lib::ClusterState("distributor:6 storage:6"), + *_sender.command(i), num_buckets)); + } + + // Pending cluster state (i.e. distribution) has been enabled, which should + // cause recovery mode to be entered. + EXPECT_TRUE(all_distributor_stripes_are_in_recovery_mode()); + complete_recovery_mode_on_all_stripes(); + EXPECT_FALSE(all_distributor_stripes_are_in_recovery_mode()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, changed_distribution_config_does_not_elide_bucket_db_pruning) { + set_distribution(dist_config_3_nodes_in_1_group()); + + constexpr uint32_t n_buckets = 100; + ASSERT_NO_FATAL_FAILURE( + set_and_enable_cluster_state(lib::ClusterState("distributor:6 storage:6"), message_count(6), n_buckets)); + _sender.clear(); + + // Config implies a different node set than the current cluster state, so it's crucial that + // DB pruning is _not_ elided. Yes, this is inherently racing with cluster state changes and + // should be changed to be atomic and controlled by the cluster controller instead of config. + // But this is where we currently are. + set_distribution(dist_config_6_nodes_across_2_groups()); + for (auto* s : distributor_stripes()) { + const auto& db = s->getBucketSpaceRepo().get(document::FixedBucketSpaces::default_space()).getBucketDatabase(); + db.acquire_read_guard()->for_each([&]([[maybe_unused]] uint64_t key, const auto& e) { + auto id = e.getBucketId(); + EXPECT_TRUE(distributor_bucket_space(id).get_bucket_ownership_flags(id).owned_in_pending_state()); + }); + } +} + +TEST_F(TopLevelBucketDBUpdaterTest, newly_added_buckets_have_current_time_as_gc_timestamp) { + fake_clock().setAbsoluteTimeInSeconds(101234); + lib::ClusterState state_before("distributor:1 storage:1"); + { + uint32_t expected_msgs = _bucket_spaces.size(), dummy_buckets_to_return = 1; + ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state_before, expected_msgs, dummy_buckets_to_return)); + } + // setAndEnableClusterState adds n buckets with id (16, i) + document::BucketId bucket(16, 0); + BucketDatabase::Entry e = get_bucket(bucket); + ASSERT_TRUE(e.valid()); + EXPECT_EQ(uint32_t(101234), e->getLastGarbageCollectionTime()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, newer_mutations_not_overwritten_by_earlier_bucket_fetch) { + { + lib::ClusterState state_before("distributor:1 storage:1 .0.s:i"); + uint32_t expected_msgs = _bucket_spaces.size(), dummy_buckets_to_return = 0; + // This step is required to make the distributor ready for accepting + // the below explicit database insertion towards node 0. + ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state_before, expected_msgs, dummy_buckets_to_return)); + } + _sender.clear(); + fake_clock().setAbsoluteTimeInSeconds(1000); + lib::ClusterState state("distributor:1 storage:1"); + set_cluster_state(state); + ASSERT_EQ(_bucket_spaces.size(), _sender.commands().size()); + + // Before replying with the bucket info, simulate the arrival of a mutation + // reply that alters the state of the bucket with information that will be + // more recent that what is returned by the bucket info. This information + // must not be lost when the bucket info is later merged into the database. + document::BucketId bucket(16, 1); + constexpr uint64_t insertion_timestamp = 1001ULL * 1000000; + api::BucketInfo wanted_info(5, 6, 7); + stripe_of_bucket(bucket).bucket_db_updater().operation_context().update_bucket_database( + makeDocumentBucket(bucket), + BucketCopy(insertion_timestamp, 0, wanted_info), + DatabaseUpdate::CREATE_IF_NONEXISTING); + + fake_clock().setAbsoluteTimeInSeconds(1002); + constexpr uint32_t buckets_returned = 10; // Buckets (16, 0) ... (16, 9) + // Return bucket information which on the timeline might originate from + // anywhere between [1000, 1002]. Our assumption is that any mutations + // taking place after t=1000 must have its reply received and processed + // by this distributor and timestamped strictly higher than t=1000 (modulo + // clock skew, of course, but that is outside the scope of this). A mutation + // happening before t=1000 but receiving a reply at t>1000 does not affect + // correctness, as this should contain the same bucket info as that + // contained in the full bucket reply and the DB update is thus idempotent. + for (uint32_t i = 0; i < _bucket_spaces.size(); ++i) { + ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(state, *_sender.command(i), buckets_returned)); + } + + BucketDatabase::Entry e = get_bucket(bucket); + ASSERT_EQ(uint32_t(1), e->getNodeCount()); + EXPECT_EQ(wanted_info, e->getNodeRef(0).getBucketInfo()); +} + +std::vector<uint16_t> +TopLevelBucketDBUpdaterTest::get_send_set() const +{ + std::vector<uint16_t> nodes; + std::transform(_sender.commands().begin(), + _sender.commands().end(), + std::back_inserter(nodes), + [](auto& cmd) + { + auto& req(dynamic_cast<const api::RequestBucketInfoCommand&>(*cmd)); + return req.getAddress()->getIndex(); + }); + return nodes; +} + +std::vector<uint16_t> +TopLevelBucketDBUpdaterTest::get_sent_nodes_with_preemption( + const std::string& old_cluster_state, + uint32_t expected_old_state_messages, + const std::string& preempted_cluster_state, + const std::string& new_cluster_state) +{ + uint32_t dummy_buckets_to_return = 10; + // FIXME cannot chain assertion checks in non-void function + set_and_enable_cluster_state(lib::ClusterState(old_cluster_state), + expected_old_state_messages, + dummy_buckets_to_return); + + _sender.clear(); + + set_cluster_state(preempted_cluster_state); + _sender.clear(); + // Do not allow the pending state to become the active state; trigger a + // new transition without ACKing the info requests first. This will + // overwrite the pending state entirely. + set_cluster_state(lib::ClusterState(new_cluster_state)); + return get_send_set(); +} + +std::vector<uint16_t> +TopLevelBucketDBUpdaterTest::expand_node_vec(const std::vector<uint16_t>& nodes) +{ + std::vector<uint16_t> res; + size_t count = _bucket_spaces.size(); + for (const auto &node : nodes) { + for (uint32_t i = 0; i < count; ++i) { + res.push_back(node); + } + } + return res; +} + +/* + * If we don't carry over the set of nodes that we need to fetch from, + * a naive comparison between the active state and the new state will + * make it appear to the distributor that nothing has changed, as any + * database modifications caused by intermediate states will not be + * accounted for (basically the ABA problem in a distributed setting). + */ +TEST_F(TopLevelBucketDBUpdaterTest, preempted_distributor_change_carries_node_set_over_to_next_state_fetch) { + EXPECT_EQ(expand_node_vec({0, 1, 2, 3, 4, 5}), + get_sent_nodes_with_preemption("version:1 distributor:6 storage:6", + message_count(6), + "version:2 distributor:6 .5.s:d storage:6", + "version:3 distributor:6 storage:6")); +} + +TEST_F(TopLevelBucketDBUpdaterTest, preempted_storage_change_carries_node_set_over_to_next_state_fetch) { + EXPECT_EQ(expand_node_vec({2, 3}), + get_sent_nodes_with_preemption( + "version:1 distributor:6 storage:6 .2.s:d", + message_count(5), + "version:2 distributor:6 storage:6 .2.s:d .3.s:d", + "version:3 distributor:6 storage:6")); +} + +TEST_F(TopLevelBucketDBUpdaterTest, preempted_storage_node_down_must_be_re_fetched) { + EXPECT_EQ(expand_node_vec({2}), + get_sent_nodes_with_preemption( + "version:1 distributor:6 storage:6", + message_count(6), + "version:2 distributor:6 storage:6 .2.s:d", + "version:3 distributor:6 storage:6")); +} + +using NodeVec = std::vector<uint16_t>; + +TEST_F(TopLevelBucketDBUpdaterTest, do_not_send_to_preempted_node_now_in_down_state) { + EXPECT_EQ(NodeVec{}, + get_sent_nodes_with_preemption( + "version:1 distributor:6 storage:6 .2.s:d", + message_count(5), + "version:2 distributor:6 storage:6", // Sends to 2. + "version:3 distributor:6 storage:6 .2.s:d")); // 2 down again. +} + +TEST_F(TopLevelBucketDBUpdaterTest, do_not_send_to_preempted_node_not_part_of_new_state) { + // Even though 100 nodes are preempted, not all of these should be part + // of the request afterwards when only 6 are part of the state. + EXPECT_EQ(expand_node_vec({0, 1, 2, 3, 4, 5}), + get_sent_nodes_with_preemption( + "version:1 distributor:6 storage:100", + message_count(100), + "version:2 distributor:5 .4.s:d storage:100", + "version:3 distributor:6 storage:6")); +} + +TEST_F(TopLevelBucketDBUpdaterTest, outdated_node_set_cleared_after_successful_state_completion) { + lib::ClusterState state_before("version:1 distributor:6 storage:6 .1.t:1234"); + uint32_t expected_msgs = message_count(6), dummy_buckets_to_return = 10; + ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state_before, expected_msgs, dummy_buckets_to_return)); + _sender.clear(); + // New cluster state that should not by itself trigger any new fetches, + // unless outdated node set is somehow not cleared after an enabled + // (completed) cluster state has been set. + set_cluster_state("version:3 distributor:6 storage:6"); + EXPECT_EQ(size_t(0), _sender.commands().size()); +} + +// XXX test currently disabled since distribution config currently isn't used +// at all in order to deduce the set of nodes to send to. This might not matter +// in practice since it is assumed that the cluster state matching the new +// distribution config will follow very shortly after the config has been +// applied to the node. The new cluster state will then send out requests to +// the correct node set. +TEST_F(TopLevelBucketDBUpdaterTest, DISABLED_cluster_config_downsize_only_sends_to_available_nodes) { + uint32_t expected_msgs = 6, dummy_buckets_to_return = 20; + ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(lib::ClusterState("distributor:6 storage:6"), + expected_msgs, dummy_buckets_to_return)); + _sender.clear(); + + // Intentionally trigger a racing config change which arrives before the + // new cluster state representing it. + set_distribution(dist_config_3_nodes_in_1_group()); + sort_sent_messages_by_index(_sender); + + EXPECT_EQ((NodeVec{0, 1, 2}), get_send_set()); +} + +/** + * Test scenario where a cluster is downsized by removing a subset of the nodes + * from the distribution configuration. The system must be able to deal with + * a scenario where the set of nodes between two cluster states across a config + * change may differ. + * + * See VESPA-790 for details. + */ +TEST_F(TopLevelBucketDBUpdaterTest, node_missing_from_config_is_treated_as_needing_ownership_transfer) { + uint32_t expected_msgs = message_count(3), dummy_buckets_to_return = 1; + ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(lib::ClusterState("distributor:3 storage:3"), + expected_msgs, dummy_buckets_to_return)); + _sender.clear(); + + // Cluster goes from {0, 1, 2} -> {0, 1}. This leaves us with a config + // that does not contain node 2 while the _active_ cluster state still + // contains this node. + const char* downsize_cfg = + "redundancy 2\n" + "distributor_auto_ownership_transfer_on_whole_group_down true\n" + "group[2]\n" + "group[0].name \"invalid\"\n" + "group[0].index \"invalid\"\n" + "group[0].partitions 1|*\n" + "group[0].nodes[0]\n" + "group[1].name rack0\n" + "group[1].index 0\n" + "group[1].nodes[2]\n" + "group[1].nodes[0].index 0\n" + "group[1].nodes[1].index 1\n"; + + set_distribution(downsize_cfg); + sort_sent_messages_by_index(_sender); + _sender.clear(); + + // Attempt to apply state with {0, 1} set. This will compare the new state + // with the previous state, which still has node 2. + expected_msgs = message_count(2); + ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(lib::ClusterState("distributor:2 storage:2"), + expected_msgs, dummy_buckets_to_return)); + + EXPECT_EQ(expand_node_vec({0, 1}), get_send_set()); +} + } |