summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2021-03-17 13:51:29 +0000
committerGeir Storli <geirst@verizonmedia.com>2021-03-17 15:26:11 +0000
commit1e5c4c7e5e0d5e3ccd5528abd48de9fda6c35978 (patch)
tree5f89cbc3182f0e8318096e409517d65efa8f0994 /storage
parentee112380088e1c50939b47458469de606f5d1b88 (diff)
Reduce usage of DistributorComponent by using DistributorOperationContext instead.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.cpp68
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.h1
-rw-r--r--storage/src/vespa/storage/distributor/distributor_operation_context.h1
-rw-r--r--storage/src/vespa/storage/distributor/distributorcomponent.h3
4 files changed, 39 insertions, 34 deletions
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index e6f44c92dbe..710471bc167 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -32,6 +32,7 @@ BucketDBUpdater::BucketDBUpdater(Distributor& owner,
: framework::StatusReporter("bucketdb", "Bucket DB Updater"),
_distributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "Bucket DB Updater"),
_node_ctx(_distributorComponent),
+ _op_ctx(_distributorComponent),
_delayedRequests(),
_sentMessages(),
_pendingClusterState(),
@@ -45,7 +46,7 @@ BucketDBUpdater::BucketDBUpdater(Distributor& owner,
_explicit_transition_read_guard(),
_distribution_context_mutex()
{
- for (auto& elem : _distributorComponent.getBucketSpaceRepo()) {
+ for (auto& elem : _op_ctx.bucket_space_repo()) {
_active_distribution_contexts.emplace(
elem.first,
BucketSpaceDistributionContext::make_not_yet_initialized(_node_ctx.node_index()));
@@ -69,8 +70,8 @@ OperationRoutingSnapshot BucketDBUpdater::read_snapshot_for_bucket(const documen
return OperationRoutingSnapshot::make_not_routable_in_state(active_state_iter->second);
}
const auto& space_repo = bucket_present_in_mutable_db
- ? _distributorComponent.getBucketSpaceRepo()
- : _distributorComponent.getReadOnlyBucketSpaceRepo();
+ ? _op_ctx.bucket_space_repo()
+ : _op_ctx.read_only_bucket_space_repo();
auto existing_guard_iter = _explicit_transition_read_guard.find(bucket_space);
assert(existing_guard_iter != _explicit_transition_read_guard.cend());
auto db_guard = existing_guard_iter->second
@@ -124,7 +125,7 @@ BucketDBUpdater::sendRequestBucketInfo(
const document::Bucket& bucket,
const std::shared_ptr<MergeReplyGuard>& mergeReplyGuard)
{
- if (!_distributorComponent.storageNodeIsUp(bucket.getBucketSpace(), node)) {
+ if (!_op_ctx.storage_node_is_up(bucket.getBucketSpace(), node)) {
return;
}
@@ -144,7 +145,7 @@ BucketDBUpdater::sendRequestBucketInfo(
msg->setAddress(_node_ctx.node_address(node));
_sentMessages[msg->getMsgId()] =
- BucketRequest(node, _distributorComponent.getUniqueTimestamp(),
+ BucketRequest(node, _op_ctx.generate_unique_timestamp(),
bucket, mergeReplyGuard);
_sender.sendCommand(msg);
}
@@ -209,8 +210,8 @@ BucketDBUpdater::removeSuperfluousBuckets(
bool is_distribution_config_change)
{
const bool move_to_read_only_db = shouldDeferStateEnabling();
- const char* up_states = _distributorComponent.getDistributor().getStorageNodeUpStates();
- for (auto& elem : _distributorComponent.getBucketSpaceRepo()) {
+ const char* up_states = _op_ctx.storage_node_up_states();
+ for (auto& elem : _op_ctx.bucket_space_repo()) {
const auto& newDistribution(elem.second->getDistribution());
const auto& oldClusterState(elem.second->getClusterState());
const auto& new_cluster_state = newState.getDerivedClusterState(elem.first);
@@ -227,7 +228,7 @@ BucketDBUpdater::removeSuperfluousBuckets(
}
auto& bucketDb(elem.second->getBucketDatabase());
- auto& readOnlyDb(_distributorComponent.getReadOnlyBucketSpaceRepo().get(elem.first).getBucketDatabase());
+ auto& readOnlyDb(_op_ctx.read_only_bucket_space_repo().get(elem.first).getBucketDatabase());
// Remove all buckets not belonging to this distributor, or
// being on storage nodes that are no longer up.
@@ -260,12 +261,12 @@ void maybe_sleep_for(std::chrono::milliseconds ms) {
void
BucketDBUpdater::maybe_inject_simulated_db_pruning_delay() {
- maybe_sleep_for(_distributorComponent.getDistributor().getConfig().simulated_db_pruning_latency());
+ maybe_sleep_for(_op_ctx.distributor_config().simulated_db_pruning_latency());
}
void
BucketDBUpdater::maybe_inject_simulated_db_merging_delay() {
- maybe_sleep_for(_distributorComponent.getDistributor().getConfig().simulated_db_merging_latency());
+ maybe_sleep_for(_op_ctx.distributor_config().simulated_db_merging_latency());
}
void
@@ -289,7 +290,7 @@ BucketDBUpdater::completeTransitionTimer()
void
BucketDBUpdater::clearReadOnlyBucketRepoDatabases()
{
- for (auto& space : _distributorComponent.getReadOnlyBucketSpaceRepo()) {
+ for (auto& space : _op_ctx.read_only_bucket_space_repo()) {
space.second->getBucketDatabase().clear();
}
}
@@ -299,20 +300,20 @@ BucketDBUpdater::storageDistributionChanged()
{
ensureTransitionTimerStarted();
- removeSuperfluousBuckets(_distributorComponent.getClusterStateBundle(), true);
+ removeSuperfluousBuckets(_op_ctx.cluster_state_bundle(), true);
auto clusterInfo = std::make_shared<const SimpleClusterInformation>(
_node_ctx.node_index(),
- _distributorComponent.getClusterStateBundle(),
- _distributorComponent.getDistributor().getStorageNodeUpStates());
+ _op_ctx.cluster_state_bundle(),
+ _op_ctx.storage_node_up_states());
_pendingClusterState = PendingClusterState::createForDistributionChange(
_node_ctx.clock(),
std::move(clusterInfo),
_sender,
- _distributorComponent.getBucketSpaceRepo(),
- _distributorComponent.getUniqueTimestamp());
+ _op_ctx.bucket_space_repo(),
+ _op_ctx.generate_unique_timestamp());
_outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap();
- _distributorComponent.getBucketSpaceRepo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle());
+ _op_ctx.bucket_space_repo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle());
}
void
@@ -336,7 +337,7 @@ BucketDBUpdater::replyToActivationWithActualVersion(
void BucketDBUpdater::update_read_snapshot_before_db_pruning() {
std::lock_guard lock(_distribution_context_mutex);
- for (auto& elem : _distributorComponent.getBucketSpaceRepo()) {
+ for (auto& elem : _op_ctx.bucket_space_repo()) {
// At this point, we're still operating with a distribution context _without_ a
// pending state, i.e. anyone using the context will expect to find buckets
// in the DB that correspond to how the database looked like prior to pruning
@@ -353,9 +354,9 @@ void BucketDBUpdater::update_read_snapshot_before_db_pruning() {
void BucketDBUpdater::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) {
std::lock_guard lock(_distribution_context_mutex);
- const auto old_default_state = _distributorComponent.getBucketSpaceRepo().get(
+ const auto old_default_state = _op_ctx.bucket_space_repo().get(
document::FixedBucketSpaces::default_space()).cluster_state_sp();
- for (auto& elem : _distributorComponent.getBucketSpaceRepo()) {
+ for (auto& elem : _op_ctx.bucket_space_repo()) {
auto new_distribution = elem.second->distribution_sp();
auto old_cluster_state = elem.second->cluster_state_sp();
auto new_cluster_state = new_state.getDerivedClusterState(elem.first);
@@ -376,7 +377,7 @@ void BucketDBUpdater::update_read_snapshot_after_db_pruning(const lib::ClusterSt
void BucketDBUpdater::update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) {
std::lock_guard lock(_distribution_context_mutex);
const auto& default_cluster_state = activated_state.getDerivedClusterState(document::FixedBucketSpaces::default_space());
- for (auto& elem : _distributorComponent.getBucketSpaceRepo()) {
+ for (auto& elem : _op_ctx.bucket_space_repo()) {
auto new_distribution = elem.second->distribution_sp();
auto new_cluster_state = activated_state.getDerivedClusterState(elem.first);
_active_distribution_contexts.insert_or_assign(
@@ -397,7 +398,7 @@ BucketDBUpdater::onSetSystemState(
"Received new cluster state %s",
cmd->getSystemState().toString().c_str());
- const lib::ClusterStateBundle oldState = _distributorComponent.getClusterStateBundle();
+ const lib::ClusterStateBundle oldState = _op_ctx.cluster_state_bundle();
const lib::ClusterStateBundle& state = cmd->getClusterStateBundle();
if (state == oldState) {
@@ -414,23 +415,22 @@ BucketDBUpdater::onSetSystemState(
auto clusterInfo = std::make_shared<const SimpleClusterInformation>(
_node_ctx.node_index(),
- _distributorComponent.getClusterStateBundle(),
- _distributorComponent.getDistributor()
- .getStorageNodeUpStates());
+ _op_ctx.cluster_state_bundle(),
+ _op_ctx.storage_node_up_states());
_pendingClusterState = PendingClusterState::createForClusterStateChange(
_node_ctx.clock(),
std::move(clusterInfo),
_sender,
- _distributorComponent.getBucketSpaceRepo(),
+ _op_ctx.bucket_space_repo(),
cmd,
_outdatedNodesMap,
- _distributorComponent.getUniqueTimestamp());
+ _op_ctx.generate_unique_timestamp());
_outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap();
_distributorComponent.getDistributor().getMetrics().set_cluster_state_processing_time.addValue(
process_timer.getElapsedTimeAsDouble());
- _distributorComponent.getBucketSpaceRepo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle());
+ _op_ctx.bucket_space_repo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle());
if (isPendingClusterStateCompleted()) {
processCompletedPendingClusterState();
}
@@ -667,7 +667,7 @@ BucketDBUpdater::processSingleBucketInfoReply(
BucketRequest req = iter->second;
_sentMessages.erase(iter);
- if (!_distributorComponent.storageNodeIsUp(req.bucket.getBucketSpace(), req.targetNode)) {
+ if (!_op_ctx.storage_node_is_up(req.bucket.getBucketSpace(), req.targetNode)) {
// Ignore replies from nodes that are down.
return true;
}
@@ -695,7 +695,7 @@ void
BucketDBUpdater::findRelatedBucketsInDatabase(uint16_t node, const document::Bucket& bucket,
BucketListMerger::BucketList& existing)
{
- auto &distributorBucketSpace(_distributorComponent.getBucketSpaceRepo().get(bucket.getBucketSpace()));
+ auto &distributorBucketSpace(_op_ctx.bucket_space_repo().get(bucket.getBucketSpace()));
std::vector<BucketDatabase::Entry> entries;
distributorBucketSpace.getBucketDatabase().getAll(bucket.getBucketId(), entries);
@@ -709,12 +709,12 @@ BucketDBUpdater::updateDatabase(document::BucketSpace bucketSpace, uint16_t node
{
for (const document::BucketId & bucketId : merger.getRemovedEntries()) {
document::Bucket bucket(bucketSpace, bucketId);
- _distributorComponent.removeNodeFromDB(bucket, node);
+ _op_ctx.remove_node_from_bucket_database(bucket, node);
}
for (const BucketListMerger::BucketEntry& entry : merger.getAddedEntries()) {
document::Bucket bucket(bucketSpace, entry.first);
- _distributorComponent.updateBucketDatabase(
+ _op_ctx.update_bucket_database(
bucket,
BucketCopy(merger.getTimestamp(), node, entry.second),
DatabaseUpdate::CREATE_IF_NONEXISTING);
@@ -778,7 +778,7 @@ BucketDBUpdater::activatePendingClusterState()
update_read_snapshot_after_activation(_pendingClusterState->getNewClusterStateBundle());
_pendingClusterState.reset();
_outdatedNodesMap.clear();
- _distributorComponent.getBucketSpaceRepo().clear_pending_cluster_state_bundle();
+ _op_ctx.bucket_space_repo().clear_pending_cluster_state_bundle();
sendAllQueuedBucketRechecks();
completeTransitionTimer();
clearReadOnlyBucketRepoDatabases();
@@ -868,7 +868,7 @@ BucketDBUpdater::reportXmlStatus(vespalib::xml::XmlOutputStream& xos,
using namespace vespalib::xml;
xos << XmlTag("bucketdb")
<< XmlTag("systemstate_active")
- << XmlContent(_distributorComponent.getClusterStateBundle().getBaselineClusterState()->toString())
+ << XmlContent(_op_ctx.cluster_state_bundle().getBaselineClusterState()->toString())
<< XmlEndTag();
if (_pendingClusterState) {
xos << *_pendingClusterState;
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index 5503e6bf22e..d706d162050 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -240,6 +240,7 @@ private:
DistributorComponent _distributorComponent;
const DistributorNodeContext& _node_ctx;
+ DistributorOperationContext& _op_ctx;
std::deque<std::pair<framework::MilliSecTime, BucketRequest> > _delayedRequests;
std::map<uint64_t, BucketRequest> _sentMessages;
std::unique_ptr<PendingClusterState> _pendingClusterState;
diff --git a/storage/src/vespa/storage/distributor/distributor_operation_context.h b/storage/src/vespa/storage/distributor/distributor_operation_context.h
index 97a522a694a..0b47c71e2e1 100644
--- a/storage/src/vespa/storage/distributor/distributor_operation_context.h
+++ b/storage/src/vespa/storage/distributor/distributor_operation_context.h
@@ -50,6 +50,7 @@ public:
uint32_t message_type) const = 0;
virtual const lib::ClusterState* pending_cluster_state_or_null(const document::BucketSpace& bucket_space) const = 0;
virtual const lib::ClusterStateBundle& cluster_state_bundle() const = 0;
+ virtual bool storage_node_is_up(document::BucketSpace bucket_space, uint32_t node_index) const = 0;
// TODO: Move to being a free function instead.
virtual const char* storage_node_up_states() const = 0;
diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.h b/storage/src/vespa/storage/distributor/distributorcomponent.h
index c2ab258be35..ca953ed01ef 100644
--- a/storage/src/vespa/storage/distributor/distributorcomponent.h
+++ b/storage/src/vespa/storage/distributor/distributorcomponent.h
@@ -204,6 +204,9 @@ public:
const lib::ClusterStateBundle& cluster_state_bundle() const override {
return getClusterStateBundle();
}
+ bool storage_node_is_up(document::BucketSpace bucket_space, uint32_t node_index) const override {
+ return storageNodeIsUp(bucket_space, node_index);
+ }
const char* storage_node_up_states() const override {
return getDistributor().getStorageNodeUpStates();
}