aboutsummaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@broadpark.no>2020-11-24 17:22:46 +0100
committerTor Egge <Tor.Egge@broadpark.no>2020-11-27 13:54:54 +0100
commitb138b00a987afdaf8e4383a4e555b4889d6db81d (patch)
treeae5b0b6f0dcbd338b1a4c1763a5dc2ad3ed8a5aa /storage
parenta0b9c2e42f3d3f2449def853a56a02d74bc92f9f (diff)
Move bucket ownership methods to DistributorBucketSpace.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/CMakeLists.txt1
-rw-r--r--storage/src/tests/distributor/distributor_bucket_space_test.cpp202
-rw-r--r--storage/src/tests/distributor/simplemaintenancescannertest.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.cpp12
-rw-r--r--storage/src/vespa/storage/distributor/bucketownership.h2
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp4
-rw-r--r--storage/src/vespa/storage/distributor/distributor_bucket_space.cpp176
-rw-r--r--storage/src/vespa/storage/distributor/distributor_bucket_space.h30
-rw-r--r--storage/src/vespa/storage/distributor/distributor_bucket_space_repo.cpp24
-rw-r--r--storage/src/vespa/storage/distributor/distributor_bucket_space_repo.h6
-rw-r--r--storage/src/vespa/storage/distributor/distributorcomponent.cpp88
-rw-r--r--storage/src/vespa/storage/distributor/distributorcomponent.h5
12 files changed, 465 insertions, 87 deletions
diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt
index 1e70b6a4881..96df1a842a4 100644
--- a/storage/src/tests/distributor/CMakeLists.txt
+++ b/storage/src/tests/distributor/CMakeLists.txt
@@ -10,6 +10,7 @@ vespa_add_executable(storage_distributor_gtest_runner_app TEST
bucketdbupdatertest.cpp
bucketgctimecalculatortest.cpp
bucketstateoperationtest.cpp
+ distributor_bucket_space_test.cpp
distributor_host_info_reporter_test.cpp
distributor_message_sender_stub.cpp
distributortest.cpp
diff --git a/storage/src/tests/distributor/distributor_bucket_space_test.cpp b/storage/src/tests/distributor/distributor_bucket_space_test.cpp
new file mode 100644
index 00000000000..05abf315607
--- /dev/null
+++ b/storage/src/tests/distributor/distributor_bucket_space_test.cpp
@@ -0,0 +1,202 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/storage/distributor/distributor_bucket_space.h>
+#include <vespa/storage/distributor/distributor_bucket_space_repo.h>
+#include <vespa/vdslib/distribution/distribution.h>
+#include <vespa/vdslib/state/clusterstate.h>
+#include <vespa/vespalib/gtest/gtest.h>
+
+using document::BucketId;
+using storage::lib::ClusterState;
+using storage::lib::Distribution;
+
+namespace storage::distributor {
+
+namespace {
+
+std::shared_ptr<ClusterState> stable_state(std::make_shared<ClusterState>("distributor:4 storage:4 bits:8"));
+std::shared_ptr<ClusterState> node_1_down_state(std::make_shared<ClusterState>("distributor:4 .1.s:d storage:4 .1.s:d bits:8"));
+std::shared_ptr<Distribution> distribution_r1(std::make_shared<Distribution>(Distribution::getDefaultDistributionConfig(1, 4)));
+std::shared_ptr<Distribution> distribution_r2(std::make_shared<Distribution>(Distribution::getDefaultDistributionConfig(2, 4)));
+
+}
+
+struct DistributorBucketSpaceTest : public ::testing::Test
+{
+ using CountVector = std::vector<uint32_t>;
+
+ DistributorBucketSpace bucket_space;
+
+ DistributorBucketSpaceTest()
+ : ::testing::Test(),
+ bucket_space(0u)
+ {
+ }
+ ~DistributorBucketSpaceTest() = default;
+ uint32_t count_distributor_buckets();
+ uint32_t count_storage_buckets();
+ uint32_t count_deep_split_distributor_buckets();
+ uint32_t count_deep_split_storage_buckets();
+ CountVector count_buckets();
+ CountVector count_deep_split_buckets();
+};
+
+uint32_t
+DistributorBucketSpaceTest::count_distributor_buckets()
+{
+ uint32_t owned_buckets = 0;
+ uint16_t distribution_bits = bucket_space.getClusterState().getDistributionBitCount();
+ for (uint32_t i = 0; i < (1u << distribution_bits); ++i) {
+ BucketId bucket(distribution_bits, i);
+ bool owned = bucket_space.check_ownership_in_pending_and_current_state(bucket).isOwned();
+ bool check_owned = bucket_space.check_ownership_in_pending_and_current_state_fallback(bucket).isOwned();
+ EXPECT_EQ(check_owned, owned);
+ if (owned) {
+ ++owned_buckets;
+ }
+ }
+ return owned_buckets;
+}
+
+uint32_t
+DistributorBucketSpaceTest::count_storage_buckets()
+{
+ uint32_t owned_buckets = 0;
+ uint16_t distribution_bits = bucket_space.getClusterState().getDistributionBitCount();
+ for (uint32_t i = 0; i < (1u << distribution_bits); ++i) {
+ BucketId bucket(distribution_bits, i);
+ auto ideal_nodes = bucket_space.get_ideal_nodes(bucket);
+ auto check_ideal_nodes = bucket_space.get_ideal_nodes_fallback(bucket);
+ EXPECT_EQ(check_ideal_nodes, ideal_nodes);
+ for (auto node : ideal_nodes) {
+ if (node == 0u) {
+ ++owned_buckets;
+ }
+ }
+ }
+ return owned_buckets;
+}
+
+uint32_t
+DistributorBucketSpaceTest::count_deep_split_distributor_buckets()
+{
+ uint32_t owned_buckets = 0;
+ uint16_t distribution_bits = bucket_space.getClusterState().getDistributionBitCount();
+ uint32_t bias = 0;
+ uint32_t bias_max = std::min(1u << distribution_bits, 1000u);
+ for (; bias < bias_max; ++bias) {
+ BucketId bucket(distribution_bits, bias);
+ if (bucket_space.check_ownership_in_pending_and_current_state(bucket).isOwned()) {
+ break;
+ }
+ }
+ assert(bias < bias_max);
+ for (uint32_t i = 0; i < 100; ++i) {
+ BucketId bucket(42u, i * (1ul << 32) + bias);
+ bool owned = bucket_space.check_ownership_in_pending_and_current_state(bucket).isOwned();
+ bool check_owned = bucket_space.check_ownership_in_pending_and_current_state_fallback(bucket).isOwned();
+ EXPECT_EQ(check_owned, owned);
+ if (owned) {
+ ++owned_buckets;
+ }
+ }
+ return owned_buckets;
+}
+
+uint32_t
+DistributorBucketSpaceTest::count_deep_split_storage_buckets()
+{
+ uint32_t owned_buckets = 0;
+ uint16_t distribution_bits = bucket_space.getClusterState().getDistributionBitCount();
+ uint32_t bias = 0;
+ uint32_t bias_max = std::min(1u << distribution_bits, 1000u);
+ for (; bias < bias_max; ++bias) {
+ BucketId bucket(distribution_bits, bias);
+ auto ideal_nodes = bucket_space.get_ideal_nodes(bucket);
+ bool found = false;
+ for (auto node : ideal_nodes) {
+ if (node == 0u) {
+ found = true;
+ }
+ }
+ if (found) {
+ break;
+ }
+ }
+ assert(bias < bias_max);
+ for (uint32_t i = 0; i < 100; ++i) {
+ BucketId bucket(42u, i * (1ul << 32) + bias);
+ auto ideal_nodes = bucket_space.get_ideal_nodes(bucket);
+ auto check_ideal_nodes = bucket_space.get_ideal_nodes_fallback(bucket);
+ EXPECT_EQ(check_ideal_nodes, ideal_nodes);
+ for (auto node : ideal_nodes) {
+ if (node == 0u) {
+ ++owned_buckets;
+ }
+ }
+ }
+ return owned_buckets;
+}
+
+DistributorBucketSpaceTest::CountVector
+DistributorBucketSpaceTest::count_buckets()
+{
+ CountVector result;
+ result.push_back(count_distributor_buckets());
+ result.push_back(count_storage_buckets());
+ return result;
+}
+
+DistributorBucketSpaceTest::CountVector
+DistributorBucketSpaceTest::count_deep_split_buckets()
+{
+ CountVector result;
+ result.push_back(count_deep_split_distributor_buckets());
+ result.push_back(count_deep_split_storage_buckets());
+ return result;
+}
+
+TEST_F(DistributorBucketSpaceTest, check_owned_buckets)
+{
+ bucket_space.setDistribution(distribution_r1);
+ bucket_space.setClusterState(stable_state);
+ EXPECT_EQ((CountVector{64u, 64u}), count_buckets());
+ bucket_space.set_pending_cluster_state(node_1_down_state);
+ EXPECT_EQ((CountVector{64u, 64u}), count_buckets());
+ bucket_space.setClusterState(node_1_down_state);
+ bucket_space.set_pending_cluster_state({});
+ EXPECT_EQ((CountVector{86u, 86u}), count_buckets());
+ bucket_space.set_pending_cluster_state(stable_state);
+ EXPECT_EQ((CountVector{64u, 86u}), count_buckets());
+ bucket_space.setClusterState(stable_state);
+ bucket_space.set_pending_cluster_state({});
+ EXPECT_EQ((CountVector{64u, 64u}), count_buckets());
+ bucket_space.setDistribution(distribution_r2);
+ EXPECT_EQ((CountVector{64u, 125u}), count_buckets());
+}
+
+TEST_F(DistributorBucketSpaceTest, check_available_nodes)
+{
+ bucket_space.setDistribution(distribution_r1);
+ bucket_space.setClusterState(stable_state);
+ EXPECT_EQ((std::vector<bool>{true, true, true, true}), bucket_space.get_available_nodes());
+ bucket_space.set_pending_cluster_state(node_1_down_state);
+ EXPECT_EQ((std::vector<bool>{true, false, true, true}), bucket_space.get_available_nodes());
+ bucket_space.setClusterState(node_1_down_state);
+ bucket_space.set_pending_cluster_state({});
+ EXPECT_EQ((std::vector<bool>{true, false, true, true}), bucket_space.get_available_nodes());
+ bucket_space.set_pending_cluster_state(stable_state);
+ EXPECT_EQ((std::vector<bool>{true, false, true, true}), bucket_space.get_available_nodes());
+ bucket_space.setClusterState(stable_state);
+ bucket_space.set_pending_cluster_state({});
+ EXPECT_EQ((std::vector<bool>{true, true, true, true}), bucket_space.get_available_nodes());
+}
+
+TEST_F(DistributorBucketSpaceTest, check_owned_deep_split_buckets)
+{
+ bucket_space.setDistribution(distribution_r1);
+ bucket_space.setClusterState(stable_state);
+ EXPECT_EQ((CountVector{100u, 19u}), count_deep_split_buckets());
+}
+
+}
diff --git a/storage/src/tests/distributor/simplemaintenancescannertest.cpp b/storage/src/tests/distributor/simplemaintenancescannertest.cpp
index b21a10c319e..58dc2430041 100644
--- a/storage/src/tests/distributor/simplemaintenancescannertest.cpp
+++ b/storage/src/tests/distributor/simplemaintenancescannertest.cpp
@@ -36,7 +36,7 @@ void
SimpleMaintenanceScannerTest::SetUp()
{
_priorityGenerator = std::make_unique<MockMaintenancePriorityGenerator>();
- _bucketSpaceRepo = std::make_unique<DistributorBucketSpaceRepo>();
+ _bucketSpaceRepo = std::make_unique<DistributorBucketSpaceRepo>(0u);
_priorityDb = std::make_unique<SimpleBucketPriorityDatabase>();
_scanner = std::make_unique<SimpleMaintenanceScanner>(*_priorityDb, *_priorityGenerator, *_bucketSpaceRepo);
}
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index c5fd7027fa6..388e696928b 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -106,13 +106,8 @@ BucketDBUpdater::hasPendingClusterState() const
BucketOwnership
BucketDBUpdater::checkOwnershipInPendingState(const document::Bucket& b) const
{
- if (hasPendingClusterState()) {
- const auto& state(*_pendingClusterState->getNewClusterStateBundle().getDerivedClusterState(b.getBucketSpace()));
- if (!_distributorComponent.ownsBucketInState(state, b)) {
- return BucketOwnership::createNotOwnedInState(state);
- }
- }
- return BucketOwnership::createOwned();
+ auto &bucket_space(_distributorComponent.getBucketSpaceRepo().get(b.getBucketSpace()));
+ return bucket_space.check_ownership_in_pending_state(b.getBucketId());
}
const lib::ClusterState*
@@ -317,6 +312,7 @@ BucketDBUpdater::storageDistributionChanged()
_distributorComponent.getBucketSpaceRepo(),
_distributorComponent.getUniqueTimestamp());
_outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap();
+ _distributorComponent.getBucketSpaceRepo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle());
}
void
@@ -435,6 +431,7 @@ BucketDBUpdater::onSetSystemState(
_distributorComponent.getDistributor().getMetrics().set_cluster_state_processing_time.addValue(
process_timer.getElapsedTimeAsDouble());
+ _distributorComponent.getBucketSpaceRepo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle());
if (isPendingClusterStateCompleted()) {
processCompletedPendingClusterState();
}
@@ -782,6 +779,7 @@ BucketDBUpdater::activatePendingClusterState()
update_read_snapshot_after_activation(_pendingClusterState->getNewClusterStateBundle());
_pendingClusterState.reset();
_outdatedNodesMap.clear();
+ _distributorComponent.getBucketSpaceRepo().clear_pending_cluster_state_bundle();
sendAllQueuedBucketRechecks();
completeTransitionTimer();
clearReadOnlyBucketRepoDatabases();
diff --git a/storage/src/vespa/storage/distributor/bucketownership.h b/storage/src/vespa/storage/distributor/bucketownership.h
index bfe63c9799d..c22f690a830 100644
--- a/storage/src/vespa/storage/distributor/bucketownership.h
+++ b/storage/src/vespa/storage/distributor/bucketownership.h
@@ -16,9 +16,9 @@ class BucketOwnership
_owned(false)
{ }
+public:
BucketOwnership() : _checkedState(nullptr), _owned(true) {}
-public:
bool isOwned() const { return _owned; }
/**
* Cluster state in which the ownership check failed. Lifetime of returned
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 9c1af99100b..4f3bc7a6caf 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -73,8 +73,8 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_clusterStateBundle(lib::ClusterState()),
_compReg(compReg),
_component(compReg, "distributor"),
- _bucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>()),
- _readOnlyBucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>()),
+ _bucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>(_component.getIndex())),
+ _readOnlyBucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>(_component.getIndex())),
_metrics(std::make_shared<DistributorMetricSet>()),
_operationOwner(*this, _component.getClock()),
_maintenanceOperationOwner(*this, _component.getClock()),
diff --git a/storage/src/vespa/storage/distributor/distributor_bucket_space.cpp b/storage/src/vespa/storage/distributor/distributor_bucket_space.cpp
index 7b7970228e7..111e58045a1 100644
--- a/storage/src/vespa/storage/distributor/distributor_bucket_space.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_bucket_space.cpp
@@ -1,31 +1,205 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "distributor_bucket_space.h"
+#include "bucketownership.h"
#include <vespa/storage/bucketdb/btree_bucket_database.h>
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vdslib/distribution/distribution.h>
+#include <vespa/vespalib/stllike/hash_map.hpp>
namespace storage::distributor {
+namespace {
+
+const char *up_states = "uri";
+
+}
+
DistributorBucketSpace::DistributorBucketSpace()
+ : DistributorBucketSpace(0u)
+{
+}
+
+DistributorBucketSpace::DistributorBucketSpace(uint16_t node_index)
: _bucketDatabase(std::make_unique<BTreeBucketDatabase>()),
_clusterState(),
- _distribution()
+ _distribution(),
+ _node_index(node_index),
+ _distribution_bits(1u),
+ _pending_cluster_state(),
+ _available_nodes(),
+ _ownerships(),
+ _ideal_nodes()
{
}
DistributorBucketSpace::~DistributorBucketSpace() = default;
void
+DistributorBucketSpace::clear()
+{
+ _ownerships.clear();
+ _ideal_nodes.clear();
+}
+
+void
+DistributorBucketSpace::enumerate_available_nodes()
+{
+ _distribution_bits = _clusterState->getDistributionBitCount();
+ auto node_count = _clusterState->getNodeCount(lib::NodeType::STORAGE);
+ if (_pending_cluster_state) {
+ _distribution_bits = std::min(_distribution_bits, _pending_cluster_state->getDistributionBitCount());
+ node_count = std::min(node_count, _pending_cluster_state->getNodeCount(lib::NodeType::STORAGE));
+ }
+ std::vector<bool> nodes(node_count);
+ for (uint32_t i = 0; i < node_count; ++i) {
+ lib::Node node_key(lib::NodeType::STORAGE, i);
+ const lib::NodeState& ns(_clusterState->getNodeState(node_key));
+ if (ns.getState().oneOf(up_states)) {
+ if (!_pending_cluster_state || _pending_cluster_state->getNodeState(node_key).getState().oneOf(up_states)) {
+ nodes[i] = true;
+ }
+ }
+ }
+ _available_nodes = std::move(nodes);
+}
+
+void
DistributorBucketSpace::setClusterState(std::shared_ptr<const lib::ClusterState> clusterState)
{
_clusterState = std::move(clusterState);
+ clear();
+ enumerate_available_nodes();
}
void
DistributorBucketSpace::setDistribution(std::shared_ptr<const lib::Distribution> distribution) {
_distribution = std::move(distribution);
+ clear();
+}
+
+std::vector<uint16_t>
+DistributorBucketSpace::get_ideal_nodes_fallback(document::BucketId bucket) const
+{
+ return _distribution->getIdealStorageNodes(*_clusterState, bucket, up_states);
+}
+
+void
+DistributorBucketSpace::set_pending_cluster_state(std::shared_ptr<const lib::ClusterState> pending_cluster_state)
+{
+ _pending_cluster_state = std::move(pending_cluster_state);
+ clear();
+ enumerate_available_nodes();
+}
+
+bool
+DistributorBucketSpace::owns_bucket_in_state(
+ const lib::Distribution& distribution,
+ const lib::ClusterState& cluster_state,
+ document::BucketId bucket) const
+{
+ try {
+ uint16_t distributor = distribution.getIdealDistributorNode(cluster_state, bucket);
+
+ return (_node_index == distributor);
+ } catch (lib::TooFewBucketBitsInUseException& e) {
+ return false;
+ } catch (lib::NoDistributorsAvailableException& e) {
+ return false;
+ }
+}
+
+bool
+DistributorBucketSpace::owns_bucket_in_state(
+ const lib::ClusterState& clusterState,
+ document::BucketId bucket) const
+{
+ return owns_bucket_in_state(*_distribution, clusterState, bucket);
+}
+
+bool
+DistributorBucketSpace::owns_bucket_in_current_state(document::BucketId bucket) const
+{
+ return owns_bucket_in_state(*_distribution, *_clusterState, bucket);
+}
+
+BucketOwnership
+DistributorBucketSpace::check_ownership_in_pending_state(document::BucketId bucket) const
+{
+ if (_pending_cluster_state) {
+ if (!owns_bucket_in_state(*_pending_cluster_state, bucket)) {
+ return BucketOwnership::createNotOwnedInState(*_pending_cluster_state);
+ }
+ }
+ return BucketOwnership::createOwned();
+}
+BucketOwnership
+DistributorBucketSpace::check_ownership_in_pending_and_given_state(
+ const lib::Distribution& distribution,
+ const lib::ClusterState& clusterState,
+ document::BucketId bucket) const
+{
+ try {
+ BucketOwnership pendingRes(
+ check_ownership_in_pending_state(bucket));
+ if (!pendingRes.isOwned()) {
+ return pendingRes;
+ }
+ uint16_t distributor = distribution.getIdealDistributorNode(
+ clusterState, bucket);
+
+ if (_node_index == distributor) {
+ return BucketOwnership::createOwned();
+ } else {
+ return BucketOwnership::createNotOwnedInState(clusterState);
+ }
+ } catch (lib::TooFewBucketBitsInUseException& e) {
+ return BucketOwnership::createNotOwnedInState(clusterState);
+ } catch (lib::NoDistributorsAvailableException& e) {
+ return BucketOwnership::createNotOwnedInState(clusterState);
+ }
+}
+
+BucketOwnership
+DistributorBucketSpace::check_ownership_in_pending_and_current_state_fallback(document::BucketId bucket) const
+{
+ return check_ownership_in_pending_and_given_state(*_distribution, *_clusterState, bucket);
+}
+
+std::vector<uint16_t>
+DistributorBucketSpace::get_ideal_nodes(document::BucketId bucket)
+{
+ assert(bucket.getUsedBits() >= _distribution_bits);
+ if (bucket.getUsedBits() > 33) { // cf. storage::lib::Distribution::getStorageSeed
+ // Cannot map to super bucket ==> cannot cache result
+ return get_ideal_nodes_fallback(bucket);
+ }
+ document::BucketId super_bucket(_distribution_bits, bucket.getId());
+ auto itr = _ideal_nodes.find(super_bucket);
+ if (itr != _ideal_nodes.end()) {
+ return itr->second;
+ }
+ auto insres = _ideal_nodes.insert(std::make_pair(super_bucket, get_ideal_nodes_fallback(super_bucket)));
+ assert(insres.second);
+ return insres.first->second;
+}
+
+BucketOwnership
+DistributorBucketSpace::check_ownership_in_pending_and_current_state(document::BucketId bucket)
+{
+ if (bucket.getUsedBits() < _distribution_bits) {
+ // Cannot map to super bucket ==> cannot cache result
+ return check_ownership_in_pending_and_current_state_fallback(bucket);
+ }
+ document::BucketId super_bucket(_distribution_bits, bucket.getId());
+ auto itr = _ownerships.find(super_bucket);
+ if (itr != _ownerships.end()) {
+ return itr->second;
+ }
+ auto insres = _ownerships.insert(std::make_pair(super_bucket, check_ownership_in_pending_and_current_state_fallback(super_bucket)));
+ assert(insres.second);
+ return insres.first->second;
}
}
diff --git a/storage/src/vespa/storage/distributor/distributor_bucket_space.h b/storage/src/vespa/storage/distributor/distributor_bucket_space.h
index c137414ecfb..96440fbabad 100644
--- a/storage/src/vespa/storage/distributor/distributor_bucket_space.h
+++ b/storage/src/vespa/storage/distributor/distributor_bucket_space.h
@@ -1,7 +1,11 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include "bucketownership.h"
+#include <vespa/document/bucket/bucketid.h>
+#include <vespa/vespalib/stllike/hash_map.h>
#include <memory>
+#include <vector>
namespace storage {
class BucketDatabase;
@@ -29,8 +33,18 @@ class DistributorBucketSpace {
std::unique_ptr<BucketDatabase> _bucketDatabase;
std::shared_ptr<const lib::ClusterState> _clusterState;
std::shared_ptr<const lib::Distribution> _distribution;
+ uint16_t _node_index;
+ uint16_t _distribution_bits;
+ std::shared_ptr<const lib::ClusterState> _pending_cluster_state;
+ std::vector<bool> _available_nodes;
+ vespalib::hash_map<document::BucketId, BucketOwnership, document::BucketId::hash> _ownerships;
+ vespalib::hash_map<document::BucketId, std::vector<uint16_t>, document::BucketId::hash> _ideal_nodes;
+
+ void clear();
+ void enumerate_available_nodes();
public:
explicit DistributorBucketSpace();
+ explicit DistributorBucketSpace(uint16_t node_index);
~DistributorBucketSpace();
DistributorBucketSpace(const DistributorBucketSpace&) = delete;
@@ -62,6 +76,22 @@ public:
return _distribution;
}
+ void set_pending_cluster_state(std::shared_ptr<const lib::ClusterState> pending_cluster_state);
+
+ std::vector<uint16_t> get_ideal_nodes_fallback(document::BucketId bucket) const;
+
+ bool owns_bucket_in_state(const lib::Distribution& distribution, const lib::ClusterState& cluster_state, document::BucketId bucket) const;
+ bool owns_bucket_in_state(const lib::ClusterState& clusterState, document::BucketId bucket) const;
+ bool owns_bucket_in_current_state(document::BucketId bucket) const;
+
+ BucketOwnership check_ownership_in_pending_state(document::BucketId bucket) const;
+ BucketOwnership check_ownership_in_pending_and_given_state(const lib::Distribution& distribution,
+ const lib::ClusterState& clusterState,
+ document::BucketId bucket) const;
+ BucketOwnership check_ownership_in_pending_and_current_state_fallback(document::BucketId bucket) const;
+ const std::vector<bool>& get_available_nodes() const { return _available_nodes; }
+ std::vector<uint16_t> get_ideal_nodes(document::BucketId bucket);
+ BucketOwnership check_ownership_in_pending_and_current_state(document::BucketId bucket);
};
}
diff --git a/storage/src/vespa/storage/distributor/distributor_bucket_space_repo.cpp b/storage/src/vespa/storage/distributor/distributor_bucket_space_repo.cpp
index 744c54676ae..4f64dab9a68 100644
--- a/storage/src/vespa/storage/distributor/distributor_bucket_space_repo.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_bucket_space_repo.cpp
@@ -2,7 +2,7 @@
#include "distributor_bucket_space_repo.h"
#include "distributor_bucket_space.h"
-#include <vespa/vdslib/distribution/distribution.h>
+#include <vespa/vdslib/state/cluster_state_bundle.h>
#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <cassert>
@@ -13,11 +13,11 @@ using document::BucketSpace;
namespace storage::distributor {
-DistributorBucketSpaceRepo::DistributorBucketSpaceRepo()
+DistributorBucketSpaceRepo::DistributorBucketSpaceRepo(uint16_t node_index)
: _map()
{
- add(document::FixedBucketSpaces::default_space(), std::make_unique<DistributorBucketSpace>());
- add(document::FixedBucketSpaces::global_space(), std::make_unique<DistributorBucketSpace>());
+ add(document::FixedBucketSpaces::default_space(), std::make_unique<DistributorBucketSpace>(node_index));
+ add(document::FixedBucketSpaces::global_space(), std::make_unique<DistributorBucketSpace>(node_index));
}
DistributorBucketSpaceRepo::~DistributorBucketSpaceRepo() = default;
@@ -44,4 +44,20 @@ DistributorBucketSpaceRepo::get(BucketSpace bucketSpace) const
return *itr->second;
}
+void
+DistributorBucketSpaceRepo::set_pending_cluster_state_bundle(const lib::ClusterStateBundle& cluster_state_bundle)
+{
+ for (auto& entry : _map) {
+ entry.second->set_pending_cluster_state(cluster_state_bundle.getDerivedClusterState(entry.first));
+ }
+}
+
+void
+DistributorBucketSpaceRepo::clear_pending_cluster_state_bundle()
+{
+ for (auto& entry : _map) {
+ entry.second->set_pending_cluster_state({});
+ }
+}
+
}
diff --git a/storage/src/vespa/storage/distributor/distributor_bucket_space_repo.h b/storage/src/vespa/storage/distributor/distributor_bucket_space_repo.h
index ee36842969a..eb268d0fbd9 100644
--- a/storage/src/vespa/storage/distributor/distributor_bucket_space_repo.h
+++ b/storage/src/vespa/storage/distributor/distributor_bucket_space_repo.h
@@ -5,7 +5,7 @@
#include <memory>
#include <unordered_map>
-namespace storage::lib { class Distribution; }
+namespace storage::lib { class ClusterStateBundle; }
namespace storage::distributor {
@@ -19,7 +19,7 @@ private:
BucketSpaceMap _map;
public:
- DistributorBucketSpaceRepo();
+ DistributorBucketSpaceRepo(uint16_t node_index);
~DistributorBucketSpaceRepo();
DistributorBucketSpaceRepo(const DistributorBucketSpaceRepo&&) = delete;
@@ -33,6 +33,8 @@ public:
BucketSpaceMap::const_iterator begin() const { return _map.begin(); }
BucketSpaceMap::const_iterator end() const { return _map.end(); }
void add(document::BucketSpace bucketSpace, std::unique_ptr<DistributorBucketSpace> distributorBucketSpace);
+ void set_pending_cluster_state_bundle(const lib::ClusterStateBundle& cluster_state_bundle);
+ void clear_pending_cluster_state_bundle();
};
}
diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.cpp b/storage/src/vespa/storage/distributor/distributorcomponent.cpp
index 519413bab2e..58fe015ed8c 100644
--- a/storage/src/vespa/storage/distributor/distributorcomponent.cpp
+++ b/storage/src/vespa/storage/distributor/distributorcomponent.cpp
@@ -49,47 +49,16 @@ DistributorComponent::getClusterStateBundle() const
std::vector<uint16_t>
DistributorComponent::getIdealNodes(const document::Bucket &bucket) const
{
- auto &bucketSpace(_bucketSpaceRepo.get(bucket.getBucketSpace()));
- return bucketSpace.getDistribution().getIdealStorageNodes(
- bucketSpace.getClusterState(),
- bucket.getBucketId(),
- _distributor.getStorageNodeUpStates());
-}
-
-BucketOwnership
-DistributorComponent::checkOwnershipInPendingAndGivenState(
- const lib::Distribution& distribution,
- const lib::ClusterState& clusterState,
- const document::Bucket &bucket) const
-{
- try {
- BucketOwnership pendingRes(
- _distributor.checkOwnershipInPendingState(bucket));
- if (!pendingRes.isOwned()) {
- return pendingRes;
- }
- uint16_t distributor = distribution.getIdealDistributorNode(
- clusterState, bucket.getBucketId());
-
- if (getIndex() == distributor) {
- return BucketOwnership::createOwned();
- } else {
- return BucketOwnership::createNotOwnedInState(clusterState);
- }
- } catch (lib::TooFewBucketBitsInUseException& e) {
- return BucketOwnership::createNotOwnedInState(clusterState);
- } catch (lib::NoDistributorsAvailableException& e) {
- return BucketOwnership::createNotOwnedInState(clusterState);
- }
+ auto &bucket_space(_bucketSpaceRepo.get(bucket.getBucketSpace()));
+ return bucket_space.get_ideal_nodes(bucket.getBucketId());
}
BucketOwnership
DistributorComponent::checkOwnershipInPendingAndCurrentState(
const document::Bucket &bucket) const
{
- auto &bucketSpace(_bucketSpaceRepo.get(bucket.getBucketSpace()));
- return checkOwnershipInPendingAndGivenState(
- bucketSpace.getDistribution(), bucketSpace.getClusterState(), bucket);
+ auto &bucket_space(_bucketSpaceRepo.get(bucket.getBucketSpace()));
+ return bucket_space.check_ownership_in_pending_and_current_state(bucket.getBucketId());
}
bool
@@ -98,19 +67,8 @@ DistributorComponent::ownsBucketInState(
const lib::ClusterState& clusterState,
const document::Bucket &bucket) const
{
- LOG(spam, "checking bucket %s in state %s with distr %s",
- bucket.toString().c_str(), clusterState.toString().c_str(),
- distribution.getNodeGraph().getDistributionConfigHash().c_str());
- try {
- uint16_t distributor = distribution.getIdealDistributorNode(
- clusterState, bucket.getBucketId());
-
- return (getIndex() == distributor);
- } catch (lib::TooFewBucketBitsInUseException& e) {
- return false;
- } catch (lib::NoDistributorsAvailableException& e) {
- return false;
- }
+ auto &bucket_space(_bucketSpaceRepo.get(bucket.getBucketSpace()));
+ return bucket_space.owns_bucket_in_state(distribution, clusterState, bucket.getBucketId());
}
bool
@@ -118,15 +76,15 @@ DistributorComponent::ownsBucketInState(
const lib::ClusterState& clusterState,
const document::Bucket &bucket) const
{
- auto &bucketSpace(_bucketSpaceRepo.get(bucket.getBucketSpace()));
- return ownsBucketInState(bucketSpace.getDistribution(), clusterState, bucket);
+ auto &bucket_space(_bucketSpaceRepo.get(bucket.getBucketSpace()));
+ return bucket_space.owns_bucket_in_state(clusterState, bucket.getBucketId());
}
bool
DistributorComponent::ownsBucketInCurrentState(const document::Bucket &bucket) const
{
- auto &bucketSpace(_bucketSpaceRepo.get(bucket.getBucketSpace()));
- return ownsBucketInState(bucketSpace.getDistribution(), bucketSpace.getClusterState(), bucket);
+ auto &bucket_space(_bucketSpaceRepo.get(bucket.getBucketSpace()));
+ return bucket_space.owns_bucket_in_current_state(bucket.getBucketId());
}
api::StorageMessageAddress
@@ -138,7 +96,8 @@ DistributorComponent::nodeAddress(uint16_t nodeIndex) const
bool
DistributorComponent::checkDistribution(api::StorageCommand &cmd, const document::Bucket &bucket)
{
- BucketOwnership bo(checkOwnershipInPendingAndCurrentState(bucket));
+ auto &bucket_space(_bucketSpaceRepo.get(bucket.getBucketSpace()));
+ BucketOwnership bo(bucket_space.check_ownership_in_pending_and_current_state(bucket.getBucketId()));
if (!bo.isOwned()) {
std::string systemStateStr = bo.getNonOwnedState().toString();
LOG(debug,
@@ -218,7 +177,7 @@ DistributorComponent::updateBucketDatabase(
assert(!(bucket.getBucketId() == document::BucketId()));
BucketDatabase::Entry dbentry = bucketSpace.getBucketDatabase().get(bucket.getBucketId());
- BucketOwnership ownership(checkOwnershipInPendingAndCurrentState(bucket));
+ BucketOwnership ownership(bucketSpace.check_ownership_in_pending_and_current_state(bucket.getBucketId()));
if (!ownership.isOwned()) {
LOG(debug,
"Trying to add %s to database that we do not own according to "
@@ -246,26 +205,27 @@ DistributorComponent::updateBucketDatabase(
// Ensure that we're not trying to bring any zombie copies into the
// bucket database (i.e. copies on nodes that are actually unavailable).
- std::vector<uint16_t> unavailableNodes;
- enumerateUnavailableNodes(unavailableNodes, bucketSpace.getClusterState(), bucket, changedNodes);
- if (auto* pending_state = _distributor.pendingClusterStateOrNull(bucket.getBucketSpace())) {
- enumerateUnavailableNodes(unavailableNodes, *pending_state, bucket, changedNodes);
+ const auto& available_nodes = bucketSpace.get_available_nodes();
+ bool found_down_node = false;
+ for (const auto& copy : changedNodes) {
+ if (copy.getNode() >= available_nodes.size() || !available_nodes[copy.getNode()]) {
+ found_down_node = true;
+ break;
+ }
}
// Optimize for common case where we don't have to create a new
// bucket copy vector
- if (unavailableNodes.empty()) {
- dbentry->addNodes(changedNodes, getIdealNodes(bucket));
+ if (!found_down_node) {
+ dbentry->addNodes(changedNodes, bucketSpace.get_ideal_nodes(bucket.getBucketId()));
} else {
std::vector<BucketCopy> upNodes;
for (uint32_t i = 0; i < changedNodes.size(); ++i) {
const BucketCopy& copy(changedNodes[i]);
- if (std::find(unavailableNodes.begin(), unavailableNodes.end(), copy.getNode())
- == unavailableNodes.end())
- {
+ if (copy.getNode() < available_nodes.size() && available_nodes[copy.getNode()]) {
upNodes.emplace_back(copy);
}
}
- dbentry->addNodes(upNodes, getIdealNodes(bucket));
+ dbentry->addNodes(upNodes, bucketSpace.get_ideal_nodes(bucket.getBucketId()));
}
if (updateFlags & DatabaseUpdate::RESET_TRUSTED) {
dbentry->resetTrusted();
diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.h b/storage/src/vespa/storage/distributor/distributorcomponent.h
index 283d0c20390..2817555f4f5 100644
--- a/storage/src/vespa/storage/distributor/distributorcomponent.h
+++ b/storage/src/vespa/storage/distributor/distributorcomponent.h
@@ -41,11 +41,6 @@ public:
* distribution and cluster state -and- that of the pending cluster
* state and distribution (if any pending exists).
*/
- BucketOwnership checkOwnershipInPendingAndGivenState(
- const lib::Distribution& distribution,
- const lib::ClusterState& clusterState,
- const document::Bucket &bucket) const;
-
BucketOwnership checkOwnershipInPendingAndCurrentState(
const document::Bucket &bucket) const;