diff options
Diffstat (limited to 'storage/src/vespa/storage/distributor')
19 files changed, 117 insertions, 91 deletions
diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt index 195410cbe03..212570a3033 100644 --- a/storage/src/vespa/storage/distributor/CMakeLists.txt +++ b/storage/src/vespa/storage/distributor/CMakeLists.txt @@ -5,7 +5,6 @@ vespa_add_library(storage_distributor OBJECT blockingoperationstarter.cpp bucket_db_prune_elision.cpp bucket_ownership_calculator.cpp - bucket_space_distribution_configs.cpp bucket_space_distribution_context.cpp bucket_space_state_map.cpp bucket_spaces_stats_provider.cpp diff --git a/storage/src/vespa/storage/distributor/activecopy.cpp b/storage/src/vespa/storage/distributor/activecopy.cpp index 35070bcee3b..b823978a0cc 100644 --- a/storage/src/vespa/storage/distributor/activecopy.cpp +++ b/storage/src/vespa/storage/distributor/activecopy.cpp @@ -108,6 +108,7 @@ buildNodeList(const BucketDatabase::Entry& e,vespalib::ConstArrayRef<uint16_t> n struct ActiveStateOrder { bool operator()(const ActiveCopy & e1, const ActiveCopy & e2) noexcept { + // Replica selection order should be kept in sync with OperationTargetResolverImpl's InstanceOrder. if (e1._ready != e2._ready) { return e1._ready; } @@ -120,7 +121,9 @@ struct ActiveStateOrder { if (e1._active != e2._active) { return e1._active; } - return e1.nodeIndex() < e2.nodeIndex(); + // Use _entry_ order instead of node index, as it is in ideal state order (even for retired + // nodes), which avoids unintentional affinities towards lower node indexes. + return e1.entryIndex() < e2.entryIndex(); } }; diff --git a/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.cpp b/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.cpp deleted file mode 100644 index 37bf8f01752..00000000000 --- a/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.cpp +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "bucket_space_distribution_configs.h" -#include <vespa/document/bucket/fixed_bucket_spaces.h> -#include <vespa/storage/common/global_bucket_space_distribution_converter.h> -#include <vespa/vdslib/distribution/distribution.h> - -namespace storage::distributor { - -BucketSpaceDistributionConfigs -BucketSpaceDistributionConfigs::from_default_distribution(std::shared_ptr<const lib::Distribution> distribution) { - BucketSpaceDistributionConfigs ret; - ret.space_configs.emplace(document::FixedBucketSpaces::global_space(), GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution)); - ret.space_configs.emplace(document::FixedBucketSpaces::default_space(), std::move(distribution)); - return ret; -} - -} diff --git a/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.h b/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.h deleted file mode 100644 index cddd21d579f..00000000000 --- a/storage/src/vespa/storage/distributor/bucket_space_distribution_configs.h +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <vespa/document/bucket/bucketspace.h> -#include <map> -#include <memory> - -namespace storage::lib { class Distribution; } - -namespace storage::distributor { - -/** - * Represents a complete mapping of all known bucket spaces to their appropriate, - * (possibly derived) distribution config. - */ -struct BucketSpaceDistributionConfigs { - std::map<document::BucketSpace, std::shared_ptr<const lib::Distribution>> space_configs; - - std::shared_ptr<const lib::Distribution> get_or_nullptr(document::BucketSpace space) const noexcept { - auto iter = space_configs.find(space); - return (iter != space_configs.end()) ? iter->second : std::shared_ptr<const lib::Distribution>(); - } - - static BucketSpaceDistributionConfigs from_default_distribution(std::shared_ptr<const lib::Distribution>); -}; - -} diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp index c00ab7080da..f8abdf78c2b 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp @@ -12,7 +12,6 @@ #include "stripe_host_info_notifier.h" #include "throttlingoperationstarter.h" #include <vespa/document/bucket/fixed_bucket_spaces.h> -#include <vespa/storage/common/global_bucket_space_distribution_converter.h> #include <vespa/storage/common/node_identity.h> #include <vespa/storage/common/nodestateupdater.h> #include <vespa/storage/distributor/maintenance/simplebucketprioritydatabase.h> @@ -21,6 +20,7 @@ #include <vespa/storage/config/distributorconfiguration.h> #include <vespa/storageframework/generic/status/xmlstatusreporter.h> #include <vespa/vdslib/distribution/distribution.h> +#include <vespa/vdslib/distribution/global_bucket_space_distribution_converter.h> #include <vespa/vespalib/util/memoryusage.h> #include <algorithm> @@ -545,7 +545,7 @@ DistributorStripe::checkBucketForSplit(document::BucketSpace bucketSpace, const void DistributorStripe::propagateDefaultDistribution(std::shared_ptr<const lib::Distribution> distribution) { - auto global_distr = GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution); + auto global_distr = lib::GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution); for (auto* repo : {_bucketSpaceRepo.get(), _readOnlyBucketSpaceRepo.get()}) { repo->get(document::FixedBucketSpaces::default_space()).setDistribution(distribution); repo->get(document::FixedBucketSpaces::global_space()).setDistribution(global_distr); @@ -554,7 +554,7 @@ DistributorStripe::propagateDefaultDistribution(std::shared_ptr<const lib::Distr // Only called when stripe is in rendezvous freeze void -DistributorStripe::update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) { +DistributorStripe::update_distribution_config(const lib::BucketSpaceDistributionConfigs& new_configs) { auto default_distr = new_configs.get_or_nullptr(document::FixedBucketSpaces::default_space()); auto global_distr = new_configs.get_or_nullptr(document::FixedBucketSpaces::global_space()); assert(default_distr && global_distr); diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h index d782432ab35..f5793e4d39b 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe.h @@ -283,7 +283,7 @@ private: void propagate_config_snapshot_to_internal_components(); // Additional implementations of TickableStripe: - void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) override; + void update_distribution_config(const lib::BucketSpaceDistributionConfigs& new_configs) override; void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) override; void set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) override; void clear_pending_cluster_state_bundle() override; diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp index f1cce40ee8b..991a73ec5c6 100644 --- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp +++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp @@ -33,7 +33,7 @@ void MultiThreadedStripeAccessGuard::update_total_distributor_config(std::shared }); } -void MultiThreadedStripeAccessGuard::update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) { +void MultiThreadedStripeAccessGuard::update_distribution_config(const lib::BucketSpaceDistributionConfigs& new_configs) { for_each_stripe([&](TickableStripe& stripe) { stripe.update_distribution_config(new_configs); }); diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h index a4392416025..3ce22a3e1a7 100644 --- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h +++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h @@ -31,7 +31,7 @@ public: void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) override; - void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) override; + void update_distribution_config(const lib::BucketSpaceDistributionConfigs& new_configs) override; void set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) override; void clear_pending_cluster_state_bundle() override; void enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state, diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp index 54087850e1b..a92896279b0 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp @@ -180,6 +180,8 @@ void PutOperation::start_direct_put_dispatch(DistributorStripeMessageSender& sen _op_ctx.distributor_config().getMinimalBucketSplit(), _bucket_space.getDistribution().getRedundancy(), _msg->getBucket().getBucketSpace()); + targetResolver.use_symmetric_replica_selection( + _op_ctx.distributor_config().symmetric_put_and_activate_replica_selection()); OperationTargetList targets(targetResolver.getTargets(OperationTargetResolver::PUT, _doc_id_bucket_id)); for (const auto& target : targets) { diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp index 84e9ab71bcb..849746416d6 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp @@ -668,7 +668,7 @@ TwoPhaseUpdateOperation::applyUpdateToDocument(document::Document& doc) const bool TwoPhaseUpdateOperation::shouldCreateIfNonExistent() const { - return _updateCmd->getUpdate()->getCreateIfNonExistent(); + return _updateCmd->create_if_missing(); } bool diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp index 7b6833cc299..2b47d53363f 100644 --- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp @@ -29,7 +29,7 @@ UpdateOperation::UpdateOperation(const DistributorNodeContext& node_ctx, _msg(msg), _entries(std::move(entries)), _new_timestamp(_msg->getTimestamp()), - _is_auto_create_update(_msg->getUpdate()->getCreateIfNonExistent()), + _is_auto_create_update(_msg->create_if_missing()), _node_ctx(node_ctx), _op_ctx(op_ctx), _bucketSpace(bucketSpace), @@ -112,6 +112,9 @@ UpdateOperation::onStart(DistributorStripeMessageSender& sender) copyMessageSettings(*_msg, *command); command->setOldTimestamp(_msg->getOldTimestamp()); command->setCondition(_msg->getCondition()); + if (_msg->has_cached_create_if_missing()) { + command->set_cached_create_if_missing(_msg->create_if_missing()); + } messages.emplace_back(std::move(command), node); } diff --git a/storage/src/vespa/storage/distributor/operationtargetresolverimpl.cpp b/storage/src/vespa/storage/distributor/operationtargetresolverimpl.cpp index 394c13c2bad..618cfb56359 100644 --- a/storage/src/vespa/storage/distributor/operationtargetresolverimpl.cpp +++ b/storage/src/vespa/storage/distributor/operationtargetresolverimpl.cpp @@ -10,9 +10,12 @@ namespace storage::distributor { BucketInstance::BucketInstance(const document::BucketId& id, const api::BucketInfo& info, lib::Node node, - uint16_t idealLocationPriority, bool trusted, bool exist) noexcept + uint16_t ideal_location_priority, uint16_t db_entry_order, + bool trusted, bool exist) noexcept : _bucket(id), _info(info), _node(node), - _idealLocationPriority(idealLocationPriority), _trusted(trusted), _exist(exist) + _ideal_location_priority(ideal_location_priority), + _db_entry_order(db_entry_order), + _trusted(trusted), _exists(exist) { } @@ -24,8 +27,8 @@ BucketInstance::print(vespalib::asciistream& out, const PrintProperties&) const std::ostringstream ost; ost << std::hex << _bucket.getId(); - out << "(" << ost.str() << ", " << infoString << ", node " << _node.getIndex() << ", ideal " << _idealLocationPriority - << (_trusted ? ", trusted" : "") << (_exist ? "" : ", new copy") << ")"; + out << "(" << ost.str() << ", " << infoString << ", node " << _node.getIndex() << ", ideal " << _ideal_location_priority + << (_trusted ? ", trusted" : "") << (_exists ? "" : ", new copy") << ")"; } bool @@ -42,7 +45,7 @@ BucketInstanceList::add(const BucketDatabase::Entry& e, const IdealServiceLayerN for (uint32_t i = 0; i < e.getBucketInfo().getNodeCount(); ++i) { const BucketCopy& copy(e.getBucketInfo().getNodeRef(i)); lib::Node node(lib::NodeType::STORAGE, copy.getNode()); - _instances.emplace_back(e.getBucketId(), copy.getBucketInfo(), node, idealState.lookup(copy.getNode()), copy.trusted(), true); + _instances.emplace_back(e.getBucketId(), copy.getBucketInfo(), node, idealState.lookup(copy.getNode()), i, copy.trusted(), true); } } @@ -106,7 +109,8 @@ BucketInstanceList::extendToEnoughCopies(const DistributorBucketSpace& distribut for (uint32_t i=0; i<idealNodes.size(); ++i) { lib::Node node(lib::NodeType::STORAGE, idealNodes[i]); if (!contains(node)) { - _instances.emplace_back(newTarget, api::BucketInfo(), node, i, false, false); + // We don't sort `_instances` after extending, so just reuse `i` as dummy DB entry order. + _instances.emplace_back(newTarget, api::BucketInfo(), node, i, i, false, false); } } } @@ -116,7 +120,7 @@ BucketInstanceList::createTargets(document::BucketSpace bucketSpace) { OperationTargetList result; for (const auto& bi : _instances) { - result.emplace_back(document::Bucket(bucketSpace, bi._bucket), bi._node, !bi._exist); + result.emplace_back(document::Bucket(bucketSpace, bi._bucket), bi._node, !bi._exists); } return result; } @@ -129,6 +133,49 @@ BucketInstanceList::print(vespalib::asciistream& out, const PrintProperties& p) namespace { /** + * To maintain a symmetry between which replicas receive Puts and which versions are + * preferred for activation, use an identical ordering predicate for both (for the case + * where replicas are for the same concrete bucket). + * + * Must only be used with BucketInstances that have a distinct _db_entry_order set per instance. + */ +struct ActiveReplicaSymmetricInstanceOrder { + bool operator()(const BucketInstance& a, const BucketInstance& b) noexcept { + if (a._bucket == b._bucket) { + if (a._info.isReady() != b._info.isReady()) { + return a._info.isReady(); + } + if (a._info.getDocumentCount() != b._info.getDocumentCount()) { + return a._info.getDocumentCount() > b._info.getDocumentCount(); + } + if (a._ideal_location_priority != b._ideal_location_priority) { + return a._ideal_location_priority < b._ideal_location_priority; + } + if (a._info.isActive() != b._info.isActive()) { + return a._info.isActive(); + } + // If all else is equal, this implies both A and B are on retired nodes, which is unlikely + // but possible. Fall back to the existing DB _entry order_, which is equal to an ideal + // state order where retired nodes are considered part of the ideal state (which is not the + // case for most ideal state operations). Since the DB entry order is in ideal state order, + // using this instead of node _index_ avoids affinities to lower indexes in such edge cases. + return a._db_entry_order < b._db_entry_order; + } else { + // TODO this inconsistent split case is equal to the legacy logic (aside from the tie-breaking), + // but is considered to be extremely unlikely in practice, so not worth optimizing for. + if ((a._info.getMetaCount() == 0) ^ (b._info.getMetaCount() == 0)) { + return (a._info.getMetaCount() == 0); + } + if (a._bucket.getUsedBits() != b._bucket.getUsedBits()) { + return (a._bucket.getUsedBits() > b._bucket.getUsedBits()); + } + return a._db_entry_order < b._db_entry_order; + } + return false; + } +}; + +/** * - Trusted copies should be preferred over non-trusted copies for the same bucket. * - Buckets in ideal locations should be preferred over non-ideal locations for the * same bucket across several nodes. @@ -137,14 +184,14 @@ namespace { * - Right after split/join, bucket is often not in ideal location, but should be * preferred instead of source anyhow. */ -struct InstanceOrder { - bool operator()(const BucketInstance& a, const BucketInstance& b) { +struct LegacyInstanceOrder { + bool operator()(const BucketInstance& a, const BucketInstance& b) noexcept { if (a._bucket == b._bucket) { - // Trusted only makes sense within same bucket - // Prefer trusted buckets over non-trusted ones. + // Trusted only makes sense within same bucket + // Prefer trusted buckets over non-trusted ones. if (a._trusted != b._trusted) return a._trusted; - if (a._idealLocationPriority != b._idealLocationPriority) { - return a._idealLocationPriority < b._idealLocationPriority; + if (a._ideal_location_priority != b._ideal_location_priority) { + return a._ideal_location_priority < b._ideal_location_priority; } } else { if ((a._info.getMetaCount() == 0) ^ (b._info.getMetaCount() == 0)) { @@ -164,7 +211,11 @@ OperationTargetResolverImpl::getAllInstances(OperationType type, const document: BucketInstanceList instances; if (type == PUT) { instances.populate(id, _distributor_bucket_space, _bucketDatabase); - instances.sort(InstanceOrder()); + if (_symmetric_replica_selection) { + instances.sort(ActiveReplicaSymmetricInstanceOrder()); + } else { + instances.sort(LegacyInstanceOrder()); + } instances.removeNodeDuplicates(); instances.extendToEnoughCopies(_distributor_bucket_space, _bucketDatabase, _bucketDatabase.getAppropriateBucket(_minUsedBucketBits, id), id); diff --git a/storage/src/vespa/storage/distributor/operationtargetresolverimpl.h b/storage/src/vespa/storage/distributor/operationtargetresolverimpl.h index 9f367a89cba..6ab38928200 100644 --- a/storage/src/vespa/storage/distributor/operationtargetresolverimpl.h +++ b/storage/src/vespa/storage/distributor/operationtargetresolverimpl.h @@ -15,15 +15,17 @@ struct BucketInstance : public vespalib::AsciiPrintable { document::BucketId _bucket; api::BucketInfo _info; lib::Node _node; - uint16_t _idealLocationPriority; - bool _trusted; - bool _exist; + uint16_t _ideal_location_priority; + uint16_t _db_entry_order; + bool _trusted; // TODO remove + bool _exists; BucketInstance() noexcept - : _idealLocationPriority(0xffff), _trusted(false), _exist(false) {} + : _ideal_location_priority(0xffff), _db_entry_order(0xffff), _trusted(false), _exists(false) + {} BucketInstance(const document::BucketId& id, const api::BucketInfo& info, - lib::Node node, uint16_t idealLocationPriority, bool trusted, - bool exist) noexcept; + lib::Node node, uint16_t ideal_location_priority, + uint16_t db_entry_order, bool trusted, bool exist) noexcept; void print(vespalib::asciistream& out, const PrintProperties&) const override; }; @@ -83,6 +85,7 @@ class OperationTargetResolverImpl : public OperationTargetResolver { uint32_t _minUsedBucketBits; uint16_t _redundancy; document::BucketSpace _bucketSpace; + bool _symmetric_replica_selection; public: OperationTargetResolverImpl(const DistributorBucketSpace& distributor_bucket_space, @@ -94,9 +97,14 @@ public: _bucketDatabase(bucketDatabase), _minUsedBucketBits(minUsedBucketBits), _redundancy(redundancy), - _bucketSpace(bucketSpace) + _bucketSpace(bucketSpace), + _symmetric_replica_selection(true) {} + void use_symmetric_replica_selection(bool symmetry) noexcept { + _symmetric_replica_selection = symmetry; + } + BucketInstanceList getAllInstances(OperationType type, const document::BucketId& id); BucketInstanceList getInstances(OperationType type, const document::BucketId& id) { BucketInstanceList result(getAllInstances(type, id)); diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp index b756c2e421b..f2b5fa62d1e 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp @@ -5,7 +5,6 @@ #include "pendingclusterstate.h" #include "top_level_bucket_db_updater.h" #include <vespa/document/bucket/fixed_bucket_spaces.h> -#include <vespa/storage/common/global_bucket_space_distribution_converter.h> #include <vespa/storageframework/defaultimplementation/clock/realclock.h> #include <vespa/vdslib/distribution/distribution.h> #include <vespa/vespalib/util/xmlstream.hpp> diff --git a/storage/src/vespa/storage/distributor/stripe_access_guard.h b/storage/src/vespa/storage/distributor/stripe_access_guard.h index 1618bc9be9d..8a930ed3305 100644 --- a/storage/src/vespa/storage/distributor/stripe_access_guard.h +++ b/storage/src/vespa/storage/distributor/stripe_access_guard.h @@ -1,11 +1,11 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include "bucket_space_distribution_configs.h" #include "pending_bucket_space_db_transition_entry.h" #include "potential_data_loss_report.h" #include "outdated_nodes.h" #include <vespa/document/bucket/bucketspace.h> +#include <vespa/vdslib/distribution/bucket_space_distribution_configs.h> #include <vespa/storageapi/defs.h> #include <unordered_set> // TODO use hash_set instead @@ -38,7 +38,7 @@ public: virtual void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) = 0; - virtual void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) = 0; + virtual void update_distribution_config(const lib::BucketSpaceDistributionConfigs& new_configs) = 0; virtual void set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) = 0; virtual void clear_pending_cluster_state_bundle() = 0; virtual void enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state, diff --git a/storage/src/vespa/storage/distributor/tickable_stripe.h b/storage/src/vespa/storage/distributor/tickable_stripe.h index ab1cd570089..2605c24639e 100644 --- a/storage/src/vespa/storage/distributor/tickable_stripe.h +++ b/storage/src/vespa/storage/distributor/tickable_stripe.h @@ -39,7 +39,7 @@ public: virtual void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) = 0; - virtual void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) = 0; + virtual void update_distribution_config(const lib::BucketSpaceDistributionConfigs& new_configs) = 0; virtual void set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) = 0; virtual void clear_pending_cluster_state_bundle() = 0; virtual void enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state, diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp index 1f7d11362e2..1991c9aaf58 100644 --- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp +++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp @@ -2,7 +2,6 @@ #include "top_level_bucket_db_updater.h" #include "bucket_db_prune_elision.h" -#include "bucket_space_distribution_configs.h" #include "bucket_space_distribution_context.h" #include "top_level_distributor.h" #include "distributor_bucket_space.h" @@ -11,11 +10,12 @@ #include "simpleclusterinformation.h" #include "stripe_access_guard.h" #include <vespa/document/bucket/fixed_bucket_spaces.h> -#include <vespa/storage/common/global_bucket_space_distribution_converter.h> #include <vespa/storage/config/distributorconfiguration.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/removelocation.h> +#include <vespa/vdslib/distribution/bucket_space_distribution_configs.h> #include <vespa/vdslib/distribution/distribution.h> +#include <vespa/vdslib/distribution/global_bucket_space_distribution_converter.h> #include <vespa/vdslib/state/clusterstate.h> #include <vespa/vespalib/util/xmlstream.h> #include <thread> @@ -54,6 +54,7 @@ TopLevelBucketDBUpdater::TopLevelBucketDBUpdater(const DistributorNodeContext& n { // FIXME STRIPE top-level Distributor needs a proper way to track the current cluster state bundle! propagate_active_state_bundle_internally(true); // We're just starting up so assume ownership transfer. + // TODO bootstrap cluster state bundle instead? version:0 cluster:d bootstrap_distribution_config(std::move(bootstrap_distribution)); } @@ -71,7 +72,7 @@ TopLevelBucketDBUpdater::propagate_active_state_bundle_internally(bool has_bucke void TopLevelBucketDBUpdater::bootstrap_distribution_config(std::shared_ptr<const lib::Distribution> distribution) { - auto global_distr = GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution); + auto global_distr = lib::GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution); _op_ctx.bucket_space_states().get(document::FixedBucketSpaces::default_space()).set_distribution(distribution); _op_ctx.bucket_space_states().get(document::FixedBucketSpaces::global_space()).set_distribution(global_distr); // TODO STRIPE do we need to bootstrap the stripes as well here? Or do they do this on their own volition? @@ -79,7 +80,7 @@ TopLevelBucketDBUpdater::bootstrap_distribution_config(std::shared_ptr<const lib } void -TopLevelBucketDBUpdater::propagate_distribution_config(const BucketSpaceDistributionConfigs& configs) { +TopLevelBucketDBUpdater::propagate_distribution_config(const lib::BucketSpaceDistributionConfigs& configs) { if (auto distr = configs.get_or_nullptr(document::FixedBucketSpaces::default_space())) { _op_ctx.bucket_space_states().get(document::FixedBucketSpaces::default_space()).set_distribution(distr); } @@ -183,13 +184,14 @@ TopLevelBucketDBUpdater::complete_transition_timer() } void -TopLevelBucketDBUpdater::storage_distribution_changed(const BucketSpaceDistributionConfigs& configs) +TopLevelBucketDBUpdater::storage_distribution_changed(const lib::BucketSpaceDistributionConfigs& configs) { propagate_distribution_config(configs); ensure_transition_timer_started(); auto guard = _stripe_accessor.rendezvous_and_hold_all(); // FIXME STRIPE might this cause a mismatch with the component stuff's own distribution config..?! + // TODO should be part of bundle only...!! guard->update_distribution_config(configs); remove_superfluous_buckets(*guard, _active_state_bundle, true); diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h index e76456329d4..87a408281a7 100644 --- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h +++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h @@ -23,9 +23,12 @@ class XmlOutputStream; class XmlAttribute; } +namespace storage::lib { +struct BucketSpaceDistributionConfigs; +} + namespace storage::distributor { -struct BucketSpaceDistributionConfigs; class BucketSpaceDistributionContext; class ClusterStateBundleActivationListener; class DistributorInterface; @@ -57,9 +60,9 @@ public: bool reportStatus(std::ostream&, const framework::HttpUrlPath&) const override; void resend_delayed_messages(); - void storage_distribution_changed(const BucketSpaceDistributionConfigs& configs); + void storage_distribution_changed(const lib::BucketSpaceDistributionConfigs& configs); void bootstrap_distribution_config(std::shared_ptr<const lib::Distribution>); - void propagate_distribution_config(const BucketSpaceDistributionConfigs& configs); + void propagate_distribution_config(const lib::BucketSpaceDistributionConfigs& configs); vespalib::string report_xml_status(vespalib::xml::XmlOutputStream& xos, const framework::HttpUrlPath&) const; diff --git a/storage/src/vespa/storage/distributor/top_level_distributor.cpp b/storage/src/vespa/storage/distributor/top_level_distributor.cpp index 7348dbd6409..f7ee89ae7c0 100644 --- a/storage/src/vespa/storage/distributor/top_level_distributor.cpp +++ b/storage/src/vespa/storage/distributor/top_level_distributor.cpp @@ -1,7 +1,6 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. // #include "blockingoperationstarter.h" -#include "bucket_space_distribution_configs.h" #include "top_level_bucket_db_updater.h" #include "top_level_distributor.h" #include "distributor_bucket_space.h" @@ -16,7 +15,6 @@ #include "throttlingoperationstarter.h" #include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/storage/common/bucket_stripe_utils.h> -#include <vespa/storage/common/global_bucket_space_distribution_converter.h> #include <vespa/storage/common/hostreporter/hostinfo.h> #include <vespa/storage/common/node_identity.h> #include <vespa/storage/common/nodestateupdater.h> @@ -25,7 +23,9 @@ #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/visitor.h> #include <vespa/storageframework/generic/status/xmlstatusreporter.h> +#include <vespa/vdslib/distribution/bucket_space_distribution_configs.h> #include <vespa/vdslib/distribution/distribution.h> +#include <vespa/vdslib/distribution/global_bucket_space_distribution_converter.h> #include <vespa/vespalib/util/memoryusage.h> #include <algorithm> @@ -323,7 +323,7 @@ TopLevelDistributor::enable_next_distribution_if_changed() if (_next_distribution) { _distribution = _next_distribution; _next_distribution = std::shared_ptr<lib::Distribution>(); - auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(_distribution); + auto new_configs = lib::BucketSpaceDistributionConfigs::from_default_distribution(_distribution); _bucket_db_updater->storage_distribution_changed(new_configs); // Transitively updates all stripes' configs } } @@ -334,7 +334,7 @@ TopLevelDistributor::propagate_default_distribution_thread_unsafe( { // Should only be called at ctor time, at which point the pool is not yet running. assert(_stripe_pool.stripe_count() == 0); - auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(std::move(distribution)); + auto new_configs = lib::BucketSpaceDistributionConfigs::from_default_distribution(std::move(distribution)); for (auto& stripe : _stripes) { stripe->update_distribution_config(new_configs); } |