aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp12
-rw-r--r--storage/src/tests/distributor/top_level_distributor_test_util.cpp12
-rw-r--r--storage/src/tests/distributor/top_level_distributor_test_util.h10
-rw-r--r--storage/src/vespa/storage/distributor/bucket_space_state_map.cpp16
-rw-r--r--storage/src/vespa/storage/distributor/bucket_space_state_map.h3
-rw-r--r--storage/src/vespa/storage/distributor/distributor_component.cpp3
-rw-r--r--storage/src/vespa/storage/distributor/distributor_component.h22
-rw-r--r--storage/src/vespa/storage/distributor/distributor_operation_context.h9
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.cpp7
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_component.cpp1
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h12
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.cpp5
-rw-r--r--storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp29
-rw-r--r--storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h7
-rw-r--r--storage/src/vespa/storage/distributor/pendingclusterstate.cpp37
-rw-r--r--storage/src/vespa/storage/distributor/pendingclusterstate.h17
-rw-r--r--storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp34
-rw-r--r--storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h2
-rw-r--r--storage/src/vespa/storage/distributor/top_level_distributor.cpp2
19 files changed, 112 insertions, 128 deletions
diff --git a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
index d57ce228908..fe8a607c9ae 100644
--- a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
+++ b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
@@ -364,7 +364,7 @@ public:
OutdatedNodesMap outdated_nodes_map;
state = PendingClusterState::createForClusterStateChange(
clock, cluster_info, sender,
- owner.top_level_bucket_space_repo(),
+ owner.bucket_space_states(),
cmd, outdated_nodes_map, api::Timestamp(1));
}
@@ -374,7 +374,7 @@ public:
{
auto cluster_info = owner.create_cluster_info(old_cluster_state);
state = PendingClusterState::createForDistributionChange(
- clock, cluster_info, sender, owner.top_level_bucket_space_repo(), api::Timestamp(1));
+ clock, cluster_info, sender, owner.bucket_space_states(), api::Timestamp(1));
}
};
@@ -1389,7 +1389,7 @@ TopLevelBucketDBUpdaterTest::get_sent_nodes_distribution_changed(const std::stri
auto cluster_info = create_cluster_info(old_cluster_state);
std::unique_ptr<PendingClusterState> state(
PendingClusterState::createForDistributionChange(
- clock, cluster_info, sender, top_level_bucket_space_repo(), api::Timestamp(1)));
+ clock, cluster_info, sender, bucket_space_states(), api::Timestamp(1)));
sort_sent_messages_by_index(sender);
@@ -1514,7 +1514,7 @@ TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_receive) {
OutdatedNodesMap outdated_nodes_map;
std::unique_ptr<PendingClusterState> state(
PendingClusterState::createForClusterStateChange(
- clock, cluster_info, sender, top_level_bucket_space_repo(),
+ clock, cluster_info, sender, bucket_space_states(),
cmd, outdated_nodes_map, api::Timestamp(1)));
ASSERT_EQ(message_count(3), sender.commands().size());
@@ -1670,7 +1670,7 @@ TopLevelBucketDBUpdaterTest::merge_bucket_lists(
auto cluster_info = create_cluster_info("cluster:d");
auto state = PendingClusterState::createForClusterStateChange(
- clock, cluster_info, sender, top_level_bucket_space_repo(),
+ clock, cluster_info, sender, bucket_space_states(),
cmd, outdated_nodes_map, before_time);
parse_input_data(existing_data, before_time, *state, include_bucket_info);
@@ -1690,7 +1690,7 @@ TopLevelBucketDBUpdaterTest::merge_bucket_lists(
auto cluster_info = create_cluster_info(old_state.toString());
auto state = PendingClusterState::createForClusterStateChange(
- clock, cluster_info, sender, top_level_bucket_space_repo(),
+ clock, cluster_info, sender, bucket_space_states(),
cmd, outdated_nodes_map, after_time);
parse_input_data(new_data, after_time, *state, include_bucket_info);
diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.cpp b/storage/src/tests/distributor/top_level_distributor_test_util.cpp
index e4002b3d1cb..6a0aa015ba4 100644
--- a/storage/src/tests/distributor/top_level_distributor_test_util.cpp
+++ b/storage/src/tests/distributor/top_level_distributor_test_util.cpp
@@ -265,16 +265,16 @@ TopLevelDistributorTestUtil::get_bucket(const document::BucketId& bId) const
return stripe_bucket_database(stripe_index_of_bucket(bId)).get(bId);
}
-DistributorBucketSpaceRepo&
-TopLevelDistributorTestUtil::top_level_bucket_space_repo() noexcept
+BucketSpaceStateMap&
+TopLevelDistributorTestUtil::bucket_space_states() noexcept
{
- return _distributor->_component.bucket_space_repo();
+ return _distributor->_component.bucket_space_states();
}
-const DistributorBucketSpaceRepo&
-TopLevelDistributorTestUtil::top_level_bucket_space_repo() const noexcept
+const BucketSpaceStateMap&
+TopLevelDistributorTestUtil::bucket_space_states() const noexcept
{
- return _distributor->_component.bucket_space_repo();
+ return _distributor->_component.bucket_space_states();
}
std::unique_ptr<StripeAccessGuard>
diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.h b/storage/src/tests/distributor/top_level_distributor_test_util.h
index e8794eb4199..6efc36a8215 100644
--- a/storage/src/tests/distributor/top_level_distributor_test_util.h
+++ b/storage/src/tests/distributor/top_level_distributor_test_util.h
@@ -17,7 +17,7 @@ namespace framework { struct TickingThreadPool; }
namespace distributor {
-class TopLevelDistributor;
+class BucketSpaceStateMap;
class DistributorBucketSpace;
class DistributorBucketSpaceRepo;
class DistributorMetricSet;
@@ -26,10 +26,11 @@ class DistributorStripe;
class DistributorStripeComponent;
class DistributorStripeOperationContext;
class DistributorStripePool;
-class StripeAccessGuard;
class IdealStateMetricSet;
class Operation;
+class StripeAccessGuard;
class TopLevelBucketDBUpdater;
+class TopLevelDistributor;
class TopLevelDistributorTestUtil : private DoneInitializeHandler
{
@@ -60,9 +61,8 @@ public:
// As the above, but always inserts into default bucket space
void add_nodes_to_stripe_bucket_db(const document::BucketId& id, const std::string& nodeStr);
- // TODO STRIPE replace with BucketSpaceStateMap once legacy is gone
- DistributorBucketSpaceRepo& top_level_bucket_space_repo() noexcept;
- const DistributorBucketSpaceRepo& top_level_bucket_space_repo() const noexcept;
+ BucketSpaceStateMap& bucket_space_states() noexcept;
+ const BucketSpaceStateMap& bucket_space_states() const noexcept;
std::unique_ptr<StripeAccessGuard> acquire_stripe_guard();
diff --git a/storage/src/vespa/storage/distributor/bucket_space_state_map.cpp b/storage/src/vespa/storage/distributor/bucket_space_state_map.cpp
index 63c408f7e1e..54c6f887a8b 100644
--- a/storage/src/vespa/storage/distributor/bucket_space_state_map.cpp
+++ b/storage/src/vespa/storage/distributor/bucket_space_state_map.cpp
@@ -32,6 +32,22 @@ BucketSpaceStateMap::BucketSpaceStateMap()
_map.emplace(document::FixedBucketSpaces::global_space(), std::make_unique<BucketSpaceState>());
}
+const BucketSpaceState&
+BucketSpaceStateMap::get(document::BucketSpace space) const
+{
+ auto itr = _map.find(space);
+ assert(itr != _map.end());
+ return *itr->second;
+}
+
+BucketSpaceState&
+BucketSpaceStateMap::get(document::BucketSpace space)
+{
+ auto itr = _map.find(space);
+ assert(itr != _map.end());
+ return *itr->second;
+}
+
void
BucketSpaceStateMap::set_cluster_state(std::shared_ptr<const lib::ClusterState> cluster_state)
{
diff --git a/storage/src/vespa/storage/distributor/bucket_space_state_map.h b/storage/src/vespa/storage/distributor/bucket_space_state_map.h
index 57eac9eac0d..6209f9f306c 100644
--- a/storage/src/vespa/storage/distributor/bucket_space_state_map.h
+++ b/storage/src/vespa/storage/distributor/bucket_space_state_map.h
@@ -64,6 +64,9 @@ public:
StateMap::const_iterator begin() const { return _map.begin(); }
StateMap::const_iterator end() const { return _map.end(); }
+ const BucketSpaceState& get(document::BucketSpace space) const;
+ BucketSpaceState& get(document::BucketSpace space);
+
void set_cluster_state(std::shared_ptr<const lib::ClusterState> cluster_state);
void set_distribution(std::shared_ptr<const lib::Distribution> distribution);
diff --git a/storage/src/vespa/storage/distributor/distributor_component.cpp b/storage/src/vespa/storage/distributor/distributor_component.cpp
index e01d7e7cb6d..a3b2b3c8e99 100644
--- a/storage/src/vespa/storage/distributor/distributor_component.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_component.cpp
@@ -11,8 +11,7 @@ DistributorComponent::DistributorComponent(DistributorInterface& distributor,
const std::string& name)
: storage::DistributorComponent(comp_reg, name),
_distributor(distributor),
- _bucket_space_repo(std::make_unique<DistributorBucketSpaceRepo>(node_index(), false)),
- _read_only_bucket_space_repo(std::make_unique<DistributorBucketSpaceRepo>(node_index(), false))
+ _bucket_space_states()
{
}
diff --git a/storage/src/vespa/storage/distributor/distributor_component.h b/storage/src/vespa/storage/distributor/distributor_component.h
index 68db5a3c483..2aaa9f421ae 100644
--- a/storage/src/vespa/storage/distributor/distributor_component.h
+++ b/storage/src/vespa/storage/distributor/distributor_component.h
@@ -2,6 +2,7 @@
#pragma once
+#include "bucket_space_state_map.h"
#include "distributor_interface.h"
#include "distributor_node_context.h"
#include "distributor_operation_context.h"
@@ -22,9 +23,8 @@ class DistributorComponent : public storage::DistributorComponent,
public DistributorOperationContext {
private:
DistributorInterface& _distributor;
- // TODO STRIPE: When legacy mode is removed, replace this with BucketSpaceStateMap.
- std::unique_ptr<DistributorBucketSpaceRepo> _bucket_space_repo;
- std::unique_ptr<DistributorBucketSpaceRepo> _read_only_bucket_space_repo;
+ BucketSpaceStateMap _bucket_space_states;
+
public:
DistributorComponent(DistributorInterface& distributor,
@@ -45,23 +45,15 @@ public:
api::Timestamp generate_unique_timestamp() override {
return getUniqueTimestamp();
}
- const DistributorBucketSpaceRepo& bucket_space_repo() const noexcept override {
- return *_bucket_space_repo;
- }
- DistributorBucketSpaceRepo& bucket_space_repo() noexcept override {
- return *_bucket_space_repo;
- }
- const DistributorBucketSpaceRepo& read_only_bucket_space_repo() const noexcept override {
- return *_read_only_bucket_space_repo;
+ const BucketSpaceStateMap& bucket_space_states() const noexcept override {
+ return _bucket_space_states;
}
- DistributorBucketSpaceRepo& read_only_bucket_space_repo() noexcept override {
- return *_read_only_bucket_space_repo;
+ BucketSpaceStateMap& bucket_space_states() noexcept override {
+ return _bucket_space_states;
}
const storage::DistributorConfiguration& distributor_config() const noexcept override {
return _distributor.config();
}
-
-
};
}
diff --git a/storage/src/vespa/storage/distributor/distributor_operation_context.h b/storage/src/vespa/storage/distributor/distributor_operation_context.h
index e0d481a322a..9dd853c7e46 100644
--- a/storage/src/vespa/storage/distributor/distributor_operation_context.h
+++ b/storage/src/vespa/storage/distributor/distributor_operation_context.h
@@ -6,10 +6,10 @@
#include <vespa/storageapi/defs.h>
namespace storage { class DistributorConfiguration; }
-namespace storage::lib { class ClusterStateBundle; }
namespace storage::distributor {
+class BucketSpaceStateMap;
class DistributorBucketSpaceRepo;
/**
@@ -19,11 +19,8 @@ class DistributorOperationContext {
public:
virtual ~DistributorOperationContext() {}
virtual api::Timestamp generate_unique_timestamp() = 0;
- // TODO STRIPE: Access to bucket space repos is only temporary at this level.
- virtual const DistributorBucketSpaceRepo& bucket_space_repo() const noexcept = 0;
- virtual DistributorBucketSpaceRepo& bucket_space_repo() noexcept = 0;
- virtual const DistributorBucketSpaceRepo& read_only_bucket_space_repo() const noexcept = 0;
- virtual DistributorBucketSpaceRepo& read_only_bucket_space_repo() noexcept = 0;
+ virtual const BucketSpaceStateMap& bucket_space_states() const noexcept = 0;
+ virtual BucketSpaceStateMap& bucket_space_states() noexcept = 0;
virtual const DistributorConfiguration& distributor_config() const noexcept = 0;
};
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index 6378a8ed3c4..543264d97b9 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -1,14 +1,15 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "blockingoperationstarter.h"
-#include "distributor_stripe.h"
-#include "distributor_status.h"
#include "distributor_bucket_space.h"
+#include "distributor_status.h"
+#include "distributor_stripe.h"
#include "distributormetricsset.h"
#include "idealstatemetricsset.h"
-#include "stripe_host_info_notifier.h"
#include "operation_sequencer.h"
#include "ownership_transfer_safe_time_point_calculator.h"
+#include "storage_node_up_states.h"
+#include "stripe_host_info_notifier.h"
#include "throttlingoperationstarter.h"
#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/storage/common/global_bucket_space_distribution_converter.h>
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp b/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp
index 59029dec66a..cc5a8259c40 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp
@@ -4,6 +4,7 @@
#include "distributor_bucket_space_repo.h"
#include "distributor_bucket_space.h"
#include "pendingmessagetracker.h"
+#include "storage_node_up_states.h"
#include <vespa/document/select/parser.h>
#include <vespa/vdslib/state/cluster_state_bundle.h>
#include <vespa/vdslib/state/clusterstate.h>
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h b/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h
index 518c83d7ffa..a5afadad6a7 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h
@@ -4,7 +4,6 @@
#include "bucketgctimecalculator.h"
#include "bucketownership.h"
-#include "distributor_operation_context.h"
#include "operation_routing_snapshot.h"
#include <vespa/document/bucket/bucketspace.h>
#include <vespa/storage/bucketdb/bucketdatabase.h>
@@ -12,6 +11,7 @@
#include <vespa/storageapi/defs.h>
namespace document { class Bucket; }
+namespace storage::lib { class ClusterStateBundle; }
namespace storage::distributor {
@@ -20,9 +20,17 @@ class PendingMessageTracker;
/**
* Interface with functionality that is used when handling distributor stripe operations.
*/
-class DistributorStripeOperationContext : public DistributorOperationContext {
+class DistributorStripeOperationContext {
public:
virtual ~DistributorStripeOperationContext() = default;
+
+ virtual api::Timestamp generate_unique_timestamp() = 0;
+ virtual const DistributorBucketSpaceRepo& bucket_space_repo() const noexcept = 0;
+ virtual DistributorBucketSpaceRepo& bucket_space_repo() noexcept = 0;
+ virtual const DistributorBucketSpaceRepo& read_only_bucket_space_repo() const noexcept = 0;
+ virtual DistributorBucketSpaceRepo& read_only_bucket_space_repo() noexcept = 0;
+ virtual const DistributorConfiguration& distributor_config() const noexcept = 0;
+
virtual void update_bucket_database(const document::Bucket& bucket,
const BucketCopy& changed_node,
uint32_t update_flags = 0) = 0;
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
index 9d9a04e9dcc..aa02f937b6b 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
@@ -4,13 +4,14 @@
#include <vespa/document/fieldvalue/document.h>
#include <vespa/storage/distributor/activecopy.h>
+#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/storage/distributor/operationtargetresolverimpl.h>
#include <vespa/storage/distributor/pendingmessagetracker.h>
+#include <vespa/storage/distributor/storage_node_up_states.h>
#include <vespa/storageapi/message/persistence.h>
+#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vdslib/distribution/idealnodecalculatorimpl.h>
#include <vespa/vdslib/state/clusterstate.h>
-#include <vespa/storage/distributor/distributor_bucket_space.h>
-#include <vespa/vdslib/distribution/distribution.h>
#include <algorithm>
#include <vespa/log/log.h>
diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp
index 335d070ad7b..05045c43888 100644
--- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp
+++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp
@@ -1,9 +1,9 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "pending_bucket_space_db_transition.h"
+#include "bucket_space_state_map.h"
#include "clusterinformation.h"
+#include "pending_bucket_space_db_transition.h"
#include "pendingclusterstate.h"
-#include "distributor_bucket_space.h"
#include "stripe_access_guard.h"
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vdslib/state/clusterstate.h>
@@ -19,7 +19,7 @@ using lib::NodeType;
using lib::NodeState;
PendingBucketSpaceDbTransition::PendingBucketSpaceDbTransition(document::BucketSpace bucket_space,
- DistributorBucketSpace &distributorBucketSpace,
+ const BucketSpaceState &bucket_space_state,
bool distributionChanged,
const OutdatedNodes &outdatedNodes,
std::shared_ptr<const ClusterInformation> clusterInfo,
@@ -31,10 +31,10 @@ PendingBucketSpaceDbTransition::PendingBucketSpaceDbTransition(document::BucketS
_missingEntries(),
_clusterInfo(std::move(clusterInfo)),
_outdatedNodes(newClusterState.getNodeCount(NodeType::STORAGE)),
- _prevClusterState(distributorBucketSpace.getClusterState()),
+ _prevClusterState(bucket_space_state.get_cluster_state()),
_newClusterState(newClusterState),
_creationTimestamp(creationTimestamp),
- _distributorBucketSpace(distributorBucketSpace),
+ _bucket_space_state(bucket_space_state),
_distributorIndex(_clusterInfo->getDistributorIndex()),
_bucketOwnershipTransfer(distributionChanged),
_rejectedRequests()
@@ -217,24 +217,11 @@ PendingBucketSpaceDbTransition::DbMerger::addToInserter(BucketDatabase::Trailing
inserter.insert_at_end(bucket_id, e);
}
-// TODO STRIPE remove legacy single stripe stuff
-void
-PendingBucketSpaceDbTransition::mergeIntoBucketDatabase()
-{
- BucketDatabase &db(_distributorBucketSpace.getBucketDatabase());
- std::sort(_entries.begin(), _entries.end());
-
- const auto& dist = _distributorBucketSpace.getDistribution();
- DbMerger merger(_creationTimestamp, dist, _newClusterState, _clusterInfo->getStorageUpStates(), _outdatedNodes, _entries);
-
- db.merge(merger);
-}
-
void
PendingBucketSpaceDbTransition::merge_into_bucket_databases(StripeAccessGuard& guard)
{
std::sort(_entries.begin(), _entries.end());
- const auto& dist = _distributorBucketSpace.getDistribution();
+ const auto& dist = _bucket_space_state.get_distribution();
guard.merge_entries_into_db(_bucket_space, _creationTimestamp, dist, _newClusterState,
_clusterInfo->getStorageUpStates(), _outdatedNodes, _entries);
}
@@ -296,7 +283,7 @@ PendingBucketSpaceDbTransition::nodeWasUpButNowIsDown(const lib::State& old,
bool
PendingBucketSpaceDbTransition::nodeInSameGroupAsSelf(uint16_t index) const
{
- const auto &dist(_distributorBucketSpace.getDistribution());
+ const auto &dist(_bucket_space_state.get_distribution());
if (dist.getNodeGraph().getGroupForNode(index) ==
dist.getNodeGraph().getGroupForNode(_distributorIndex)) {
LOG(debug,
@@ -317,7 +304,7 @@ PendingBucketSpaceDbTransition::nodeNeedsOwnershipTransferFromGroupDown(
uint16_t nodeIndex,
const lib::ClusterState& state) const
{
- const auto &dist(_distributorBucketSpace.getDistribution());
+ const auto &dist(_bucket_space_state.get_distribution());
if (!dist.distributorAutoOwnershipTransferOnWholeGroupDown()) {
return false; // Not doing anything for downed groups.
}
diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h
index f7766cb265d..37d48323066 100644
--- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h
+++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h
@@ -16,9 +16,9 @@ class State;
namespace storage::distributor {
+class BucketSpaceState;
class ClusterInformation;
class PendingClusterState;
-class DistributorBucketSpace;
class StripeAccessGuard;
/**
@@ -50,7 +50,7 @@ private:
const lib::ClusterState& _prevClusterState;
const lib::ClusterState& _newClusterState;
const api::Timestamp _creationTimestamp;
- DistributorBucketSpace& _distributorBucketSpace;
+ const BucketSpaceState& _bucket_space_state;
uint16_t _distributorIndex;
bool _bucketOwnershipTransfer;
std::unordered_map<uint16_t, size_t> _rejectedRequests;
@@ -126,7 +126,7 @@ public:
};
PendingBucketSpaceDbTransition(document::BucketSpace bucket_space,
- DistributorBucketSpace &distributorBucketSpace,
+ const BucketSpaceState &bucket_space_state,
bool distributionChanged,
const OutdatedNodes &outdatedNodes,
std::shared_ptr<const ClusterInformation> clusterInfo,
@@ -135,7 +135,6 @@ public:
~PendingBucketSpaceDbTransition();
// Merges all the results with the corresponding bucket database.
- void mergeIntoBucketDatabase();
void merge_into_bucket_databases(StripeAccessGuard& guard);
// Adds the info from the reply to our list of information.
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
index 69cf5486a8a..59f5d0a9322 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp
@@ -1,13 +1,12 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "pendingclusterstate.h"
+#include "bucket_space_state_map.h"
#include "pending_bucket_space_db_transition.h"
+#include "pendingclusterstate.h"
#include "top_level_bucket_db_updater.h"
-#include "distributor_bucket_space_repo.h"
-#include "distributor_bucket_space.h"
-#include <vespa/storageframework/defaultimplementation/clock/realclock.h>
-#include <vespa/storage/common/global_bucket_space_distribution_converter.h>
#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <vespa/storage/common/global_bucket_space_distribution_converter.h>
+#include <vespa/storageframework/defaultimplementation/clock/realclock.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vespalib/util/xmlstream.hpp>
#include <climits>
@@ -27,7 +26,7 @@ PendingClusterState::PendingClusterState(
const framework::Clock& clock,
const ClusterInformation::CSP& clusterInfo,
DistributorMessageSender& sender,
- DistributorBucketSpaceRepo& bucketSpaceRepo,
+ const BucketSpaceStateMap& bucket_space_states,
const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd,
const OutdatedNodesMap &outdatedNodesMap,
api::Timestamp creationTimestamp)
@@ -41,7 +40,7 @@ PendingClusterState::PendingClusterState(
_clusterInfo(clusterInfo),
_creationTimestamp(creationTimestamp),
_sender(sender),
- _bucketSpaceRepo(bucketSpaceRepo),
+ _bucket_space_states(bucket_space_states),
_clusterStateVersion(_cmd->getClusterStateBundle().getVersion()),
_isVersionedTransition(true),
_bucketOwnershipTransfer(false),
@@ -55,7 +54,7 @@ PendingClusterState::PendingClusterState(
const framework::Clock& clock,
const ClusterInformation::CSP& clusterInfo,
DistributorMessageSender& sender,
- DistributorBucketSpaceRepo& bucketSpaceRepo,
+ const BucketSpaceStateMap& bucket_space_states,
api::Timestamp creationTimestamp)
: _requestedNodes(clusterInfo->getStorageNodeCount()),
_prevClusterStateBundle(clusterInfo->getClusterStateBundle()),
@@ -64,7 +63,7 @@ PendingClusterState::PendingClusterState(
_clusterInfo(clusterInfo),
_creationTimestamp(creationTimestamp),
_sender(sender),
- _bucketSpaceRepo(bucketSpaceRepo),
+ _bucket_space_states(bucket_space_states),
_clusterStateVersion(0),
_isVersionedTransition(false),
_bucketOwnershipTransfer(true),
@@ -80,7 +79,7 @@ void
PendingClusterState::initializeBucketSpaceTransitions(bool distributionChanged, const OutdatedNodesMap &outdatedNodesMap)
{
OutdatedNodes emptyOutdatedNodes;
- for (auto &elem : _bucketSpaceRepo) {
+ for (const auto &elem : _bucket_space_states) {
auto onItr = outdatedNodesMap.find(elem.first);
const auto &outdatedNodes = (onItr == outdatedNodesMap.end()) ? emptyOutdatedNodes : onItr->second;
auto pendingTransition =
@@ -100,8 +99,7 @@ PendingClusterState::initializeBucketSpaceTransitions(bool distributionChanged,
void
PendingClusterState::logConstructionInformation() const
{
- const auto &distributorBucketSpace(_bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()));
- const auto &distribution(distributorBucketSpace.getDistribution());
+ const auto &distribution = _bucket_space_states.get(document::FixedBucketSpaces::default_space()).get_distribution();
LOG(debug,
"New PendingClusterState constructed with previous cluster "
"state '%s', new cluster state '%s', distribution config "
@@ -190,8 +188,7 @@ PendingClusterState::requestBucketInfoFromStorageNodesWithChangedState()
void
PendingClusterState::requestNode(BucketSpaceAndNode bucketSpaceAndNode)
{
- const auto &distributorBucketSpace(_bucketSpaceRepo.get(bucketSpaceAndNode.bucketSpace));
- const auto &distribution(distributorBucketSpace.getDistribution());
+ const auto &distribution = _bucket_space_states.get(bucketSpaceAndNode.bucketSpace).get_distribution();
vespalib::string distributionHash;
// TODO remove on Vespa 8 - this is a workaround for https://github.com/vespa-engine/vespa/issues/8475
bool sendLegacyHash = false;
@@ -207,10 +204,10 @@ PendingClusterState::requestNode(BucketSpaceAndNode bucketSpaceAndNode)
if (!sendLegacyHash) {
distributionHash = distribution.getNodeGraph().getDistributionConfigHash();
} else {
- const auto& defaultSpace = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space());
+ const auto& defaultSpace = _bucket_space_states.get(document::FixedBucketSpaces::default_space());
// Generate legacy distribution hash explicitly.
auto legacyGlobalDistr = GlobalBucketSpaceDistributionConverter::convert_to_global(
- defaultSpace.getDistribution(), true/*use legacy mode*/);
+ defaultSpace.get_distribution(), true/*use legacy mode*/);
distributionHash = legacyGlobalDistr->getNodeGraph().getDistributionConfigHash();
LOG(debug, "Falling back to sending legacy hash to node %u: %s",
bucketSpaceAndNode.node, distributionHash.c_str());
@@ -323,14 +320,6 @@ PendingClusterState::requestNodesToString() const
}
void
-PendingClusterState::mergeIntoBucketDatabases()
-{
- for (auto &elem : _pendingTransitions) {
- elem.second->mergeIntoBucketDatabase();
- }
-}
-
-void
PendingClusterState::merge_into_bucket_databases(StripeAccessGuard& guard)
{
for (auto &elem : _pendingTransitions) {
diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h
index babcebea69d..fd209197ec6 100644
--- a/storage/src/vespa/storage/distributor/pendingclusterstate.h
+++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h
@@ -15,9 +15,9 @@
namespace storage::distributor {
+class BucketSpaceStateMap;
class DistributorMessageSender;
class PendingBucketSpaceDbTransition;
-class DistributorBucketSpaceRepo;
class StripeAccessGuard;
/**
@@ -45,14 +45,14 @@ public:
const framework::Clock& clock,
const ClusterInformation::CSP& clusterInfo,
DistributorMessageSender& sender,
- DistributorBucketSpaceRepo& bucketSpaceRepo,
+ const BucketSpaceStateMap& bucket_space_states,
const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd,
const OutdatedNodesMap &outdatedNodesMap,
api::Timestamp creationTimestamp)
{
// Naked new due to private constructor
return std::unique_ptr<PendingClusterState>(new PendingClusterState(
- clock, clusterInfo, sender, bucketSpaceRepo,
+ clock, clusterInfo, sender, bucket_space_states,
newStateCmd, outdatedNodesMap, creationTimestamp));
}
@@ -64,12 +64,12 @@ public:
const framework::Clock& clock,
const ClusterInformation::CSP& clusterInfo,
DistributorMessageSender& sender,
- DistributorBucketSpaceRepo& bucketSpaceRepo,
+ const BucketSpaceStateMap& bucket_space_states,
api::Timestamp creationTimestamp)
{
// Naked new due to private constructor
return std::unique_ptr<PendingClusterState>(new PendingClusterState(
- clock, clusterInfo, sender, bucketSpaceRepo, creationTimestamp));
+ clock, clusterInfo, sender, bucket_space_states, creationTimestamp));
}
PendingClusterState(const PendingClusterState &) = delete;
@@ -146,7 +146,6 @@ public:
/**
* Merges all the results with the corresponding bucket databases.
*/
- void mergeIntoBucketDatabases();
void merge_into_bucket_databases(StripeAccessGuard& guard);
// Get pending transition for a specific bucket space. Only used by unit test.
@@ -169,7 +168,7 @@ private:
const framework::Clock&,
const ClusterInformation::CSP& clusterInfo,
DistributorMessageSender& sender,
- DistributorBucketSpaceRepo& bucketSpaceRepo,
+ const BucketSpaceStateMap& bucket_space_states,
const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd,
const OutdatedNodesMap &outdatedNodesMap,
api::Timestamp creationTimestamp);
@@ -182,7 +181,7 @@ private:
const framework::Clock&,
const ClusterInformation::CSP& clusterInfo,
DistributorMessageSender& sender,
- DistributorBucketSpaceRepo& bucketSpaceRepo,
+ const BucketSpaceStateMap& bucket_space_states,
api::Timestamp creationTimestamp);
struct BucketSpaceAndNode {
@@ -229,7 +228,7 @@ private:
api::Timestamp _creationTimestamp;
DistributorMessageSender& _sender;
- DistributorBucketSpaceRepo& _bucketSpaceRepo;
+ const BucketSpaceStateMap& _bucket_space_states;
uint32_t _clusterStateVersion;
bool _isVersionedTransition;
bool _bucketOwnershipTransfer;
diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp
index 20ecf68c3f1..ac97dde6a0c 100644
--- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp
+++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp
@@ -58,10 +58,8 @@ TopLevelBucketDBUpdater::~TopLevelBucketDBUpdater() = default;
void
TopLevelBucketDBUpdater::propagate_active_state_bundle_internally() {
- for (auto* repo : {&_op_ctx.bucket_space_repo(), &_op_ctx.read_only_bucket_space_repo()}) {
- for (auto& iter : *repo) {
- iter.second->setClusterState(_active_state_bundle.getDerivedClusterState(iter.first));
- }
+ for (auto& elem : _op_ctx.bucket_space_states()) {
+ elem.second->set_cluster_state(_active_state_bundle.getDerivedClusterState(elem.first));
}
if (_state_activation_listener) {
_state_activation_listener->on_cluster_state_bundle_activated(_active_state_bundle);
@@ -71,23 +69,19 @@ TopLevelBucketDBUpdater::propagate_active_state_bundle_internally() {
void
TopLevelBucketDBUpdater::bootstrap_distribution_config(std::shared_ptr<const lib::Distribution> distribution) {
auto global_distr = GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution);
- for (auto* repo : {&_op_ctx.bucket_space_repo(), &_op_ctx.read_only_bucket_space_repo()}) {
- repo->get(document::FixedBucketSpaces::default_space()).setDistribution(distribution);
- repo->get(document::FixedBucketSpaces::global_space()).setDistribution(global_distr);
- }
+ _op_ctx.bucket_space_states().get(document::FixedBucketSpaces::default_space()).set_distribution(distribution);
+ _op_ctx.bucket_space_states().get(document::FixedBucketSpaces::global_space()).set_distribution(global_distr);
// TODO STRIPE do we need to bootstrap the stripes as well here? Or do they do this on their own volition?
// ... need to take a guard if so, so can probably not be done at ctor time..?
}
void
TopLevelBucketDBUpdater::propagate_distribution_config(const BucketSpaceDistributionConfigs& configs) {
- for (auto* repo : {&_op_ctx.bucket_space_repo(), &_op_ctx.read_only_bucket_space_repo()}) {
- if (auto distr = configs.get_or_nullptr(document::FixedBucketSpaces::default_space())) {
- repo->get(document::FixedBucketSpaces::default_space()).setDistribution(distr);
- }
- if (auto distr = configs.get_or_nullptr(document::FixedBucketSpaces::global_space())) {
- repo->get(document::FixedBucketSpaces::global_space()).setDistribution(distr);
- }
+ if (auto distr = configs.get_or_nullptr(document::FixedBucketSpaces::default_space())) {
+ _op_ctx.bucket_space_states().get(document::FixedBucketSpaces::default_space()).set_distribution(distr);
+ }
+ if (auto distr = configs.get_or_nullptr(document::FixedBucketSpaces::global_space())) {
+ _op_ctx.bucket_space_states().get(document::FixedBucketSpaces::global_space()).set_distribution(distr);
}
}
@@ -122,10 +116,8 @@ TopLevelBucketDBUpdater::remove_superfluous_buckets(
bool is_distribution_config_change)
{
const char* up_states = storage_node_up_states();
- // TODO STRIPE explicit space -> config mapping, don't get via repo
- // ... but we need to get the current cluster state per space..!
- for (auto& elem : _op_ctx.bucket_space_repo()) {
- const auto& old_cluster_state(elem.second->getClusterState());
+ for (auto& elem : _op_ctx.bucket_space_states()) {
+ const auto& old_cluster_state(elem.second->get_cluster_state());
const auto& new_cluster_state = new_state.getDerivedClusterState(elem.first);
// Running a full DB sweep is expensive, so if the cluster state transition does
@@ -206,7 +198,7 @@ TopLevelBucketDBUpdater::storage_distribution_changed(const BucketSpaceDistribut
_node_ctx.clock(),
std::move(clusterInfo),
_sender,
- _op_ctx.bucket_space_repo(), // TODO STRIPE cannot use!
+ _op_ctx.bucket_space_states(),
_op_ctx.generate_unique_timestamp());
_outdated_nodes_map = _pending_cluster_state->getOutdatedNodesMap();
@@ -263,7 +255,7 @@ TopLevelBucketDBUpdater::onSetSystemState(
_node_ctx.clock(),
std::move(clusterInfo),
_sender,
- _op_ctx.bucket_space_repo(), // TODO STRIPE remove
+ _op_ctx.bucket_space_states(),
cmd,
_outdated_nodes_map,
_op_ctx.generate_unique_timestamp()); // FIXME STRIPE must be atomic across all threads
diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h
index 2eccb70fdf9..e01ea30cbda 100644
--- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h
+++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h
@@ -2,7 +2,7 @@
#pragma once
#include "bucketlistmerger.h"
-#include "distributor_stripe_component.h"
+#include "distributor_component.h"
#include "distributormessagesender.h"
#include "messageguard.h"
#include "operation_routing_snapshot.h"
diff --git a/storage/src/vespa/storage/distributor/top_level_distributor.cpp b/storage/src/vespa/storage/distributor/top_level_distributor.cpp
index 456464576a1..5f8f05c3ee0 100644
--- a/storage/src/vespa/storage/distributor/top_level_distributor.cpp
+++ b/storage/src/vespa/storage/distributor/top_level_distributor.cpp
@@ -573,7 +573,7 @@ TopLevelDistributor::reportStatus(std::ostream& out,
} else {
auto guard = _stripe_accessor->rendezvous_and_hold_all();
const auto& op_ctx = _component;
- for (const auto& space : op_ctx.bucket_space_repo()) {
+ for (const auto& space : op_ctx.bucket_space_states()) {
out << "<h2>" << document::FixedBucketSpaces::to_string(space.first) << " - " << space.first << "</h2>\n";
guard->report_bucket_db_status(space.first, out);
}