From a89b2ce68a2ef3ba421517f8e96f9bbb24840958 Mon Sep 17 00:00:00 2001 From: Geir Storli Date: Mon, 20 Sep 2021 14:18:27 +0000 Subject: Use BucketSpaceStateMap to track cluster state and distribution in the top-level distributor. This replaces the previous hack (needed in legacy mode) that used DistributorBucketSpaceRepo to achieve the same. --- .../top_level_bucket_db_updater_test.cpp | 12 +++---- .../top_level_distributor_test_util.cpp | 12 +++---- .../distributor/top_level_distributor_test_util.h | 10 +++--- .../storage/distributor/bucket_space_state_map.cpp | 16 ++++++++++ .../storage/distributor/bucket_space_state_map.h | 3 ++ .../storage/distributor/distributor_component.cpp | 3 +- .../storage/distributor/distributor_component.h | 22 ++++--------- .../distributor/distributor_operation_context.h | 9 ++---- .../storage/distributor/distributor_stripe.cpp | 7 ++-- .../distributor/distributor_stripe_component.cpp | 1 + .../distributor_stripe_operation_context.h | 12 +++++-- .../operations/external/putoperation.cpp | 5 +-- .../pending_bucket_space_db_transition.cpp | 29 +++++------------ .../pending_bucket_space_db_transition.h | 7 ++-- .../storage/distributor/pendingclusterstate.cpp | 37 ++++++++-------------- .../storage/distributor/pendingclusterstate.h | 17 +++++----- .../distributor/top_level_bucket_db_updater.cpp | 34 ++++++++------------ .../distributor/top_level_bucket_db_updater.h | 2 +- .../storage/distributor/top_level_distributor.cpp | 2 +- 19 files changed, 112 insertions(+), 128 deletions(-) diff --git a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp index d57ce228908..fe8a607c9ae 100644 --- a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp +++ b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp @@ -364,7 +364,7 @@ public: OutdatedNodesMap outdated_nodes_map; state = PendingClusterState::createForClusterStateChange( clock, cluster_info, sender, - owner.top_level_bucket_space_repo(), + owner.bucket_space_states(), cmd, outdated_nodes_map, api::Timestamp(1)); } @@ -374,7 +374,7 @@ public: { auto cluster_info = owner.create_cluster_info(old_cluster_state); state = PendingClusterState::createForDistributionChange( - clock, cluster_info, sender, owner.top_level_bucket_space_repo(), api::Timestamp(1)); + clock, cluster_info, sender, owner.bucket_space_states(), api::Timestamp(1)); } }; @@ -1389,7 +1389,7 @@ TopLevelBucketDBUpdaterTest::get_sent_nodes_distribution_changed(const std::stri auto cluster_info = create_cluster_info(old_cluster_state); std::unique_ptr state( PendingClusterState::createForDistributionChange( - clock, cluster_info, sender, top_level_bucket_space_repo(), api::Timestamp(1))); + clock, cluster_info, sender, bucket_space_states(), api::Timestamp(1))); sort_sent_messages_by_index(sender); @@ -1514,7 +1514,7 @@ TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_receive) { OutdatedNodesMap outdated_nodes_map; std::unique_ptr state( PendingClusterState::createForClusterStateChange( - clock, cluster_info, sender, top_level_bucket_space_repo(), + clock, cluster_info, sender, bucket_space_states(), cmd, outdated_nodes_map, api::Timestamp(1))); ASSERT_EQ(message_count(3), sender.commands().size()); @@ -1670,7 +1670,7 @@ TopLevelBucketDBUpdaterTest::merge_bucket_lists( auto cluster_info = create_cluster_info("cluster:d"); auto state = PendingClusterState::createForClusterStateChange( - clock, cluster_info, sender, top_level_bucket_space_repo(), + clock, cluster_info, sender, bucket_space_states(), cmd, outdated_nodes_map, before_time); parse_input_data(existing_data, before_time, *state, include_bucket_info); @@ -1690,7 +1690,7 @@ TopLevelBucketDBUpdaterTest::merge_bucket_lists( auto cluster_info = create_cluster_info(old_state.toString()); auto state = PendingClusterState::createForClusterStateChange( - clock, cluster_info, sender, top_level_bucket_space_repo(), + clock, cluster_info, sender, bucket_space_states(), cmd, outdated_nodes_map, after_time); parse_input_data(new_data, after_time, *state, include_bucket_info); diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.cpp b/storage/src/tests/distributor/top_level_distributor_test_util.cpp index e4002b3d1cb..6a0aa015ba4 100644 --- a/storage/src/tests/distributor/top_level_distributor_test_util.cpp +++ b/storage/src/tests/distributor/top_level_distributor_test_util.cpp @@ -265,16 +265,16 @@ TopLevelDistributorTestUtil::get_bucket(const document::BucketId& bId) const return stripe_bucket_database(stripe_index_of_bucket(bId)).get(bId); } -DistributorBucketSpaceRepo& -TopLevelDistributorTestUtil::top_level_bucket_space_repo() noexcept +BucketSpaceStateMap& +TopLevelDistributorTestUtil::bucket_space_states() noexcept { - return _distributor->_component.bucket_space_repo(); + return _distributor->_component.bucket_space_states(); } -const DistributorBucketSpaceRepo& -TopLevelDistributorTestUtil::top_level_bucket_space_repo() const noexcept +const BucketSpaceStateMap& +TopLevelDistributorTestUtil::bucket_space_states() const noexcept { - return _distributor->_component.bucket_space_repo(); + return _distributor->_component.bucket_space_states(); } std::unique_ptr diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.h b/storage/src/tests/distributor/top_level_distributor_test_util.h index e8794eb4199..6efc36a8215 100644 --- a/storage/src/tests/distributor/top_level_distributor_test_util.h +++ b/storage/src/tests/distributor/top_level_distributor_test_util.h @@ -17,7 +17,7 @@ namespace framework { struct TickingThreadPool; } namespace distributor { -class TopLevelDistributor; +class BucketSpaceStateMap; class DistributorBucketSpace; class DistributorBucketSpaceRepo; class DistributorMetricSet; @@ -26,10 +26,11 @@ class DistributorStripe; class DistributorStripeComponent; class DistributorStripeOperationContext; class DistributorStripePool; -class StripeAccessGuard; class IdealStateMetricSet; class Operation; +class StripeAccessGuard; class TopLevelBucketDBUpdater; +class TopLevelDistributor; class TopLevelDistributorTestUtil : private DoneInitializeHandler { @@ -60,9 +61,8 @@ public: // As the above, but always inserts into default bucket space void add_nodes_to_stripe_bucket_db(const document::BucketId& id, const std::string& nodeStr); - // TODO STRIPE replace with BucketSpaceStateMap once legacy is gone - DistributorBucketSpaceRepo& top_level_bucket_space_repo() noexcept; - const DistributorBucketSpaceRepo& top_level_bucket_space_repo() const noexcept; + BucketSpaceStateMap& bucket_space_states() noexcept; + const BucketSpaceStateMap& bucket_space_states() const noexcept; std::unique_ptr acquire_stripe_guard(); diff --git a/storage/src/vespa/storage/distributor/bucket_space_state_map.cpp b/storage/src/vespa/storage/distributor/bucket_space_state_map.cpp index 63c408f7e1e..54c6f887a8b 100644 --- a/storage/src/vespa/storage/distributor/bucket_space_state_map.cpp +++ b/storage/src/vespa/storage/distributor/bucket_space_state_map.cpp @@ -32,6 +32,22 @@ BucketSpaceStateMap::BucketSpaceStateMap() _map.emplace(document::FixedBucketSpaces::global_space(), std::make_unique()); } +const BucketSpaceState& +BucketSpaceStateMap::get(document::BucketSpace space) const +{ + auto itr = _map.find(space); + assert(itr != _map.end()); + return *itr->second; +} + +BucketSpaceState& +BucketSpaceStateMap::get(document::BucketSpace space) +{ + auto itr = _map.find(space); + assert(itr != _map.end()); + return *itr->second; +} + void BucketSpaceStateMap::set_cluster_state(std::shared_ptr cluster_state) { diff --git a/storage/src/vespa/storage/distributor/bucket_space_state_map.h b/storage/src/vespa/storage/distributor/bucket_space_state_map.h index 57eac9eac0d..6209f9f306c 100644 --- a/storage/src/vespa/storage/distributor/bucket_space_state_map.h +++ b/storage/src/vespa/storage/distributor/bucket_space_state_map.h @@ -64,6 +64,9 @@ public: StateMap::const_iterator begin() const { return _map.begin(); } StateMap::const_iterator end() const { return _map.end(); } + const BucketSpaceState& get(document::BucketSpace space) const; + BucketSpaceState& get(document::BucketSpace space); + void set_cluster_state(std::shared_ptr cluster_state); void set_distribution(std::shared_ptr distribution); diff --git a/storage/src/vespa/storage/distributor/distributor_component.cpp b/storage/src/vespa/storage/distributor/distributor_component.cpp index e01d7e7cb6d..a3b2b3c8e99 100644 --- a/storage/src/vespa/storage/distributor/distributor_component.cpp +++ b/storage/src/vespa/storage/distributor/distributor_component.cpp @@ -11,8 +11,7 @@ DistributorComponent::DistributorComponent(DistributorInterface& distributor, const std::string& name) : storage::DistributorComponent(comp_reg, name), _distributor(distributor), - _bucket_space_repo(std::make_unique(node_index(), false)), - _read_only_bucket_space_repo(std::make_unique(node_index(), false)) + _bucket_space_states() { } diff --git a/storage/src/vespa/storage/distributor/distributor_component.h b/storage/src/vespa/storage/distributor/distributor_component.h index 68db5a3c483..2aaa9f421ae 100644 --- a/storage/src/vespa/storage/distributor/distributor_component.h +++ b/storage/src/vespa/storage/distributor/distributor_component.h @@ -2,6 +2,7 @@ #pragma once +#include "bucket_space_state_map.h" #include "distributor_interface.h" #include "distributor_node_context.h" #include "distributor_operation_context.h" @@ -22,9 +23,8 @@ class DistributorComponent : public storage::DistributorComponent, public DistributorOperationContext { private: DistributorInterface& _distributor; - // TODO STRIPE: When legacy mode is removed, replace this with BucketSpaceStateMap. - std::unique_ptr _bucket_space_repo; - std::unique_ptr _read_only_bucket_space_repo; + BucketSpaceStateMap _bucket_space_states; + public: DistributorComponent(DistributorInterface& distributor, @@ -45,23 +45,15 @@ public: api::Timestamp generate_unique_timestamp() override { return getUniqueTimestamp(); } - const DistributorBucketSpaceRepo& bucket_space_repo() const noexcept override { - return *_bucket_space_repo; - } - DistributorBucketSpaceRepo& bucket_space_repo() noexcept override { - return *_bucket_space_repo; - } - const DistributorBucketSpaceRepo& read_only_bucket_space_repo() const noexcept override { - return *_read_only_bucket_space_repo; + const BucketSpaceStateMap& bucket_space_states() const noexcept override { + return _bucket_space_states; } - DistributorBucketSpaceRepo& read_only_bucket_space_repo() noexcept override { - return *_read_only_bucket_space_repo; + BucketSpaceStateMap& bucket_space_states() noexcept override { + return _bucket_space_states; } const storage::DistributorConfiguration& distributor_config() const noexcept override { return _distributor.config(); } - - }; } diff --git a/storage/src/vespa/storage/distributor/distributor_operation_context.h b/storage/src/vespa/storage/distributor/distributor_operation_context.h index e0d481a322a..9dd853c7e46 100644 --- a/storage/src/vespa/storage/distributor/distributor_operation_context.h +++ b/storage/src/vespa/storage/distributor/distributor_operation_context.h @@ -6,10 +6,10 @@ #include namespace storage { class DistributorConfiguration; } -namespace storage::lib { class ClusterStateBundle; } namespace storage::distributor { +class BucketSpaceStateMap; class DistributorBucketSpaceRepo; /** @@ -19,11 +19,8 @@ class DistributorOperationContext { public: virtual ~DistributorOperationContext() {} virtual api::Timestamp generate_unique_timestamp() = 0; - // TODO STRIPE: Access to bucket space repos is only temporary at this level. - virtual const DistributorBucketSpaceRepo& bucket_space_repo() const noexcept = 0; - virtual DistributorBucketSpaceRepo& bucket_space_repo() noexcept = 0; - virtual const DistributorBucketSpaceRepo& read_only_bucket_space_repo() const noexcept = 0; - virtual DistributorBucketSpaceRepo& read_only_bucket_space_repo() noexcept = 0; + virtual const BucketSpaceStateMap& bucket_space_states() const noexcept = 0; + virtual BucketSpaceStateMap& bucket_space_states() noexcept = 0; virtual const DistributorConfiguration& distributor_config() const noexcept = 0; }; diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp index 6378a8ed3c4..543264d97b9 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp @@ -1,14 +1,15 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "blockingoperationstarter.h" -#include "distributor_stripe.h" -#include "distributor_status.h" #include "distributor_bucket_space.h" +#include "distributor_status.h" +#include "distributor_stripe.h" #include "distributormetricsset.h" #include "idealstatemetricsset.h" -#include "stripe_host_info_notifier.h" #include "operation_sequencer.h" #include "ownership_transfer_safe_time_point_calculator.h" +#include "storage_node_up_states.h" +#include "stripe_host_info_notifier.h" #include "throttlingoperationstarter.h" #include #include diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp b/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp index 59029dec66a..cc5a8259c40 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp @@ -4,6 +4,7 @@ #include "distributor_bucket_space_repo.h" #include "distributor_bucket_space.h" #include "pendingmessagetracker.h" +#include "storage_node_up_states.h" #include #include #include diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h b/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h index 518c83d7ffa..a5afadad6a7 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h @@ -4,7 +4,6 @@ #include "bucketgctimecalculator.h" #include "bucketownership.h" -#include "distributor_operation_context.h" #include "operation_routing_snapshot.h" #include #include @@ -12,6 +11,7 @@ #include namespace document { class Bucket; } +namespace storage::lib { class ClusterStateBundle; } namespace storage::distributor { @@ -20,9 +20,17 @@ class PendingMessageTracker; /** * Interface with functionality that is used when handling distributor stripe operations. */ -class DistributorStripeOperationContext : public DistributorOperationContext { +class DistributorStripeOperationContext { public: virtual ~DistributorStripeOperationContext() = default; + + virtual api::Timestamp generate_unique_timestamp() = 0; + virtual const DistributorBucketSpaceRepo& bucket_space_repo() const noexcept = 0; + virtual DistributorBucketSpaceRepo& bucket_space_repo() noexcept = 0; + virtual const DistributorBucketSpaceRepo& read_only_bucket_space_repo() const noexcept = 0; + virtual DistributorBucketSpaceRepo& read_only_bucket_space_repo() noexcept = 0; + virtual const DistributorConfiguration& distributor_config() const noexcept = 0; + virtual void update_bucket_database(const document::Bucket& bucket, const BucketCopy& changed_node, uint32_t update_flags = 0) = 0; diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp index 9d9a04e9dcc..aa02f937b6b 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp @@ -4,13 +4,14 @@ #include #include +#include #include #include +#include #include +#include #include #include -#include -#include #include #include diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp index 335d070ad7b..05045c43888 100644 --- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp +++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp @@ -1,9 +1,9 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "pending_bucket_space_db_transition.h" +#include "bucket_space_state_map.h" #include "clusterinformation.h" +#include "pending_bucket_space_db_transition.h" #include "pendingclusterstate.h" -#include "distributor_bucket_space.h" #include "stripe_access_guard.h" #include #include @@ -19,7 +19,7 @@ using lib::NodeType; using lib::NodeState; PendingBucketSpaceDbTransition::PendingBucketSpaceDbTransition(document::BucketSpace bucket_space, - DistributorBucketSpace &distributorBucketSpace, + const BucketSpaceState &bucket_space_state, bool distributionChanged, const OutdatedNodes &outdatedNodes, std::shared_ptr clusterInfo, @@ -31,10 +31,10 @@ PendingBucketSpaceDbTransition::PendingBucketSpaceDbTransition(document::BucketS _missingEntries(), _clusterInfo(std::move(clusterInfo)), _outdatedNodes(newClusterState.getNodeCount(NodeType::STORAGE)), - _prevClusterState(distributorBucketSpace.getClusterState()), + _prevClusterState(bucket_space_state.get_cluster_state()), _newClusterState(newClusterState), _creationTimestamp(creationTimestamp), - _distributorBucketSpace(distributorBucketSpace), + _bucket_space_state(bucket_space_state), _distributorIndex(_clusterInfo->getDistributorIndex()), _bucketOwnershipTransfer(distributionChanged), _rejectedRequests() @@ -217,24 +217,11 @@ PendingBucketSpaceDbTransition::DbMerger::addToInserter(BucketDatabase::Trailing inserter.insert_at_end(bucket_id, e); } -// TODO STRIPE remove legacy single stripe stuff -void -PendingBucketSpaceDbTransition::mergeIntoBucketDatabase() -{ - BucketDatabase &db(_distributorBucketSpace.getBucketDatabase()); - std::sort(_entries.begin(), _entries.end()); - - const auto& dist = _distributorBucketSpace.getDistribution(); - DbMerger merger(_creationTimestamp, dist, _newClusterState, _clusterInfo->getStorageUpStates(), _outdatedNodes, _entries); - - db.merge(merger); -} - void PendingBucketSpaceDbTransition::merge_into_bucket_databases(StripeAccessGuard& guard) { std::sort(_entries.begin(), _entries.end()); - const auto& dist = _distributorBucketSpace.getDistribution(); + const auto& dist = _bucket_space_state.get_distribution(); guard.merge_entries_into_db(_bucket_space, _creationTimestamp, dist, _newClusterState, _clusterInfo->getStorageUpStates(), _outdatedNodes, _entries); } @@ -296,7 +283,7 @@ PendingBucketSpaceDbTransition::nodeWasUpButNowIsDown(const lib::State& old, bool PendingBucketSpaceDbTransition::nodeInSameGroupAsSelf(uint16_t index) const { - const auto &dist(_distributorBucketSpace.getDistribution()); + const auto &dist(_bucket_space_state.get_distribution()); if (dist.getNodeGraph().getGroupForNode(index) == dist.getNodeGraph().getGroupForNode(_distributorIndex)) { LOG(debug, @@ -317,7 +304,7 @@ PendingBucketSpaceDbTransition::nodeNeedsOwnershipTransferFromGroupDown( uint16_t nodeIndex, const lib::ClusterState& state) const { - const auto &dist(_distributorBucketSpace.getDistribution()); + const auto &dist(_bucket_space_state.get_distribution()); if (!dist.distributorAutoOwnershipTransferOnWholeGroupDown()) { return false; // Not doing anything for downed groups. } diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h index f7766cb265d..37d48323066 100644 --- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h +++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h @@ -16,9 +16,9 @@ class State; namespace storage::distributor { +class BucketSpaceState; class ClusterInformation; class PendingClusterState; -class DistributorBucketSpace; class StripeAccessGuard; /** @@ -50,7 +50,7 @@ private: const lib::ClusterState& _prevClusterState; const lib::ClusterState& _newClusterState; const api::Timestamp _creationTimestamp; - DistributorBucketSpace& _distributorBucketSpace; + const BucketSpaceState& _bucket_space_state; uint16_t _distributorIndex; bool _bucketOwnershipTransfer; std::unordered_map _rejectedRequests; @@ -126,7 +126,7 @@ public: }; PendingBucketSpaceDbTransition(document::BucketSpace bucket_space, - DistributorBucketSpace &distributorBucketSpace, + const BucketSpaceState &bucket_space_state, bool distributionChanged, const OutdatedNodes &outdatedNodes, std::shared_ptr clusterInfo, @@ -135,7 +135,6 @@ public: ~PendingBucketSpaceDbTransition(); // Merges all the results with the corresponding bucket database. - void mergeIntoBucketDatabase(); void merge_into_bucket_databases(StripeAccessGuard& guard); // Adds the info from the reply to our list of information. diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp index 69cf5486a8a..59f5d0a9322 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp @@ -1,13 +1,12 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "pendingclusterstate.h" +#include "bucket_space_state_map.h" #include "pending_bucket_space_db_transition.h" +#include "pendingclusterstate.h" #include "top_level_bucket_db_updater.h" -#include "distributor_bucket_space_repo.h" -#include "distributor_bucket_space.h" -#include -#include #include +#include +#include #include #include #include @@ -27,7 +26,7 @@ PendingClusterState::PendingClusterState( const framework::Clock& clock, const ClusterInformation::CSP& clusterInfo, DistributorMessageSender& sender, - DistributorBucketSpaceRepo& bucketSpaceRepo, + const BucketSpaceStateMap& bucket_space_states, const std::shared_ptr& newStateCmd, const OutdatedNodesMap &outdatedNodesMap, api::Timestamp creationTimestamp) @@ -41,7 +40,7 @@ PendingClusterState::PendingClusterState( _clusterInfo(clusterInfo), _creationTimestamp(creationTimestamp), _sender(sender), - _bucketSpaceRepo(bucketSpaceRepo), + _bucket_space_states(bucket_space_states), _clusterStateVersion(_cmd->getClusterStateBundle().getVersion()), _isVersionedTransition(true), _bucketOwnershipTransfer(false), @@ -55,7 +54,7 @@ PendingClusterState::PendingClusterState( const framework::Clock& clock, const ClusterInformation::CSP& clusterInfo, DistributorMessageSender& sender, - DistributorBucketSpaceRepo& bucketSpaceRepo, + const BucketSpaceStateMap& bucket_space_states, api::Timestamp creationTimestamp) : _requestedNodes(clusterInfo->getStorageNodeCount()), _prevClusterStateBundle(clusterInfo->getClusterStateBundle()), @@ -64,7 +63,7 @@ PendingClusterState::PendingClusterState( _clusterInfo(clusterInfo), _creationTimestamp(creationTimestamp), _sender(sender), - _bucketSpaceRepo(bucketSpaceRepo), + _bucket_space_states(bucket_space_states), _clusterStateVersion(0), _isVersionedTransition(false), _bucketOwnershipTransfer(true), @@ -80,7 +79,7 @@ void PendingClusterState::initializeBucketSpaceTransitions(bool distributionChanged, const OutdatedNodesMap &outdatedNodesMap) { OutdatedNodes emptyOutdatedNodes; - for (auto &elem : _bucketSpaceRepo) { + for (const auto &elem : _bucket_space_states) { auto onItr = outdatedNodesMap.find(elem.first); const auto &outdatedNodes = (onItr == outdatedNodesMap.end()) ? emptyOutdatedNodes : onItr->second; auto pendingTransition = @@ -100,8 +99,7 @@ PendingClusterState::initializeBucketSpaceTransitions(bool distributionChanged, void PendingClusterState::logConstructionInformation() const { - const auto &distributorBucketSpace(_bucketSpaceRepo.get(document::FixedBucketSpaces::default_space())); - const auto &distribution(distributorBucketSpace.getDistribution()); + const auto &distribution = _bucket_space_states.get(document::FixedBucketSpaces::default_space()).get_distribution(); LOG(debug, "New PendingClusterState constructed with previous cluster " "state '%s', new cluster state '%s', distribution config " @@ -190,8 +188,7 @@ PendingClusterState::requestBucketInfoFromStorageNodesWithChangedState() void PendingClusterState::requestNode(BucketSpaceAndNode bucketSpaceAndNode) { - const auto &distributorBucketSpace(_bucketSpaceRepo.get(bucketSpaceAndNode.bucketSpace)); - const auto &distribution(distributorBucketSpace.getDistribution()); + const auto &distribution = _bucket_space_states.get(bucketSpaceAndNode.bucketSpace).get_distribution(); vespalib::string distributionHash; // TODO remove on Vespa 8 - this is a workaround for https://github.com/vespa-engine/vespa/issues/8475 bool sendLegacyHash = false; @@ -207,10 +204,10 @@ PendingClusterState::requestNode(BucketSpaceAndNode bucketSpaceAndNode) if (!sendLegacyHash) { distributionHash = distribution.getNodeGraph().getDistributionConfigHash(); } else { - const auto& defaultSpace = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()); + const auto& defaultSpace = _bucket_space_states.get(document::FixedBucketSpaces::default_space()); // Generate legacy distribution hash explicitly. auto legacyGlobalDistr = GlobalBucketSpaceDistributionConverter::convert_to_global( - defaultSpace.getDistribution(), true/*use legacy mode*/); + defaultSpace.get_distribution(), true/*use legacy mode*/); distributionHash = legacyGlobalDistr->getNodeGraph().getDistributionConfigHash(); LOG(debug, "Falling back to sending legacy hash to node %u: %s", bucketSpaceAndNode.node, distributionHash.c_str()); @@ -322,14 +319,6 @@ PendingClusterState::requestNodesToString() const return ost.str(); } -void -PendingClusterState::mergeIntoBucketDatabases() -{ - for (auto &elem : _pendingTransitions) { - elem.second->mergeIntoBucketDatabase(); - } -} - void PendingClusterState::merge_into_bucket_databases(StripeAccessGuard& guard) { diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h index babcebea69d..fd209197ec6 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.h +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h @@ -15,9 +15,9 @@ namespace storage::distributor { +class BucketSpaceStateMap; class DistributorMessageSender; class PendingBucketSpaceDbTransition; -class DistributorBucketSpaceRepo; class StripeAccessGuard; /** @@ -45,14 +45,14 @@ public: const framework::Clock& clock, const ClusterInformation::CSP& clusterInfo, DistributorMessageSender& sender, - DistributorBucketSpaceRepo& bucketSpaceRepo, + const BucketSpaceStateMap& bucket_space_states, const std::shared_ptr& newStateCmd, const OutdatedNodesMap &outdatedNodesMap, api::Timestamp creationTimestamp) { // Naked new due to private constructor return std::unique_ptr(new PendingClusterState( - clock, clusterInfo, sender, bucketSpaceRepo, + clock, clusterInfo, sender, bucket_space_states, newStateCmd, outdatedNodesMap, creationTimestamp)); } @@ -64,12 +64,12 @@ public: const framework::Clock& clock, const ClusterInformation::CSP& clusterInfo, DistributorMessageSender& sender, - DistributorBucketSpaceRepo& bucketSpaceRepo, + const BucketSpaceStateMap& bucket_space_states, api::Timestamp creationTimestamp) { // Naked new due to private constructor return std::unique_ptr(new PendingClusterState( - clock, clusterInfo, sender, bucketSpaceRepo, creationTimestamp)); + clock, clusterInfo, sender, bucket_space_states, creationTimestamp)); } PendingClusterState(const PendingClusterState &) = delete; @@ -146,7 +146,6 @@ public: /** * Merges all the results with the corresponding bucket databases. */ - void mergeIntoBucketDatabases(); void merge_into_bucket_databases(StripeAccessGuard& guard); // Get pending transition for a specific bucket space. Only used by unit test. @@ -169,7 +168,7 @@ private: const framework::Clock&, const ClusterInformation::CSP& clusterInfo, DistributorMessageSender& sender, - DistributorBucketSpaceRepo& bucketSpaceRepo, + const BucketSpaceStateMap& bucket_space_states, const std::shared_ptr& newStateCmd, const OutdatedNodesMap &outdatedNodesMap, api::Timestamp creationTimestamp); @@ -182,7 +181,7 @@ private: const framework::Clock&, const ClusterInformation::CSP& clusterInfo, DistributorMessageSender& sender, - DistributorBucketSpaceRepo& bucketSpaceRepo, + const BucketSpaceStateMap& bucket_space_states, api::Timestamp creationTimestamp); struct BucketSpaceAndNode { @@ -229,7 +228,7 @@ private: api::Timestamp _creationTimestamp; DistributorMessageSender& _sender; - DistributorBucketSpaceRepo& _bucketSpaceRepo; + const BucketSpaceStateMap& _bucket_space_states; uint32_t _clusterStateVersion; bool _isVersionedTransition; bool _bucketOwnershipTransfer; diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp index 20ecf68c3f1..ac97dde6a0c 100644 --- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp +++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp @@ -58,10 +58,8 @@ TopLevelBucketDBUpdater::~TopLevelBucketDBUpdater() = default; void TopLevelBucketDBUpdater::propagate_active_state_bundle_internally() { - for (auto* repo : {&_op_ctx.bucket_space_repo(), &_op_ctx.read_only_bucket_space_repo()}) { - for (auto& iter : *repo) { - iter.second->setClusterState(_active_state_bundle.getDerivedClusterState(iter.first)); - } + for (auto& elem : _op_ctx.bucket_space_states()) { + elem.second->set_cluster_state(_active_state_bundle.getDerivedClusterState(elem.first)); } if (_state_activation_listener) { _state_activation_listener->on_cluster_state_bundle_activated(_active_state_bundle); @@ -71,23 +69,19 @@ TopLevelBucketDBUpdater::propagate_active_state_bundle_internally() { void TopLevelBucketDBUpdater::bootstrap_distribution_config(std::shared_ptr distribution) { auto global_distr = GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution); - for (auto* repo : {&_op_ctx.bucket_space_repo(), &_op_ctx.read_only_bucket_space_repo()}) { - repo->get(document::FixedBucketSpaces::default_space()).setDistribution(distribution); - repo->get(document::FixedBucketSpaces::global_space()).setDistribution(global_distr); - } + _op_ctx.bucket_space_states().get(document::FixedBucketSpaces::default_space()).set_distribution(distribution); + _op_ctx.bucket_space_states().get(document::FixedBucketSpaces::global_space()).set_distribution(global_distr); // TODO STRIPE do we need to bootstrap the stripes as well here? Or do they do this on their own volition? // ... need to take a guard if so, so can probably not be done at ctor time..? } void TopLevelBucketDBUpdater::propagate_distribution_config(const BucketSpaceDistributionConfigs& configs) { - for (auto* repo : {&_op_ctx.bucket_space_repo(), &_op_ctx.read_only_bucket_space_repo()}) { - if (auto distr = configs.get_or_nullptr(document::FixedBucketSpaces::default_space())) { - repo->get(document::FixedBucketSpaces::default_space()).setDistribution(distr); - } - if (auto distr = configs.get_or_nullptr(document::FixedBucketSpaces::global_space())) { - repo->get(document::FixedBucketSpaces::global_space()).setDistribution(distr); - } + if (auto distr = configs.get_or_nullptr(document::FixedBucketSpaces::default_space())) { + _op_ctx.bucket_space_states().get(document::FixedBucketSpaces::default_space()).set_distribution(distr); + } + if (auto distr = configs.get_or_nullptr(document::FixedBucketSpaces::global_space())) { + _op_ctx.bucket_space_states().get(document::FixedBucketSpaces::global_space()).set_distribution(distr); } } @@ -122,10 +116,8 @@ TopLevelBucketDBUpdater::remove_superfluous_buckets( bool is_distribution_config_change) { const char* up_states = storage_node_up_states(); - // TODO STRIPE explicit space -> config mapping, don't get via repo - // ... but we need to get the current cluster state per space..! - for (auto& elem : _op_ctx.bucket_space_repo()) { - const auto& old_cluster_state(elem.second->getClusterState()); + for (auto& elem : _op_ctx.bucket_space_states()) { + const auto& old_cluster_state(elem.second->get_cluster_state()); const auto& new_cluster_state = new_state.getDerivedClusterState(elem.first); // Running a full DB sweep is expensive, so if the cluster state transition does @@ -206,7 +198,7 @@ TopLevelBucketDBUpdater::storage_distribution_changed(const BucketSpaceDistribut _node_ctx.clock(), std::move(clusterInfo), _sender, - _op_ctx.bucket_space_repo(), // TODO STRIPE cannot use! + _op_ctx.bucket_space_states(), _op_ctx.generate_unique_timestamp()); _outdated_nodes_map = _pending_cluster_state->getOutdatedNodesMap(); @@ -263,7 +255,7 @@ TopLevelBucketDBUpdater::onSetSystemState( _node_ctx.clock(), std::move(clusterInfo), _sender, - _op_ctx.bucket_space_repo(), // TODO STRIPE remove + _op_ctx.bucket_space_states(), cmd, _outdated_nodes_map, _op_ctx.generate_unique_timestamp()); // FIXME STRIPE must be atomic across all threads diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h index 2eccb70fdf9..e01ea30cbda 100644 --- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h +++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h @@ -2,7 +2,7 @@ #pragma once #include "bucketlistmerger.h" -#include "distributor_stripe_component.h" +#include "distributor_component.h" #include "distributormessagesender.h" #include "messageguard.h" #include "operation_routing_snapshot.h" diff --git a/storage/src/vespa/storage/distributor/top_level_distributor.cpp b/storage/src/vespa/storage/distributor/top_level_distributor.cpp index 456464576a1..5f8f05c3ee0 100644 --- a/storage/src/vespa/storage/distributor/top_level_distributor.cpp +++ b/storage/src/vespa/storage/distributor/top_level_distributor.cpp @@ -573,7 +573,7 @@ TopLevelDistributor::reportStatus(std::ostream& out, } else { auto guard = _stripe_accessor->rendezvous_and_hold_all(); const auto& op_ctx = _component; - for (const auto& space : op_ctx.bucket_space_repo()) { + for (const auto& space : op_ctx.bucket_space_states()) { out << "

" << document::FixedBucketSpaces::to_string(space.first) << " - " << space.first << "

\n"; guard->report_bucket_db_status(space.first, out); } -- cgit v1.2.3