diff options
author | Tor Egge <Tor.Egge@broadpark.no> | 2020-11-24 17:22:46 +0100 |
---|---|---|
committer | Tor Egge <Tor.Egge@broadpark.no> | 2020-11-27 13:54:54 +0100 |
commit | b138b00a987afdaf8e4383a4e555b4889d6db81d (patch) | |
tree | ae5b0b6f0dcbd338b1a4c1763a5dc2ad3ed8a5aa /storage | |
parent | a0b9c2e42f3d3f2449def853a56a02d74bc92f9f (diff) |
Move bucket ownership methods to DistributorBucketSpace.
Diffstat (limited to 'storage')
12 files changed, 465 insertions, 87 deletions
diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt index 1e70b6a4881..96df1a842a4 100644 --- a/storage/src/tests/distributor/CMakeLists.txt +++ b/storage/src/tests/distributor/CMakeLists.txt @@ -10,6 +10,7 @@ vespa_add_executable(storage_distributor_gtest_runner_app TEST bucketdbupdatertest.cpp bucketgctimecalculatortest.cpp bucketstateoperationtest.cpp + distributor_bucket_space_test.cpp distributor_host_info_reporter_test.cpp distributor_message_sender_stub.cpp distributortest.cpp diff --git a/storage/src/tests/distributor/distributor_bucket_space_test.cpp b/storage/src/tests/distributor/distributor_bucket_space_test.cpp new file mode 100644 index 00000000000..05abf315607 --- /dev/null +++ b/storage/src/tests/distributor/distributor_bucket_space_test.cpp @@ -0,0 +1,202 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/storage/distributor/distributor_bucket_space.h> +#include <vespa/storage/distributor/distributor_bucket_space_repo.h> +#include <vespa/vdslib/distribution/distribution.h> +#include <vespa/vdslib/state/clusterstate.h> +#include <vespa/vespalib/gtest/gtest.h> + +using document::BucketId; +using storage::lib::ClusterState; +using storage::lib::Distribution; + +namespace storage::distributor { + +namespace { + +std::shared_ptr<ClusterState> stable_state(std::make_shared<ClusterState>("distributor:4 storage:4 bits:8")); +std::shared_ptr<ClusterState> node_1_down_state(std::make_shared<ClusterState>("distributor:4 .1.s:d storage:4 .1.s:d bits:8")); +std::shared_ptr<Distribution> distribution_r1(std::make_shared<Distribution>(Distribution::getDefaultDistributionConfig(1, 4))); +std::shared_ptr<Distribution> distribution_r2(std::make_shared<Distribution>(Distribution::getDefaultDistributionConfig(2, 4))); + +} + +struct DistributorBucketSpaceTest : public ::testing::Test +{ + using CountVector = std::vector<uint32_t>; + + DistributorBucketSpace bucket_space; + + DistributorBucketSpaceTest() + : ::testing::Test(), + bucket_space(0u) + { + } + ~DistributorBucketSpaceTest() = default; + uint32_t count_distributor_buckets(); + uint32_t count_storage_buckets(); + uint32_t count_deep_split_distributor_buckets(); + uint32_t count_deep_split_storage_buckets(); + CountVector count_buckets(); + CountVector count_deep_split_buckets(); +}; + +uint32_t +DistributorBucketSpaceTest::count_distributor_buckets() +{ + uint32_t owned_buckets = 0; + uint16_t distribution_bits = bucket_space.getClusterState().getDistributionBitCount(); + for (uint32_t i = 0; i < (1u << distribution_bits); ++i) { + BucketId bucket(distribution_bits, i); + bool owned = bucket_space.check_ownership_in_pending_and_current_state(bucket).isOwned(); + bool check_owned = bucket_space.check_ownership_in_pending_and_current_state_fallback(bucket).isOwned(); + EXPECT_EQ(check_owned, owned); + if (owned) { + ++owned_buckets; + } + } + return owned_buckets; +} + +uint32_t +DistributorBucketSpaceTest::count_storage_buckets() +{ + uint32_t owned_buckets = 0; + uint16_t distribution_bits = bucket_space.getClusterState().getDistributionBitCount(); + for (uint32_t i = 0; i < (1u << distribution_bits); ++i) { + BucketId bucket(distribution_bits, i); + auto ideal_nodes = bucket_space.get_ideal_nodes(bucket); + auto check_ideal_nodes = bucket_space.get_ideal_nodes_fallback(bucket); + EXPECT_EQ(check_ideal_nodes, ideal_nodes); + for (auto node : ideal_nodes) { + if (node == 0u) { + ++owned_buckets; + } + } + } + return owned_buckets; +} + +uint32_t +DistributorBucketSpaceTest::count_deep_split_distributor_buckets() +{ + uint32_t owned_buckets = 0; + uint16_t distribution_bits = bucket_space.getClusterState().getDistributionBitCount(); + uint32_t bias = 0; + uint32_t bias_max = std::min(1u << distribution_bits, 1000u); + for (; bias < bias_max; ++bias) { + BucketId bucket(distribution_bits, bias); + if (bucket_space.check_ownership_in_pending_and_current_state(bucket).isOwned()) { + break; + } + } + assert(bias < bias_max); + for (uint32_t i = 0; i < 100; ++i) { + BucketId bucket(42u, i * (1ul << 32) + bias); + bool owned = bucket_space.check_ownership_in_pending_and_current_state(bucket).isOwned(); + bool check_owned = bucket_space.check_ownership_in_pending_and_current_state_fallback(bucket).isOwned(); + EXPECT_EQ(check_owned, owned); + if (owned) { + ++owned_buckets; + } + } + return owned_buckets; +} + +uint32_t +DistributorBucketSpaceTest::count_deep_split_storage_buckets() +{ + uint32_t owned_buckets = 0; + uint16_t distribution_bits = bucket_space.getClusterState().getDistributionBitCount(); + uint32_t bias = 0; + uint32_t bias_max = std::min(1u << distribution_bits, 1000u); + for (; bias < bias_max; ++bias) { + BucketId bucket(distribution_bits, bias); + auto ideal_nodes = bucket_space.get_ideal_nodes(bucket); + bool found = false; + for (auto node : ideal_nodes) { + if (node == 0u) { + found = true; + } + } + if (found) { + break; + } + } + assert(bias < bias_max); + for (uint32_t i = 0; i < 100; ++i) { + BucketId bucket(42u, i * (1ul << 32) + bias); + auto ideal_nodes = bucket_space.get_ideal_nodes(bucket); + auto check_ideal_nodes = bucket_space.get_ideal_nodes_fallback(bucket); + EXPECT_EQ(check_ideal_nodes, ideal_nodes); + for (auto node : ideal_nodes) { + if (node == 0u) { + ++owned_buckets; + } + } + } + return owned_buckets; +} + +DistributorBucketSpaceTest::CountVector +DistributorBucketSpaceTest::count_buckets() +{ + CountVector result; + result.push_back(count_distributor_buckets()); + result.push_back(count_storage_buckets()); + return result; +} + +DistributorBucketSpaceTest::CountVector +DistributorBucketSpaceTest::count_deep_split_buckets() +{ + CountVector result; + result.push_back(count_deep_split_distributor_buckets()); + result.push_back(count_deep_split_storage_buckets()); + return result; +} + +TEST_F(DistributorBucketSpaceTest, check_owned_buckets) +{ + bucket_space.setDistribution(distribution_r1); + bucket_space.setClusterState(stable_state); + EXPECT_EQ((CountVector{64u, 64u}), count_buckets()); + bucket_space.set_pending_cluster_state(node_1_down_state); + EXPECT_EQ((CountVector{64u, 64u}), count_buckets()); + bucket_space.setClusterState(node_1_down_state); + bucket_space.set_pending_cluster_state({}); + EXPECT_EQ((CountVector{86u, 86u}), count_buckets()); + bucket_space.set_pending_cluster_state(stable_state); + EXPECT_EQ((CountVector{64u, 86u}), count_buckets()); + bucket_space.setClusterState(stable_state); + bucket_space.set_pending_cluster_state({}); + EXPECT_EQ((CountVector{64u, 64u}), count_buckets()); + bucket_space.setDistribution(distribution_r2); + EXPECT_EQ((CountVector{64u, 125u}), count_buckets()); +} + +TEST_F(DistributorBucketSpaceTest, check_available_nodes) +{ + bucket_space.setDistribution(distribution_r1); + bucket_space.setClusterState(stable_state); + EXPECT_EQ((std::vector<bool>{true, true, true, true}), bucket_space.get_available_nodes()); + bucket_space.set_pending_cluster_state(node_1_down_state); + EXPECT_EQ((std::vector<bool>{true, false, true, true}), bucket_space.get_available_nodes()); + bucket_space.setClusterState(node_1_down_state); + bucket_space.set_pending_cluster_state({}); + EXPECT_EQ((std::vector<bool>{true, false, true, true}), bucket_space.get_available_nodes()); + bucket_space.set_pending_cluster_state(stable_state); + EXPECT_EQ((std::vector<bool>{true, false, true, true}), bucket_space.get_available_nodes()); + bucket_space.setClusterState(stable_state); + bucket_space.set_pending_cluster_state({}); + EXPECT_EQ((std::vector<bool>{true, true, true, true}), bucket_space.get_available_nodes()); +} + +TEST_F(DistributorBucketSpaceTest, check_owned_deep_split_buckets) +{ + bucket_space.setDistribution(distribution_r1); + bucket_space.setClusterState(stable_state); + EXPECT_EQ((CountVector{100u, 19u}), count_deep_split_buckets()); +} + +} diff --git a/storage/src/tests/distributor/simplemaintenancescannertest.cpp b/storage/src/tests/distributor/simplemaintenancescannertest.cpp index b21a10c319e..58dc2430041 100644 --- a/storage/src/tests/distributor/simplemaintenancescannertest.cpp +++ b/storage/src/tests/distributor/simplemaintenancescannertest.cpp @@ -36,7 +36,7 @@ void SimpleMaintenanceScannerTest::SetUp() { _priorityGenerator = std::make_unique<MockMaintenancePriorityGenerator>(); - _bucketSpaceRepo = std::make_unique<DistributorBucketSpaceRepo>(); + _bucketSpaceRepo = std::make_unique<DistributorBucketSpaceRepo>(0u); _priorityDb = std::make_unique<SimpleBucketPriorityDatabase>(); _scanner = std::make_unique<SimpleMaintenanceScanner>(*_priorityDb, *_priorityGenerator, *_bucketSpaceRepo); } diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp index c5fd7027fa6..388e696928b 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp @@ -106,13 +106,8 @@ BucketDBUpdater::hasPendingClusterState() const BucketOwnership BucketDBUpdater::checkOwnershipInPendingState(const document::Bucket& b) const { - if (hasPendingClusterState()) { - const auto& state(*_pendingClusterState->getNewClusterStateBundle().getDerivedClusterState(b.getBucketSpace())); - if (!_distributorComponent.ownsBucketInState(state, b)) { - return BucketOwnership::createNotOwnedInState(state); - } - } - return BucketOwnership::createOwned(); + auto &bucket_space(_distributorComponent.getBucketSpaceRepo().get(b.getBucketSpace())); + return bucket_space.check_ownership_in_pending_state(b.getBucketId()); } const lib::ClusterState* @@ -317,6 +312,7 @@ BucketDBUpdater::storageDistributionChanged() _distributorComponent.getBucketSpaceRepo(), _distributorComponent.getUniqueTimestamp()); _outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap(); + _distributorComponent.getBucketSpaceRepo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle()); } void @@ -435,6 +431,7 @@ BucketDBUpdater::onSetSystemState( _distributorComponent.getDistributor().getMetrics().set_cluster_state_processing_time.addValue( process_timer.getElapsedTimeAsDouble()); + _distributorComponent.getBucketSpaceRepo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle()); if (isPendingClusterStateCompleted()) { processCompletedPendingClusterState(); } @@ -782,6 +779,7 @@ BucketDBUpdater::activatePendingClusterState() update_read_snapshot_after_activation(_pendingClusterState->getNewClusterStateBundle()); _pendingClusterState.reset(); _outdatedNodesMap.clear(); + _distributorComponent.getBucketSpaceRepo().clear_pending_cluster_state_bundle(); sendAllQueuedBucketRechecks(); completeTransitionTimer(); clearReadOnlyBucketRepoDatabases(); diff --git a/storage/src/vespa/storage/distributor/bucketownership.h b/storage/src/vespa/storage/distributor/bucketownership.h index bfe63c9799d..c22f690a830 100644 --- a/storage/src/vespa/storage/distributor/bucketownership.h +++ b/storage/src/vespa/storage/distributor/bucketownership.h @@ -16,9 +16,9 @@ class BucketOwnership _owned(false) { } +public: BucketOwnership() : _checkedState(nullptr), _owned(true) {} -public: bool isOwned() const { return _owned; } /** * Cluster state in which the ownership check failed. Lifetime of returned diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index 9c1af99100b..4f3bc7a6caf 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -73,8 +73,8 @@ Distributor::Distributor(DistributorComponentRegister& compReg, _clusterStateBundle(lib::ClusterState()), _compReg(compReg), _component(compReg, "distributor"), - _bucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>()), - _readOnlyBucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>()), + _bucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>(_component.getIndex())), + _readOnlyBucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>(_component.getIndex())), _metrics(std::make_shared<DistributorMetricSet>()), _operationOwner(*this, _component.getClock()), _maintenanceOperationOwner(*this, _component.getClock()), diff --git a/storage/src/vespa/storage/distributor/distributor_bucket_space.cpp b/storage/src/vespa/storage/distributor/distributor_bucket_space.cpp index 7b7970228e7..111e58045a1 100644 --- a/storage/src/vespa/storage/distributor/distributor_bucket_space.cpp +++ b/storage/src/vespa/storage/distributor/distributor_bucket_space.cpp @@ -1,31 +1,205 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "distributor_bucket_space.h" +#include "bucketownership.h" #include <vespa/storage/bucketdb/btree_bucket_database.h> #include <vespa/vdslib/state/clusterstate.h> #include <vespa/vdslib/distribution/distribution.h> +#include <vespa/vespalib/stllike/hash_map.hpp> namespace storage::distributor { +namespace { + +const char *up_states = "uri"; + +} + DistributorBucketSpace::DistributorBucketSpace() + : DistributorBucketSpace(0u) +{ +} + +DistributorBucketSpace::DistributorBucketSpace(uint16_t node_index) : _bucketDatabase(std::make_unique<BTreeBucketDatabase>()), _clusterState(), - _distribution() + _distribution(), + _node_index(node_index), + _distribution_bits(1u), + _pending_cluster_state(), + _available_nodes(), + _ownerships(), + _ideal_nodes() { } DistributorBucketSpace::~DistributorBucketSpace() = default; void +DistributorBucketSpace::clear() +{ + _ownerships.clear(); + _ideal_nodes.clear(); +} + +void +DistributorBucketSpace::enumerate_available_nodes() +{ + _distribution_bits = _clusterState->getDistributionBitCount(); + auto node_count = _clusterState->getNodeCount(lib::NodeType::STORAGE); + if (_pending_cluster_state) { + _distribution_bits = std::min(_distribution_bits, _pending_cluster_state->getDistributionBitCount()); + node_count = std::min(node_count, _pending_cluster_state->getNodeCount(lib::NodeType::STORAGE)); + } + std::vector<bool> nodes(node_count); + for (uint32_t i = 0; i < node_count; ++i) { + lib::Node node_key(lib::NodeType::STORAGE, i); + const lib::NodeState& ns(_clusterState->getNodeState(node_key)); + if (ns.getState().oneOf(up_states)) { + if (!_pending_cluster_state || _pending_cluster_state->getNodeState(node_key).getState().oneOf(up_states)) { + nodes[i] = true; + } + } + } + _available_nodes = std::move(nodes); +} + +void DistributorBucketSpace::setClusterState(std::shared_ptr<const lib::ClusterState> clusterState) { _clusterState = std::move(clusterState); + clear(); + enumerate_available_nodes(); } void DistributorBucketSpace::setDistribution(std::shared_ptr<const lib::Distribution> distribution) { _distribution = std::move(distribution); + clear(); +} + +std::vector<uint16_t> +DistributorBucketSpace::get_ideal_nodes_fallback(document::BucketId bucket) const +{ + return _distribution->getIdealStorageNodes(*_clusterState, bucket, up_states); +} + +void +DistributorBucketSpace::set_pending_cluster_state(std::shared_ptr<const lib::ClusterState> pending_cluster_state) +{ + _pending_cluster_state = std::move(pending_cluster_state); + clear(); + enumerate_available_nodes(); +} + +bool +DistributorBucketSpace::owns_bucket_in_state( + const lib::Distribution& distribution, + const lib::ClusterState& cluster_state, + document::BucketId bucket) const +{ + try { + uint16_t distributor = distribution.getIdealDistributorNode(cluster_state, bucket); + + return (_node_index == distributor); + } catch (lib::TooFewBucketBitsInUseException& e) { + return false; + } catch (lib::NoDistributorsAvailableException& e) { + return false; + } +} + +bool +DistributorBucketSpace::owns_bucket_in_state( + const lib::ClusterState& clusterState, + document::BucketId bucket) const +{ + return owns_bucket_in_state(*_distribution, clusterState, bucket); +} + +bool +DistributorBucketSpace::owns_bucket_in_current_state(document::BucketId bucket) const +{ + return owns_bucket_in_state(*_distribution, *_clusterState, bucket); +} + +BucketOwnership +DistributorBucketSpace::check_ownership_in_pending_state(document::BucketId bucket) const +{ + if (_pending_cluster_state) { + if (!owns_bucket_in_state(*_pending_cluster_state, bucket)) { + return BucketOwnership::createNotOwnedInState(*_pending_cluster_state); + } + } + return BucketOwnership::createOwned(); +} +BucketOwnership +DistributorBucketSpace::check_ownership_in_pending_and_given_state( + const lib::Distribution& distribution, + const lib::ClusterState& clusterState, + document::BucketId bucket) const +{ + try { + BucketOwnership pendingRes( + check_ownership_in_pending_state(bucket)); + if (!pendingRes.isOwned()) { + return pendingRes; + } + uint16_t distributor = distribution.getIdealDistributorNode( + clusterState, bucket); + + if (_node_index == distributor) { + return BucketOwnership::createOwned(); + } else { + return BucketOwnership::createNotOwnedInState(clusterState); + } + } catch (lib::TooFewBucketBitsInUseException& e) { + return BucketOwnership::createNotOwnedInState(clusterState); + } catch (lib::NoDistributorsAvailableException& e) { + return BucketOwnership::createNotOwnedInState(clusterState); + } +} + +BucketOwnership +DistributorBucketSpace::check_ownership_in_pending_and_current_state_fallback(document::BucketId bucket) const +{ + return check_ownership_in_pending_and_given_state(*_distribution, *_clusterState, bucket); +} + +std::vector<uint16_t> +DistributorBucketSpace::get_ideal_nodes(document::BucketId bucket) +{ + assert(bucket.getUsedBits() >= _distribution_bits); + if (bucket.getUsedBits() > 33) { // cf. storage::lib::Distribution::getStorageSeed + // Cannot map to super bucket ==> cannot cache result + return get_ideal_nodes_fallback(bucket); + } + document::BucketId super_bucket(_distribution_bits, bucket.getId()); + auto itr = _ideal_nodes.find(super_bucket); + if (itr != _ideal_nodes.end()) { + return itr->second; + } + auto insres = _ideal_nodes.insert(std::make_pair(super_bucket, get_ideal_nodes_fallback(super_bucket))); + assert(insres.second); + return insres.first->second; +} + +BucketOwnership +DistributorBucketSpace::check_ownership_in_pending_and_current_state(document::BucketId bucket) +{ + if (bucket.getUsedBits() < _distribution_bits) { + // Cannot map to super bucket ==> cannot cache result + return check_ownership_in_pending_and_current_state_fallback(bucket); + } + document::BucketId super_bucket(_distribution_bits, bucket.getId()); + auto itr = _ownerships.find(super_bucket); + if (itr != _ownerships.end()) { + return itr->second; + } + auto insres = _ownerships.insert(std::make_pair(super_bucket, check_ownership_in_pending_and_current_state_fallback(super_bucket))); + assert(insres.second); + return insres.first->second; } } diff --git a/storage/src/vespa/storage/distributor/distributor_bucket_space.h b/storage/src/vespa/storage/distributor/distributor_bucket_space.h index c137414ecfb..96440fbabad 100644 --- a/storage/src/vespa/storage/distributor/distributor_bucket_space.h +++ b/storage/src/vespa/storage/distributor/distributor_bucket_space.h @@ -1,7 +1,11 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +#include "bucketownership.h" +#include <vespa/document/bucket/bucketid.h> +#include <vespa/vespalib/stllike/hash_map.h> #include <memory> +#include <vector> namespace storage { class BucketDatabase; @@ -29,8 +33,18 @@ class DistributorBucketSpace { std::unique_ptr<BucketDatabase> _bucketDatabase; std::shared_ptr<const lib::ClusterState> _clusterState; std::shared_ptr<const lib::Distribution> _distribution; + uint16_t _node_index; + uint16_t _distribution_bits; + std::shared_ptr<const lib::ClusterState> _pending_cluster_state; + std::vector<bool> _available_nodes; + vespalib::hash_map<document::BucketId, BucketOwnership, document::BucketId::hash> _ownerships; + vespalib::hash_map<document::BucketId, std::vector<uint16_t>, document::BucketId::hash> _ideal_nodes; + + void clear(); + void enumerate_available_nodes(); public: explicit DistributorBucketSpace(); + explicit DistributorBucketSpace(uint16_t node_index); ~DistributorBucketSpace(); DistributorBucketSpace(const DistributorBucketSpace&) = delete; @@ -62,6 +76,22 @@ public: return _distribution; } + void set_pending_cluster_state(std::shared_ptr<const lib::ClusterState> pending_cluster_state); + + std::vector<uint16_t> get_ideal_nodes_fallback(document::BucketId bucket) const; + + bool owns_bucket_in_state(const lib::Distribution& distribution, const lib::ClusterState& cluster_state, document::BucketId bucket) const; + bool owns_bucket_in_state(const lib::ClusterState& clusterState, document::BucketId bucket) const; + bool owns_bucket_in_current_state(document::BucketId bucket) const; + + BucketOwnership check_ownership_in_pending_state(document::BucketId bucket) const; + BucketOwnership check_ownership_in_pending_and_given_state(const lib::Distribution& distribution, + const lib::ClusterState& clusterState, + document::BucketId bucket) const; + BucketOwnership check_ownership_in_pending_and_current_state_fallback(document::BucketId bucket) const; + const std::vector<bool>& get_available_nodes() const { return _available_nodes; } + std::vector<uint16_t> get_ideal_nodes(document::BucketId bucket); + BucketOwnership check_ownership_in_pending_and_current_state(document::BucketId bucket); }; } diff --git a/storage/src/vespa/storage/distributor/distributor_bucket_space_repo.cpp b/storage/src/vespa/storage/distributor/distributor_bucket_space_repo.cpp index 744c54676ae..4f64dab9a68 100644 --- a/storage/src/vespa/storage/distributor/distributor_bucket_space_repo.cpp +++ b/storage/src/vespa/storage/distributor/distributor_bucket_space_repo.cpp @@ -2,7 +2,7 @@ #include "distributor_bucket_space_repo.h" #include "distributor_bucket_space.h" -#include <vespa/vdslib/distribution/distribution.h> +#include <vespa/vdslib/state/cluster_state_bundle.h> #include <vespa/document/bucket/fixed_bucket_spaces.h> #include <cassert> @@ -13,11 +13,11 @@ using document::BucketSpace; namespace storage::distributor { -DistributorBucketSpaceRepo::DistributorBucketSpaceRepo() +DistributorBucketSpaceRepo::DistributorBucketSpaceRepo(uint16_t node_index) : _map() { - add(document::FixedBucketSpaces::default_space(), std::make_unique<DistributorBucketSpace>()); - add(document::FixedBucketSpaces::global_space(), std::make_unique<DistributorBucketSpace>()); + add(document::FixedBucketSpaces::default_space(), std::make_unique<DistributorBucketSpace>(node_index)); + add(document::FixedBucketSpaces::global_space(), std::make_unique<DistributorBucketSpace>(node_index)); } DistributorBucketSpaceRepo::~DistributorBucketSpaceRepo() = default; @@ -44,4 +44,20 @@ DistributorBucketSpaceRepo::get(BucketSpace bucketSpace) const return *itr->second; } +void +DistributorBucketSpaceRepo::set_pending_cluster_state_bundle(const lib::ClusterStateBundle& cluster_state_bundle) +{ + for (auto& entry : _map) { + entry.second->set_pending_cluster_state(cluster_state_bundle.getDerivedClusterState(entry.first)); + } +} + +void +DistributorBucketSpaceRepo::clear_pending_cluster_state_bundle() +{ + for (auto& entry : _map) { + entry.second->set_pending_cluster_state({}); + } +} + } diff --git a/storage/src/vespa/storage/distributor/distributor_bucket_space_repo.h b/storage/src/vespa/storage/distributor/distributor_bucket_space_repo.h index ee36842969a..eb268d0fbd9 100644 --- a/storage/src/vespa/storage/distributor/distributor_bucket_space_repo.h +++ b/storage/src/vespa/storage/distributor/distributor_bucket_space_repo.h @@ -5,7 +5,7 @@ #include <memory> #include <unordered_map> -namespace storage::lib { class Distribution; } +namespace storage::lib { class ClusterStateBundle; } namespace storage::distributor { @@ -19,7 +19,7 @@ private: BucketSpaceMap _map; public: - DistributorBucketSpaceRepo(); + DistributorBucketSpaceRepo(uint16_t node_index); ~DistributorBucketSpaceRepo(); DistributorBucketSpaceRepo(const DistributorBucketSpaceRepo&&) = delete; @@ -33,6 +33,8 @@ public: BucketSpaceMap::const_iterator begin() const { return _map.begin(); } BucketSpaceMap::const_iterator end() const { return _map.end(); } void add(document::BucketSpace bucketSpace, std::unique_ptr<DistributorBucketSpace> distributorBucketSpace); + void set_pending_cluster_state_bundle(const lib::ClusterStateBundle& cluster_state_bundle); + void clear_pending_cluster_state_bundle(); }; } diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.cpp b/storage/src/vespa/storage/distributor/distributorcomponent.cpp index 519413bab2e..58fe015ed8c 100644 --- a/storage/src/vespa/storage/distributor/distributorcomponent.cpp +++ b/storage/src/vespa/storage/distributor/distributorcomponent.cpp @@ -49,47 +49,16 @@ DistributorComponent::getClusterStateBundle() const std::vector<uint16_t> DistributorComponent::getIdealNodes(const document::Bucket &bucket) const { - auto &bucketSpace(_bucketSpaceRepo.get(bucket.getBucketSpace())); - return bucketSpace.getDistribution().getIdealStorageNodes( - bucketSpace.getClusterState(), - bucket.getBucketId(), - _distributor.getStorageNodeUpStates()); -} - -BucketOwnership -DistributorComponent::checkOwnershipInPendingAndGivenState( - const lib::Distribution& distribution, - const lib::ClusterState& clusterState, - const document::Bucket &bucket) const -{ - try { - BucketOwnership pendingRes( - _distributor.checkOwnershipInPendingState(bucket)); - if (!pendingRes.isOwned()) { - return pendingRes; - } - uint16_t distributor = distribution.getIdealDistributorNode( - clusterState, bucket.getBucketId()); - - if (getIndex() == distributor) { - return BucketOwnership::createOwned(); - } else { - return BucketOwnership::createNotOwnedInState(clusterState); - } - } catch (lib::TooFewBucketBitsInUseException& e) { - return BucketOwnership::createNotOwnedInState(clusterState); - } catch (lib::NoDistributorsAvailableException& e) { - return BucketOwnership::createNotOwnedInState(clusterState); - } + auto &bucket_space(_bucketSpaceRepo.get(bucket.getBucketSpace())); + return bucket_space.get_ideal_nodes(bucket.getBucketId()); } BucketOwnership DistributorComponent::checkOwnershipInPendingAndCurrentState( const document::Bucket &bucket) const { - auto &bucketSpace(_bucketSpaceRepo.get(bucket.getBucketSpace())); - return checkOwnershipInPendingAndGivenState( - bucketSpace.getDistribution(), bucketSpace.getClusterState(), bucket); + auto &bucket_space(_bucketSpaceRepo.get(bucket.getBucketSpace())); + return bucket_space.check_ownership_in_pending_and_current_state(bucket.getBucketId()); } bool @@ -98,19 +67,8 @@ DistributorComponent::ownsBucketInState( const lib::ClusterState& clusterState, const document::Bucket &bucket) const { - LOG(spam, "checking bucket %s in state %s with distr %s", - bucket.toString().c_str(), clusterState.toString().c_str(), - distribution.getNodeGraph().getDistributionConfigHash().c_str()); - try { - uint16_t distributor = distribution.getIdealDistributorNode( - clusterState, bucket.getBucketId()); - - return (getIndex() == distributor); - } catch (lib::TooFewBucketBitsInUseException& e) { - return false; - } catch (lib::NoDistributorsAvailableException& e) { - return false; - } + auto &bucket_space(_bucketSpaceRepo.get(bucket.getBucketSpace())); + return bucket_space.owns_bucket_in_state(distribution, clusterState, bucket.getBucketId()); } bool @@ -118,15 +76,15 @@ DistributorComponent::ownsBucketInState( const lib::ClusterState& clusterState, const document::Bucket &bucket) const { - auto &bucketSpace(_bucketSpaceRepo.get(bucket.getBucketSpace())); - return ownsBucketInState(bucketSpace.getDistribution(), clusterState, bucket); + auto &bucket_space(_bucketSpaceRepo.get(bucket.getBucketSpace())); + return bucket_space.owns_bucket_in_state(clusterState, bucket.getBucketId()); } bool DistributorComponent::ownsBucketInCurrentState(const document::Bucket &bucket) const { - auto &bucketSpace(_bucketSpaceRepo.get(bucket.getBucketSpace())); - return ownsBucketInState(bucketSpace.getDistribution(), bucketSpace.getClusterState(), bucket); + auto &bucket_space(_bucketSpaceRepo.get(bucket.getBucketSpace())); + return bucket_space.owns_bucket_in_current_state(bucket.getBucketId()); } api::StorageMessageAddress @@ -138,7 +96,8 @@ DistributorComponent::nodeAddress(uint16_t nodeIndex) const bool DistributorComponent::checkDistribution(api::StorageCommand &cmd, const document::Bucket &bucket) { - BucketOwnership bo(checkOwnershipInPendingAndCurrentState(bucket)); + auto &bucket_space(_bucketSpaceRepo.get(bucket.getBucketSpace())); + BucketOwnership bo(bucket_space.check_ownership_in_pending_and_current_state(bucket.getBucketId())); if (!bo.isOwned()) { std::string systemStateStr = bo.getNonOwnedState().toString(); LOG(debug, @@ -218,7 +177,7 @@ DistributorComponent::updateBucketDatabase( assert(!(bucket.getBucketId() == document::BucketId())); BucketDatabase::Entry dbentry = bucketSpace.getBucketDatabase().get(bucket.getBucketId()); - BucketOwnership ownership(checkOwnershipInPendingAndCurrentState(bucket)); + BucketOwnership ownership(bucketSpace.check_ownership_in_pending_and_current_state(bucket.getBucketId())); if (!ownership.isOwned()) { LOG(debug, "Trying to add %s to database that we do not own according to " @@ -246,26 +205,27 @@ DistributorComponent::updateBucketDatabase( // Ensure that we're not trying to bring any zombie copies into the // bucket database (i.e. copies on nodes that are actually unavailable). - std::vector<uint16_t> unavailableNodes; - enumerateUnavailableNodes(unavailableNodes, bucketSpace.getClusterState(), bucket, changedNodes); - if (auto* pending_state = _distributor.pendingClusterStateOrNull(bucket.getBucketSpace())) { - enumerateUnavailableNodes(unavailableNodes, *pending_state, bucket, changedNodes); + const auto& available_nodes = bucketSpace.get_available_nodes(); + bool found_down_node = false; + for (const auto& copy : changedNodes) { + if (copy.getNode() >= available_nodes.size() || !available_nodes[copy.getNode()]) { + found_down_node = true; + break; + } } // Optimize for common case where we don't have to create a new // bucket copy vector - if (unavailableNodes.empty()) { - dbentry->addNodes(changedNodes, getIdealNodes(bucket)); + if (!found_down_node) { + dbentry->addNodes(changedNodes, bucketSpace.get_ideal_nodes(bucket.getBucketId())); } else { std::vector<BucketCopy> upNodes; for (uint32_t i = 0; i < changedNodes.size(); ++i) { const BucketCopy& copy(changedNodes[i]); - if (std::find(unavailableNodes.begin(), unavailableNodes.end(), copy.getNode()) - == unavailableNodes.end()) - { + if (copy.getNode() < available_nodes.size() && available_nodes[copy.getNode()]) { upNodes.emplace_back(copy); } } - dbentry->addNodes(upNodes, getIdealNodes(bucket)); + dbentry->addNodes(upNodes, bucketSpace.get_ideal_nodes(bucket.getBucketId())); } if (updateFlags & DatabaseUpdate::RESET_TRUSTED) { dbentry->resetTrusted(); diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.h b/storage/src/vespa/storage/distributor/distributorcomponent.h index 283d0c20390..2817555f4f5 100644 --- a/storage/src/vespa/storage/distributor/distributorcomponent.h +++ b/storage/src/vespa/storage/distributor/distributorcomponent.h @@ -41,11 +41,6 @@ public: * distribution and cluster state -and- that of the pending cluster * state and distribution (if any pending exists). */ - BucketOwnership checkOwnershipInPendingAndGivenState( - const lib::Distribution& distribution, - const lib::ClusterState& clusterState, - const document::Bucket &bucket) const; - BucketOwnership checkOwnershipInPendingAndCurrentState( const document::Bucket &bucket) const; |