diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2021-09-08 12:46:35 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahooinc.com> | 2021-09-08 12:46:35 +0000 |
commit | a118e0e60faa88f37bd863e3af323f29f12db332 (patch) | |
tree | dd8562f76f0e73303f71c9de1a9b1f57e75d7729 /storage | |
parent | 39bf4a6d515e43b79aa8b145a2da55eb9101ee69 (diff) |
Port first batch of BucketDBUpdater tests from legacy to top-level
Diffstat (limited to 'storage')
6 files changed, 952 insertions, 19 deletions
diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt index 678c19d4c6f..67a7fed8d0b 100644 --- a/storage/src/tests/distributor/CMakeLists.txt +++ b/storage/src/tests/distributor/CMakeLists.txt @@ -47,6 +47,7 @@ vespa_add_executable(storage_distributor_gtest_runner_app TEST statoperationtest.cpp statusreporterdelegatetest.cpp throttlingoperationstartertest.cpp + top_level_bucket_db_updater_test.cpp top_level_distributor_test.cpp top_level_distributor_test_util.cpp twophaseupdateoperationtest.cpp diff --git a/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp b/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp index 824fb51acb9..cb61c1bb009 100644 --- a/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp +++ b/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp @@ -561,6 +561,7 @@ LegacyBucketDBUpdaterTest::LegacyBucketDBUpdaterTest() LegacyBucketDBUpdaterTest::~LegacyBucketDBUpdaterTest() = default; +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, normal_usage) { setSystemState(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3")); @@ -592,6 +593,7 @@ TEST_F(LegacyBucketDBUpdaterTest, normal_usage) { ASSERT_NO_FATAL_FAILURE(assertCorrectBuckets(10, "distributor:2 storage:3")); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, distributor_change) { int numBuckets = 100; @@ -622,6 +624,7 @@ TEST_F(LegacyBucketDBUpdaterTest, distributor_change) { ASSERT_NO_FATAL_FAILURE(assertCorrectBuckets(numBuckets, "distributor:2 storage:3")); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, distributor_change_with_grouping) { std::string distConfig(getDistConfig6Nodes2Groups()); setDistribution(distConfig); @@ -653,6 +656,7 @@ TEST_F(LegacyBucketDBUpdaterTest, distributor_change_with_grouping) { ASSERT_EQ(messageCount(6), _sender.commands().size()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, normal_usage_initializing) { setSystemState(lib::ClusterState("distributor:1 .0.s:i storage:1 .0.s:i")); @@ -690,6 +694,7 @@ TEST_F(LegacyBucketDBUpdaterTest, normal_usage_initializing) { ASSERT_NO_FATAL_FAILURE(assertCorrectBuckets(20, "distributor:1 storage:1")); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, failed_request_bucket_info) { setSystemState(lib::ClusterState("distributor:1 .0.s:i storage:1")); @@ -732,6 +737,7 @@ TEST_F(LegacyBucketDBUpdaterTest, failed_request_bucket_info) { EXPECT_EQ(std::string("Set system state"), _senderDown.getCommands()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, down_while_init) { ASSERT_NO_FATAL_FAILURE(setStorageNodes(3)); @@ -795,6 +801,7 @@ LegacyBucketDBUpdaterTest::expandNodeVec(const std::vector<uint16_t> &nodes) return res; } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, node_down) { ASSERT_NO_FATAL_FAILURE(setStorageNodes(3)); enableDistributorClusterState("distributor:1 storage:3"); @@ -810,6 +817,7 @@ TEST_F(LegacyBucketDBUpdaterTest, node_down) { EXPECT_FALSE(bucketExistsThatHasNode(100, 1)); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, storage_node_in_maintenance_clears_buckets_for_node) { ASSERT_NO_FATAL_FAILURE(setStorageNodes(3)); enableDistributorClusterState("distributor:1 storage:3"); @@ -825,6 +833,7 @@ TEST_F(LegacyBucketDBUpdaterTest, storage_node_in_maintenance_clears_buckets_for EXPECT_FALSE(bucketExistsThatHasNode(100, 1)); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, node_down_copies_get_in_sync) { ASSERT_NO_FATAL_FAILURE(setStorageNodes(3)); @@ -842,6 +851,7 @@ TEST_F(LegacyBucketDBUpdaterTest, node_down_copies_get_in_sync) { dumpBucket(bid)); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, initializing_while_recheck) { lib::ClusterState systemState("distributor:1 storage:2 .0.s:i .0.i:0.1"); setSystemState(systemState); @@ -860,6 +870,7 @@ TEST_F(LegacyBucketDBUpdaterTest, initializing_while_recheck) { EXPECT_EQ(MessageType::SETSYSTEMSTATE, _senderDown.command(0)->getType()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, bit_change) { std::vector<document::BucketId> bucketlist; @@ -959,6 +970,7 @@ TEST_F(LegacyBucketDBUpdaterTest, bit_change) { } }; +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, recheck_node_with_failure) { ASSERT_NO_FATAL_FAILURE(initializeNodesAndBuckets(3, 5)); @@ -1002,6 +1014,7 @@ TEST_F(LegacyBucketDBUpdaterTest, recheck_node_with_failure) { EXPECT_EQ(size_t(2), _sender.commands().size()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, recheck_node) { ASSERT_NO_FATAL_FAILURE(initializeNodesAndBuckets(3, 5)); diff --git a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp new file mode 100644 index 00000000000..de0b6b22358 --- /dev/null +++ b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp @@ -0,0 +1,789 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/storageapi/message/persistence.h> +#include <vespa/storage/distributor/top_level_bucket_db_updater.h> +#include <vespa/storage/distributor/bucket_space_distribution_context.h> +#include <vespa/storage/distributor/distributormetricsset.h> +#include <vespa/storage/distributor/pending_bucket_space_db_transition.h> +#include <vespa/storage/distributor/outdated_nodes_map.h> +#include <vespa/storage/storageutil/distributorstatecache.h> +#include <tests/distributor/top_level_distributor_test_util.h> +#include <vespa/document/test/make_document_bucket.h> +#include <vespa/document/test/make_bucket_space.h> +#include <vespa/document/bucket/fixed_bucket_spaces.h> +#include <vespa/storage/distributor/simpleclusterinformation.h> +#include <vespa/storage/distributor/top_level_distributor.h> +#include <vespa/storage/distributor/distributor_stripe.h> +#include <vespa/storage/distributor/distributor_bucket_space.h> +#include <vespa/vespalib/gtest/gtest.h> +#include <vespa/vespalib/text/stringtokenizer.h> +#include <vespa/vespalib/util/benchmark_timer.h> +#include <sstream> +#include <iomanip> + +using namespace storage::api; +using namespace storage::lib; +using document::test::makeDocumentBucket; +using document::test::makeBucketSpace; +using document::BucketSpace; +using document::FixedBucketSpaces; +using document::BucketId; +using document::Bucket; + +using namespace ::testing; + +namespace storage::distributor { + +class TopLevelBucketDBUpdaterTest : public Test, + public TopLevelDistributorTestUtil +{ +public: + TopLevelBucketDBUpdaterTest(); + ~TopLevelBucketDBUpdaterTest() override; + + std::vector<document::BucketSpace> _bucket_spaces; + + size_t message_count(size_t messagesPerBucketSpace) const { + return messagesPerBucketSpace * _bucket_spaces.size(); + } + + using NodeCount = int; + using Redundancy = int; + + void SetUp() override { + create_links(); + _bucket_spaces = bucket_spaces(); + // Disable deferred activation by default (at least for now) to avoid breaking the entire world. + bucket_db_updater().set_stale_reads_enabled(false); + setup_distributor(Redundancy(2), NodeCount(10), "cluster:d"); + }; + + void TearDown() override { + close(); + } + + std::shared_ptr<RequestBucketInfoReply> make_fake_bucket_reply( + const lib::ClusterState& state, + const RequestBucketInfoCommand& cmd, + int storageIndex, + uint32_t bucketCount, + uint32_t invalidBucketCount = 0) + { + auto sreply = std::make_shared<RequestBucketInfoReply>(cmd); + sreply->setAddress(storage_address(storageIndex)); + + auto& vec = sreply->getBucketInfo(); + + for (uint32_t i=0; i<bucketCount + invalidBucketCount; i++) { + document::BucketId bucket(16, i); + if (!distributor_bucket_space(bucket).owns_bucket_in_state(state, bucket)) { + continue; + } + + std::vector<uint16_t> nodes; + distributor_bucket_space(bucket).getDistribution().getIdealNodes( + lib::NodeType::STORAGE, state, bucket, nodes); + + for (uint32_t j = 0; j < nodes.size(); ++j) { + if (nodes[j] == storageIndex) { + if (i >= bucketCount) { + vec.push_back(api::RequestBucketInfoReply::Entry( + document::BucketId(16, i), + api::BucketInfo())); + } else { + vec.push_back(api::RequestBucketInfoReply::Entry( + document::BucketId(16, i), + api::BucketInfo(10,1,1))); + } + } + } + } + + return sreply; + } + + void fake_bucket_reply(const lib::ClusterState &state, + const api::StorageCommand &cmd, + uint32_t bucket_count, + uint32_t invalid_bucket_count = 0) + { + ASSERT_EQ(cmd.getType(), MessageType::REQUESTBUCKETINFO); + const api::StorageMessageAddress& address(*cmd.getAddress()); + bucket_db_updater().onRequestBucketInfoReply( + make_fake_bucket_reply(state, + dynamic_cast<const RequestBucketInfoCommand &>(cmd), + address.getIndex(), + bucket_count, + invalid_bucket_count)); + } + + std::string verifyBucket(document::BucketId id, const lib::ClusterState& state) { + BucketDatabase::Entry entry = get_bucket(id); + if (!entry.valid()) { + return vespalib::make_string("%s doesn't exist in DB", id.toString().c_str()); + } + + std::vector<uint16_t> nodes; + distributor_bucket_space(id).getDistribution().getIdealNodes( + lib::NodeType::STORAGE, state, document::BucketId(id), nodes); + + if (nodes.size() != entry->getNodeCount()) { + return vespalib::make_string("Bucket Id %s has %d nodes in " + "ideal state, but has only %d in DB", + id.toString().c_str(), + (int)nodes.size(), + (int)entry->getNodeCount()); + } + + for (uint32_t i = 0; i<nodes.size(); i++) { + bool found = false; + + for (uint32_t j = 0; j<entry->getNodeCount(); j++) { + if (nodes[i] == entry->getNodeRef(j).getNode()) { + found = true; + } + } + + if (!found) { + return vespalib::make_string( + "Bucket Id %s has no copy from node %d", + id.toString().c_str(), + nodes[i]); + } + } + + return ""; + } + + struct OrderByIncreasingNodeIndex { + template <typename T> + bool operator()(const T& lhs, const T& rhs) { + return (lhs->getAddress()->getIndex() + < rhs->getAddress()->getIndex()); + } + }; + + void sort_sent_messages_by_index(DistributorMessageSenderStub& sender, + size_t sortFromOffset = 0) + { + std::sort(sender.commands().begin() + sortFromOffset, + sender.commands().end(), + OrderByIncreasingNodeIndex()); + } + + void set_cluster_state(const lib::ClusterState& state) { + const size_t size_before_state = _sender.commands().size(); + bucket_db_updater().onSetSystemState(std::make_shared<api::SetSystemStateCommand>(state)); + // A lot of test logic has the assumption that all messages sent as a + // result of cluster state changes will be in increasing index order + // (for simplicity, not because this is required for correctness). + // Only sort the messages that arrived as a result of the state, don't + // jumble the sorting with any existing messages. + sort_sent_messages_by_index(_sender, size_before_state); + } + + void set_cluster_state(const vespalib::string& state_str) { + set_cluster_state(lib::ClusterState(state_str)); + } + + bool activate_cluster_state_version(uint32_t version) { + return bucket_db_updater().onActivateClusterStateVersion( + std::make_shared<api::ActivateClusterStateVersionCommand>(version)); + } + + void complete_bucket_info_gathering(const lib::ClusterState& state, + size_t expected_msgs, + uint32_t bucket_count = 1, + uint32_t invalid_bucket_count = 0) + { + ASSERT_EQ(expected_msgs, _sender.commands().size()); + + for (uint32_t i = 0; i < _sender.commands().size(); i++) { + ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(state, *_sender.command(i), + bucket_count, invalid_bucket_count)); + } + } + + api::StorageMessageAddress storage_address(uint16_t node) { + static vespalib::string _storage("storage"); + return api::StorageMessageAddress(&_storage, lib::NodeType::STORAGE, node); + } + + void assert_correct_buckets(int num_buckets, const std::string& state_str) { + lib::ClusterState state(state_str); + for (int i = 0; i < num_buckets; i++) { + ASSERT_EQ(get_ideal_str(document::BucketId(16, i), state), + get_nodes(document::BucketId(16, i))); + } + } + + void set_distribution(const std::string& dist_config) { + trigger_distribution_change(std::make_shared<lib::Distribution>(dist_config)); + } + + void verify_invalid(document::BucketId id, int storageNode) { + BucketDatabase::Entry entry = get_bucket(id); + ASSERT_TRUE(entry.valid()); + bool found = false; + for (uint32_t j = 0; j < entry->getNodeCount(); j++) { + if (entry->getNodeRef(j).getNode() == storageNode) { + ASSERT_FALSE(entry->getNodeRef(j).valid()); + found = true; + } + } + + ASSERT_TRUE(found); + } + + void set_storage_nodes(uint32_t numStorageNodes) { + _sender.clear(); + set_cluster_state(lib::ClusterState(vespalib::make_string("distributor:1 storage:%d", numStorageNodes))); + + for (uint32_t i=0; i< message_count(numStorageNodes); i++) { + ASSERT_EQ(_sender.command(i)->getType(), MessageType::REQUESTBUCKETINFO); + + const api::StorageMessageAddress *address = _sender.command(i)->getAddress(); + ASSERT_EQ(i / _bucket_spaces.size(), address->getIndex()); + } + } + + bool bucket_has_node(document::BucketId id, uint16_t node) const { + BucketDatabase::Entry entry = get_bucket(id); + assert(entry.valid()); + + for (uint32_t j = 0; j < entry->getNodeCount(); ++j) { + if (entry->getNodeRef(j).getNode() == node) { + return true; + } + } + return false; + } + + bool bucket_exists_that_has_node(int bucket_count, uint16_t node) const { + for (int i = 1; i < bucket_count; ++i) { + if (bucket_has_node(document::BucketId(16, i), node)) { + return true; + } + } + return false; + } + + std::string dump_bucket(const document::BucketId& id) const { + return get_bucket(id).toString(); + } + + void initialize_nodes_and_buckets(uint32_t num_storage_nodes, uint32_t num_buckets) { + ASSERT_NO_FATAL_FAILURE(set_storage_nodes(num_storage_nodes)); + + vespalib::string state(vespalib::make_string("distributor:1 storage:%d", num_storage_nodes)); + lib::ClusterState new_state(state); + + for (uint32_t i = 0; i < message_count(num_storage_nodes); ++i) { + ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(new_state, *_sender.command(i), num_buckets)); + } + ASSERT_NO_FATAL_FAILURE(assert_correct_buckets(num_buckets, state)); + } + +}; + +TopLevelBucketDBUpdaterTest::TopLevelBucketDBUpdaterTest() + : TopLevelDistributorTestUtil(), + _bucket_spaces() +{ +} + +TopLevelBucketDBUpdaterTest::~TopLevelBucketDBUpdaterTest() = default; + +namespace { + +std::string dist_config_6_nodes_across_2_groups() { + return ("redundancy 2\n" + "group[3]\n" + "group[0].name \"invalid\"\n" + "group[0].index \"invalid\"\n" + "group[0].partitions 1|*\n" + "group[0].nodes[0]\n" + "group[1].name rack0\n" + "group[1].index 0\n" + "group[1].nodes[3]\n" + "group[1].nodes[0].index 0\n" + "group[1].nodes[1].index 1\n" + "group[1].nodes[2].index 2\n" + "group[2].name rack1\n" + "group[2].index 1\n" + "group[2].nodes[3]\n" + "group[2].nodes[0].index 3\n" + "group[2].nodes[1].index 4\n" + "group[2].nodes[2].index 5\n"); +} + +std::string dist_config_6_nodes_across_4_groups() { + return ("redundancy 2\n" + "group[4]\n" + "group[0].name \"invalid\"\n" + "group[0].index \"invalid\"\n" + "group[0].partitions 1|*\n" + "group[0].nodes[0]\n" + "group[1].name rack0\n" + "group[1].index 0\n" + "group[1].nodes[2]\n" + "group[1].nodes[0].index 0\n" + "group[1].nodes[1].index 1\n" + "group[2].name rack1\n" + "group[2].index 1\n" + "group[2].nodes[2]\n" + "group[2].nodes[0].index 2\n" + "group[2].nodes[1].index 3\n" + "group[3].name rack2\n" + "group[3].index 2\n" + "group[3].nodes[2]\n" + "group[3].nodes[0].index 4\n" + "group[3].nodes[1].index 5\n"); +} + +std::string +make_string_list(std::string s, uint32_t count) +{ + std::ostringstream ost; + for (uint32_t i = 0; i < count; ++i) { + if (i > 0) { + ost << ","; + } + ost << s; + } + return ost.str(); +} + +std::string +make_request_bucket_info_strings(uint32_t count) +{ + return make_string_list("Request bucket info", count); +} + +} + +TEST_F(TopLevelBucketDBUpdaterTest, normal_usage) { + set_cluster_state(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3")); + + ASSERT_EQ(message_count(3), _sender.commands().size()); + + // Ensure distribution hash is set correctly + ASSERT_EQ(_component->getDistribution()->getNodeGraph().getDistributionConfigHash(), + dynamic_cast<const RequestBucketInfoCommand&>(*_sender.command(0)).getDistributionHash()); + + ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), + *_sender.command(0), 10)); + + _sender.clear(); + + // Optimization for not refetching unneeded data after cluster state + // change is only implemented after completion of previous cluster state + set_cluster_state("distributor:2 .0.s:i storage:3"); + + ASSERT_EQ(message_count(3), _sender.commands().size()); + // Expect reply of first set SystemState request. + ASSERT_EQ(size_t(1), _sender.replies().size()); + + ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering( + lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), + message_count(3), 10)); + ASSERT_NO_FATAL_FAILURE(assert_correct_buckets(10, "distributor:2 storage:3")); +} + +TEST_F(TopLevelBucketDBUpdaterTest, distributor_change) { + int num_buckets = 100; + + // First sends request + set_cluster_state("distributor:2 .0.s:i .1.s:i storage:3"); + ASSERT_EQ(message_count(3), _sender.commands().size()); + ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), + message_count(3), num_buckets)); + _sender.clear(); + + // No change from initializing to up (when done with last job) + set_cluster_state("distributor:2 storage:3"); + ASSERT_EQ(size_t(0), _sender.commands().size()); + _sender.clear(); + + // Adding node. No new read requests, but buckets thrown + set_cluster_state("distributor:3 storage:3"); + ASSERT_EQ(size_t(0), _sender.commands().size()); + ASSERT_NO_FATAL_FAILURE(assert_correct_buckets(num_buckets, "distributor:3 storage:3")); + _sender.clear(); + + // Removing distributor. Need to refetch new data from all nodes. + set_cluster_state("distributor:2 storage:3"); + ASSERT_EQ(message_count(3), _sender.commands().size()); + ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(lib::ClusterState("distributor:2 storage:3"), + message_count(3), num_buckets)); + _sender.clear(); + ASSERT_NO_FATAL_FAILURE(assert_correct_buckets(num_buckets, "distributor:2 storage:3")); +} + +TEST_F(TopLevelBucketDBUpdaterTest, distributor_change_with_grouping) { + set_distribution(dist_config_6_nodes_across_2_groups()); + int numBuckets = 100; + + set_cluster_state("distributor:6 storage:6"); + ASSERT_EQ(message_count(6), _sender.commands().size()); + ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(lib::ClusterState("distributor:6 storage:6"), + message_count(6), numBuckets)); + _sender.clear(); + + // Distributor going down in other group, no change + set_cluster_state("distributor:6 .5.s:d storage:6"); + ASSERT_EQ(size_t(0), _sender.commands().size()); + _sender.clear(); + + set_cluster_state("distributor:6 storage:6"); + ASSERT_EQ(size_t(0), _sender.commands().size()); + ASSERT_NO_FATAL_FAILURE(assert_correct_buckets(numBuckets, "distributor:6 storage:6")); + _sender.clear(); + + // Unchanged grouping cause no change. + set_distribution(dist_config_6_nodes_across_2_groups()); + ASSERT_EQ(size_t(0), _sender.commands().size()); + + // Changed grouping cause change + set_distribution(dist_config_6_nodes_across_4_groups()); + + ASSERT_EQ(message_count(6), _sender.commands().size()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, normal_usage_initializing) { + set_cluster_state("distributor:1 .0.s:i storage:1 .0.s:i"); + + ASSERT_EQ(_bucket_spaces.size(), _sender.commands().size()); + + // Not yet passing on system state. + ASSERT_EQ(size_t(0), _sender_down.commands().size()); + + ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(lib::ClusterState("distributor:1 .0.s:i storage:1"), + _bucket_spaces.size(), 10, 10)); + + ASSERT_NO_FATAL_FAILURE(assert_correct_buckets(10, "distributor:1 storage:1")); + + for (int i = 10; i < 20; ++i) { + ASSERT_NO_FATAL_FAILURE(verify_invalid(document::BucketId(16, i), 0)); + } + + // Pass on cluster state and recheck buckets now. + ASSERT_EQ(size_t(1), _sender_down.commands().size()); + + _sender.clear(); + _sender_down.clear(); + + set_cluster_state("distributor:1 .0.s:i storage:1"); + + // Send a new request bucket info up. + ASSERT_EQ(_bucket_spaces.size(), _sender.commands().size()); + + ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(lib::ClusterState("distributor:1 .0.s:i storage:1"), + _bucket_spaces.size(), 20)); + + // Pass on cluster state and recheck buckets now. + ASSERT_EQ(size_t(1), _sender_down.commands().size()); + + ASSERT_NO_FATAL_FAILURE(assert_correct_buckets(20, "distributor:1 storage:1")); +} + +TEST_F(TopLevelBucketDBUpdaterTest, failed_request_bucket_info) { + set_cluster_state("distributor:1 .0.s:i storage:1"); + + // 2 messages sent up: 1 to the nodes, and one reply to the setsystemstate. + ASSERT_EQ(_bucket_spaces.size(), _sender.commands().size()); + + { + for (uint32_t i = 0; i < _bucket_spaces.size(); ++i) { + auto reply = make_fake_bucket_reply(lib::ClusterState("distributor:1 .0.s:i storage:1"), + dynamic_cast<RequestBucketInfoCommand&>(*_sender.command(i)), + 0, + 10); + reply->setResult(api::ReturnCode::NOT_CONNECTED); + bucket_db_updater().onRequestBucketInfoReply(reply); + } + + // Trigger that delayed message is sent + fake_clock().addSecondsToTime(10); + bucket_db_updater().resend_delayed_messages(); + } + + // Should be resent. + ASSERT_EQ(make_request_bucket_info_strings(message_count(2)), _sender.getCommands()); + + ASSERT_EQ(size_t(0), _sender_down.commands().size()); + + for (uint32_t i = 0; i < _bucket_spaces.size(); ++i) { + ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(lib::ClusterState("distributor:1 .0.s:i storage:1"), + *_sender.command(_bucket_spaces.size() + i), 10)); + } + + for (int i=0; i<10; i++) { + EXPECT_EQ(std::string(""), + verifyBucket(document::BucketId(16, i), + lib::ClusterState("distributor:1 storage:1"))); + } + + // Set system state should now be passed on + EXPECT_EQ(std::string("Set system state"), _sender_down.getCommands()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, down_while_init) { + ASSERT_NO_FATAL_FAILURE(set_storage_nodes(3)); + + ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(lib::ClusterState("distributor:1 storage:3"), + *_sender.command(0), 5)); + + set_cluster_state("distributor:1 storage:3 .1.s:d"); + + ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(lib::ClusterState("distributor:1 storage:3"), + *_sender.command(2), 5)); + + ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(lib::ClusterState("distributor:1 storage:3"), + *_sender.command(1), 5)); +} + +TEST_F(TopLevelBucketDBUpdaterTest, node_down) { + ASSERT_NO_FATAL_FAILURE(set_storage_nodes(3)); + enable_distributor_cluster_state("distributor:1 storage:3"); + + for (int i = 1; i < 100; ++i) { + add_ideal_nodes(document::BucketId(16, i)); + } + + EXPECT_TRUE(bucket_exists_that_has_node(100, 1)); + + set_cluster_state("distributor:1 storage:3 .1.s:d"); + + EXPECT_FALSE(bucket_exists_that_has_node(100, 1)); +} + + +TEST_F(TopLevelBucketDBUpdaterTest, storage_node_in_maintenance_clears_buckets_for_node) { + ASSERT_NO_FATAL_FAILURE(set_storage_nodes(3)); + enable_distributor_cluster_state("distributor:1 storage:3"); + + for (int i = 1; i < 100; ++i) { + add_ideal_nodes(document::BucketId(16, i)); + } + + EXPECT_TRUE(bucket_exists_that_has_node(100, 1)); + + set_cluster_state("distributor:1 storage:3 .1.s:m"); + + EXPECT_FALSE(bucket_exists_that_has_node(100, 1)); +} + +TEST_F(TopLevelBucketDBUpdaterTest, node_down_copies_get_in_sync) { + ASSERT_NO_FATAL_FAILURE(set_storage_nodes(3)); + document::BucketId bid(16, 1); + + add_nodes_to_stripe_bucket_db(bid, "0=3,1=2,2=3"); + + set_cluster_state("distributor:1 storage:3 .1.s:d"); + + EXPECT_EQ(std::string("BucketId(0x4000000000000001) : " + "node(idx=0,crc=0x3,docs=3/3,bytes=3/3,trusted=true,active=false,ready=false), " + "node(idx=2,crc=0x3,docs=3/3,bytes=3/3,trusted=true,active=false,ready=false)"), + dump_bucket(bid)); +} + +TEST_F(TopLevelBucketDBUpdaterTest, initializing_while_recheck) { + lib::ClusterState state("distributor:1 storage:2 .0.s:i .0.i:0.1"); + set_cluster_state(state); + + ASSERT_EQ(message_count(2), _sender.commands().size()); + ASSERT_EQ(size_t(0), _sender_down.commands().size()); + + auto bucket = makeDocumentBucket(document::BucketId(16, 3)); + stripe_of_bucket(bucket.getBucketId()).bucket_db_updater().recheckBucketInfo(1, bucket); + + for (uint32_t i = 0; i < message_count(2); ++i) { + ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(state, *_sender.command(i), 100)); + } + + // Now we can pass on system state. + ASSERT_EQ(size_t(1), _sender_down.commands().size()); + EXPECT_EQ(MessageType::SETSYSTEMSTATE, _sender_down.command(0)->getType()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, bit_change) { + std::vector<document::BucketId> bucketlist; + + { + set_cluster_state("bits:14 storage:1 distributor:2"); + + ASSERT_EQ(_bucket_spaces.size(), _sender.commands().size()); + + for (uint32_t bsi = 0; bsi < _bucket_spaces.size(); ++bsi) { + ASSERT_EQ(_sender.command(bsi)->getType(), MessageType::REQUESTBUCKETINFO); + const auto &req = dynamic_cast<const RequestBucketInfoCommand &>(*_sender.command(bsi)); + auto sreply = std::make_shared<RequestBucketInfoReply>(req); + sreply->setAddress(storage_address(0)); + auto& vec = sreply->getBucketInfo(); + if (req.getBucketSpace() == FixedBucketSpaces::default_space()) { + int cnt=0; + for (int i=0; cnt < 2; i++) { + auto distribution = _component->getDistribution(); + std::vector<uint16_t> distributors; + if (distribution->getIdealDistributorNode( + lib::ClusterState("bits:14 storage:1 distributor:2"), + document::BucketId(16, i)) + == 0) + { + vec.push_back(api::RequestBucketInfoReply::Entry( + document::BucketId(16, i), + api::BucketInfo(10,1,1))); + + bucketlist.push_back(document::BucketId(16, i)); + cnt++; + } + } + } + + bucket_db_updater().onRequestBucketInfoReply(sreply); + } + } + + EXPECT_EQ(std::string("BucketId(0x4000000000000001) : " + "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), + dump_bucket(bucketlist[0])); + EXPECT_EQ(std::string("BucketId(0x4000000000000002) : " + "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), + dump_bucket(bucketlist[1])); + + { + _sender.clear(); + set_cluster_state("bits:16 storage:1 distributor:2"); + + ASSERT_EQ(_bucket_spaces.size(), _sender.commands().size()); + for (uint32_t bsi = 0; bsi < _bucket_spaces.size(); ++bsi) { + + ASSERT_EQ(_sender.command(bsi)->getType(), MessageType::REQUESTBUCKETINFO); + const auto &req = dynamic_cast<const RequestBucketInfoCommand &>(*_sender.command(bsi)); + auto sreply = std::make_shared<RequestBucketInfoReply>(req); + sreply->setAddress(storage_address(0)); + sreply->setResult(api::ReturnCode::OK); + if (req.getBucketSpace() == FixedBucketSpaces::default_space()) { + api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo(); + + for (uint32_t i = 0; i < 3; ++i) { + vec.push_back(api::RequestBucketInfoReply::Entry( + document::BucketId(16, i), + api::BucketInfo(10,1,1))); + } + + vec.push_back(api::RequestBucketInfoReply::Entry( + document::BucketId(16, 4), + api::BucketInfo(10,1,1))); + } + + bucket_db_updater().onRequestBucketInfoReply(sreply); + } + } + + EXPECT_EQ(std::string("BucketId(0x4000000000000000) : " + "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), + dump_bucket(document::BucketId(16, 0))); + EXPECT_EQ(std::string("BucketId(0x4000000000000001) : " + "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), + dump_bucket(document::BucketId(16, 1))); + EXPECT_EQ(std::string("BucketId(0x4000000000000002) : " + "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), + dump_bucket(document::BucketId(16, 2))); + EXPECT_EQ(std::string("BucketId(0x4000000000000004) : " + "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), + dump_bucket(document::BucketId(16, 4))); + + _sender.clear(); + set_cluster_state("storage:1 distributor:2 .1.s:i"); + + _sender.clear(); + set_cluster_state("storage:1 distributor:2"); +}; + +TEST_F(TopLevelBucketDBUpdaterTest, recheck_node_with_failure) { + ASSERT_NO_FATAL_FAILURE(initialize_nodes_and_buckets(3, 5)); + + _sender.clear(); + + auto bucket = makeDocumentBucket(document::BucketId(16, 3)); + auto& stripe_bucket_db_updater = stripe_of_bucket(bucket.getBucketId()).bucket_db_updater(); + stripe_bucket_db_updater.recheckBucketInfo(1, bucket); + + ASSERT_EQ(size_t(1), _sender.commands().size()); + + uint16_t index = 0; + { + auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.command(0)); + ASSERT_EQ(size_t(1), rbi.getBuckets().size()); + EXPECT_EQ(bucket.getBucketId(), rbi.getBuckets()[0]); + auto reply = std::make_shared<api::RequestBucketInfoReply>(rbi); + const api::StorageMessageAddress *address = _sender.command(0)->getAddress(); + index = address->getIndex(); + reply->setResult(api::ReturnCode::NOT_CONNECTED); + stripe_bucket_db_updater.onRequestBucketInfoReply(reply); + // Trigger that delayed message is sent + fake_clock().addSecondsToTime(10); + stripe_bucket_db_updater.resendDelayedMessages(); + } + + ASSERT_EQ(size_t(2), _sender.commands().size()); + + set_cluster_state(vespalib::make_string("distributor:1 storage:3 .%d.s:d", index)); + + // Recheck bucket. + { + auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.command(1)); + ASSERT_EQ(size_t(1), rbi.getBuckets().size()); + EXPECT_EQ(bucket.getBucketId(), rbi.getBuckets()[0]); + auto reply = std::make_shared<api::RequestBucketInfoReply>(rbi); + reply->setResult(api::ReturnCode::NOT_CONNECTED); + stripe_bucket_db_updater.onRequestBucketInfoReply(reply); + } + + // Should not retry since node is down. + EXPECT_EQ(size_t(2), _sender.commands().size()); +} + +TEST_F(TopLevelBucketDBUpdaterTest, recheck_node) { + ASSERT_NO_FATAL_FAILURE(initialize_nodes_and_buckets(3, 5)); + + _sender.clear(); + + auto bucket = makeDocumentBucket(document::BucketId(16, 3)); + auto& stripe_bucket_db_updater = stripe_of_bucket(bucket.getBucketId()).bucket_db_updater(); + stripe_bucket_db_updater.recheckBucketInfo(1, bucket); + + ASSERT_EQ(size_t(1), _sender.commands().size()); + + auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.command(0)); + ASSERT_EQ(size_t(1), rbi.getBuckets().size()); + EXPECT_EQ(bucket.getBucketId(), rbi.getBuckets()[0]); + + auto reply = std::make_shared<api::RequestBucketInfoReply>(rbi); + reply->getBucketInfo().push_back( + api::RequestBucketInfoReply::Entry(document::BucketId(16, 3), + api::BucketInfo(20, 10, 12, 50, 60, true, true))); + stripe_bucket_db_updater.onRequestBucketInfoReply(reply); + + lib::ClusterState state("distributor:1 storage:3"); + for (uint32_t i = 0; i < 3; i++) { + EXPECT_EQ(get_ideal_str(document::BucketId(16, i), state), + get_nodes(document::BucketId(16, i))); + } + + for (uint32_t i = 4; i < 5; i++) { + EXPECT_EQ(get_ideal_str(document::BucketId(16, i), state), + get_nodes(document::BucketId(16, i))); + } + + BucketDatabase::Entry entry = get_bucket(bucket); + ASSERT_TRUE(entry.valid()); + + const BucketCopy* copy = entry->getNode(1); + ASSERT_TRUE(copy != nullptr); + EXPECT_EQ(api::BucketInfo(20,10,12, 50, 60, true, true), copy->getBucketInfo()); +} + +} diff --git a/storage/src/tests/distributor/top_level_distributor_test.cpp b/storage/src/tests/distributor/top_level_distributor_test.cpp index 348a964dd53..d8df36e53b2 100644 --- a/storage/src/tests/distributor/top_level_distributor_test.cpp +++ b/storage/src/tests/distributor/top_level_distributor_test.cpp @@ -112,10 +112,6 @@ struct TopLevelDistributorTest : Test, TopLevelDistributorTestUtil { distributor_stripes().front()->db_memory_sample_interval()).count(); } - static std::vector<document::BucketSpace> bucket_spaces() { - return {document::FixedBucketSpaces::default_space(), document::FixedBucketSpaces::global_space()}; - } - size_t explicit_node_state_reply_send_invocations() const noexcept { return _node->getNodeStateUpdater().explicit_node_state_reply_send_invocations(); } @@ -242,8 +238,8 @@ public: TEST_F(TopLevelDistributorTest, tick_aggregates_status_requests_from_all_stripes) { setup_distributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); - ASSERT_NE(stripe_of_bucket(document::BucketId(16, 1)), - stripe_of_bucket(document::BucketId(16, 2))); + ASSERT_NE(stripe_index_of_bucket(document::BucketId(16, 1)), + stripe_index_of_bucket(document::BucketId(16, 2))); add_nodes_to_stripe_bucket_db(document::BucketId(16, 1), "0=1/1/1/t"); add_nodes_to_stripe_bucket_db(document::BucketId(16, 2), "0=2/2/2/t"); @@ -521,8 +517,8 @@ TEST_F(TopLevelDistributorTest, leaving_recovery_mode_immediately_sends_getnodes ASSERT_EQ(0, explicit_node_state_reply_send_invocations()); // Add a couple of buckets so we have something to iterate over. 2 buckets // map to the same stripe so we'll need 2 ticks to complete a full scan. - ASSERT_EQ(stripe_of_bucket(document::BucketId(16, 1)), - stripe_of_bucket(document::BucketId(16, 5))); + ASSERT_EQ(stripe_index_of_bucket(document::BucketId(16, 1)), + stripe_index_of_bucket(document::BucketId(16, 5))); add_nodes_to_stripe_bucket_db(document::BucketId(16, 1), "0=1/1/1/t/a"); add_nodes_to_stripe_bucket_db(document::BucketId(16, 2), "0=1/1/1/t/a"); diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.cpp b/storage/src/tests/distributor/top_level_distributor_test_util.cpp index 0120ca91787..c4173f5e8ff 100644 --- a/storage/src/tests/distributor/top_level_distributor_test_util.cpp +++ b/storage/src/tests/distributor/top_level_distributor_test_util.cpp @@ -88,13 +88,13 @@ TopLevelDistributorTestUtil::setup_distributor(int redundancy, } size_t -TopLevelDistributorTestUtil::stripe_of_bucket(const document::BucketId& id) const noexcept +TopLevelDistributorTestUtil::stripe_index_of_bucket(const document::BucketId& id) const noexcept { return stripe_of_bucket_key(id.toKey(), _distributor->_n_stripe_bits); } size_t -TopLevelDistributorTestUtil::stripe_of_bucket(const document::Bucket& bucket) const noexcept +TopLevelDistributorTestUtil::stripe_index_of_bucket(const document::Bucket& bucket) const noexcept { return stripe_of_bucket_key(bucket.getBucketId().toKey(), _distributor->_n_stripe_bits); } @@ -177,7 +177,73 @@ TopLevelDistributorTestUtil::add_nodes_to_stripe_bucket_db(const document::Bucke entry->addNodeManual(node); } - stripe_bucket_database(stripe_of_bucket(bucket), bucket.getBucketSpace()).update(entry); + stripe_bucket_database(stripe_index_of_bucket(bucket), bucket.getBucketSpace()).update(entry); +} + +std::string +TopLevelDistributorTestUtil::get_ideal_str(document::BucketId id, const lib::ClusterState& state) +{ + if (!distributor_bucket_space(id).owns_bucket_in_state(state, id)) { + return id.toString(); + } + std::vector<uint16_t> nodes; + _component->getDistribution()->getIdealNodes(lib::NodeType::STORAGE, state, id, nodes); + std::sort(nodes.begin(), nodes.end()); + std::ostringstream ost; + ost << id << ": " << dumpVector(nodes); + return ost.str(); +} + +void +TopLevelDistributorTestUtil::add_ideal_nodes(const lib::ClusterState& state, const document::BucketId& id) +{ + BucketDatabase::Entry entry = get_bucket(id); + + if (!entry.valid()) { + entry = BucketDatabase::Entry(id); + } + + std::vector<uint16_t> res; + assert(_component.get()); + _component->getDistribution()->getIdealNodes(lib::NodeType::STORAGE, state, id, res); + + for (uint32_t i = 0; i < res.size(); ++i) { + if (state.getNodeState(lib::Node(lib::NodeType::STORAGE, res[i])).getState() != + lib::State::MAINTENANCE) + { + entry->addNode(BucketCopy(0, res[i], api::BucketInfo(1,1,1)), + toVector<uint16_t>(0)); + } + } + + stripe_bucket_database(stripe_index_of_bucket(id)).update(entry); +} + +void +TopLevelDistributorTestUtil::add_ideal_nodes(const document::BucketId& id) +{ + // TODO STRIPE good way of getting current active cluster state on top-level distributor + // We assume that all stripes have the same cluster state internally, so just use the first. + assert(_distributor->_stripes[0]); + const auto& bundle = _distributor->_stripes[0]->getClusterStateBundle(); + add_ideal_nodes(*bundle.getBaselineClusterState(), id); +} + +std::string +TopLevelDistributorTestUtil::get_nodes(document::BucketId id) +{ + BucketDatabase::Entry entry = get_bucket(id); + + if (!entry.valid()) { + return id.toString(); + } else { + std::vector<uint16_t> nodes = entry->getNodes(); + std::sort(nodes.begin(), nodes.end()); + + std::ostringstream ost; + ost << id << ": " << dumpVector(nodes); + return ost.str(); + } } void @@ -190,13 +256,13 @@ TopLevelDistributorTestUtil::add_nodes_to_stripe_bucket_db(const document::Bucke BucketDatabase::Entry TopLevelDistributorTestUtil::get_bucket(const document::Bucket& bucket) const { - return stripe_bucket_database(stripe_of_bucket(bucket), bucket.getBucketSpace()).get(bucket.getBucketId()); + return stripe_bucket_database(stripe_index_of_bucket(bucket), bucket.getBucketSpace()).get(bucket.getBucketId()); } BucketDatabase::Entry TopLevelDistributorTestUtil::get_bucket(const document::BucketId& bId) const { - return stripe_bucket_database(stripe_of_bucket(bId)).get(bId); + return stripe_bucket_database(stripe_index_of_bucket(bId)).get(bId); } TopLevelBucketDBUpdater& @@ -223,9 +289,40 @@ TopLevelDistributorTestUtil::node_context() const { return _distributor->distributor_component(); } -storage::distributor::DistributorStripeOperationContext& -TopLevelDistributorTestUtil::operation_context() { - return _distributor->distributor_component(); +DistributorBucketSpace& +TopLevelDistributorTestUtil::distributor_bucket_space(const document::BucketId& id) +{ + return stripe_of_bucket(id).getBucketSpaceRepo().get(makeBucketSpace()); +} + +const DistributorBucketSpace& +TopLevelDistributorTestUtil::distributor_bucket_space(const document::BucketId& id) const +{ + return stripe_of_bucket(id).getBucketSpaceRepo().get(makeBucketSpace()); +} + +DistributorStripe& +TopLevelDistributorTestUtil::stripe_of_bucket(const document::BucketId& id) noexcept +{ + return *_distributor->_stripes[stripe_index_of_bucket(id)]; +} + +const DistributorStripe& +TopLevelDistributorTestUtil::stripe_of_bucket(const document::BucketId& id) const noexcept +{ + return *_distributor->_stripes[stripe_index_of_bucket(id)]; +} + +DistributorStripe& +TopLevelDistributorTestUtil::stripe_of_bucket(const document::Bucket& bucket) noexcept +{ + return *_distributor->_stripes[stripe_index_of_bucket(bucket.getBucketId())]; +} + +const DistributorStripe& +TopLevelDistributorTestUtil::stripe_of_bucket(const document::Bucket& bucket) const noexcept +{ + return *_distributor->_stripes[stripe_index_of_bucket(bucket.getBucketId())]; } bool @@ -319,4 +416,18 @@ TopLevelDistributorTestUtil::enable_distributor_cluster_state(const lib::Cluster bucket_db_updater().simulate_cluster_state_bundle_activation(state); } +std::vector<document::BucketSpace> +TopLevelDistributorTestUtil::bucket_spaces() +{ + return {document::FixedBucketSpaces::default_space(), document::FixedBucketSpaces::global_space()}; +} + +void +TopLevelDistributorTestUtil::trigger_distribution_change(std::shared_ptr<lib::Distribution> distr) +{ + _node->getComponentRegister().setDistribution(std::move(distr)); + _distributor->storageDistributionChanged(); + _distributor->enableNextDistribution(); +} + } diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.h b/storage/src/tests/distributor/top_level_distributor_test_util.h index b12e9dff304..9048160b652 100644 --- a/storage/src/tests/distributor/top_level_distributor_test_util.h +++ b/storage/src/tests/distributor/top_level_distributor_test_util.h @@ -18,6 +18,7 @@ namespace framework { struct TickingThreadPool; } namespace distributor { class TopLevelDistributor; +class DistributorBucketSpace; class DistributorMetricSet; class DistributorNodeContext; class DistributorStripe; @@ -38,8 +39,13 @@ public: void close(); - size_t stripe_of_bucket(const document::BucketId& id) const noexcept; - size_t stripe_of_bucket(const document::Bucket& bucket) const noexcept; + size_t stripe_index_of_bucket(const document::BucketId& id) const noexcept; + size_t stripe_index_of_bucket(const document::Bucket& bucket) const noexcept; + + DistributorStripe& stripe_of_bucket(const document::BucketId& id) noexcept; + const DistributorStripe& stripe_of_bucket(const document::BucketId& id) const noexcept; + DistributorStripe& stripe_of_bucket(const document::Bucket& bucket) noexcept; + const DistributorStripe& stripe_of_bucket(const document::Bucket& bucket) const noexcept; /** * Parses the given string to a set of node => bucket info data, @@ -56,7 +62,9 @@ public: const IdealStateMetricSet& total_ideal_state_metrics() const; const DistributorMetricSet& total_distributor_metrics() const; const storage::distributor::DistributorNodeContext& node_context() const; - storage::distributor::DistributorStripeOperationContext& operation_context(); + + DistributorBucketSpace& distributor_bucket_space(const document::BucketId& id); + const DistributorBucketSpace& distributor_bucket_space(const document::BucketId& id) const; std::vector<DistributorStripe*> distributor_stripes() const; @@ -93,6 +101,17 @@ public: // Gets bucket entry from default space only BucketDatabase::Entry get_bucket(const document::BucketId& bId) const; + std::string get_ideal_str(document::BucketId id, const lib::ClusterState& state); + + void add_ideal_nodes(const lib::ClusterState& state, const document::BucketId& id); + void add_ideal_nodes(const document::BucketId& id); + + /** + * Returns a string with the nodes currently stored in the bucket + * database for the given bucket. + */ + std::string get_nodes(document::BucketId id); + DistributorMessageSenderStub& sender() noexcept { return _sender; } const DistributorMessageSenderStub& sender() const noexcept { return _sender; } @@ -101,6 +120,10 @@ public: void receive_set_system_state_command(const vespalib::string& state_str); bool handle_top_level_message(const std::shared_ptr<api::StorageMessage>& msg); + void trigger_distribution_change(std::shared_ptr<lib::Distribution> distr); + + static std::vector<document::BucketSpace> bucket_spaces(); + protected: vdstestlib::DirConfig _config; std::unique_ptr<TestDistributorApp> _node; |