summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2021-03-17 15:18:08 +0000
committerGeir Storli <geirst@verizonmedia.com>2021-03-17 15:26:11 +0000
commitb5b2b600fa63d9b13f184261e7b5399fac2ca16a (patch)
treeb2361a52cf414cc26c786f276176a991467f3658 /storage
parent1e5c4c7e5e0d5e3ccd5528abd48de9fda6c35978 (diff)
Reduce usage of DistributorComponent by using DistributorInterface instead.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.cpp21
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.h1
2 files changed, 12 insertions, 10 deletions
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index 710471bc167..b214827d7fb 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -33,6 +33,7 @@ BucketDBUpdater::BucketDBUpdater(Distributor& owner,
_distributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "Bucket DB Updater"),
_node_ctx(_distributorComponent),
_op_ctx(_distributorComponent),
+ _distributor_interface(_distributorComponent.getDistributor()),
_delayedRequests(),
_sentMessages(),
_pendingClusterState(),
@@ -283,7 +284,7 @@ BucketDBUpdater::ensureTransitionTimerStarted()
void
BucketDBUpdater::completeTransitionTimer()
{
- _distributorComponent.getDistributor().getMetrics()
+ _distributor_interface.getMetrics()
.stateTransitionTime.addValue(_transitionTimer.getElapsedTimeAsDouble());
}
@@ -320,7 +321,7 @@ void
BucketDBUpdater::replyToPreviousPendingClusterStateIfAny()
{
if (_pendingClusterState.get() && _pendingClusterState->hasCommand()) {
- _distributorComponent.sendUp(
+ _distributor_interface.getMessageSender().sendUp(
std::make_shared<api::SetSystemStateReply>(*_pendingClusterState->getCommand()));
}
}
@@ -332,7 +333,7 @@ BucketDBUpdater::replyToActivationWithActualVersion(
{
auto reply = std::make_shared<api::ActivateClusterStateVersionReply>(cmd);
reply->setActualVersion(actualVersion);
- _distributorComponent.sendUp(reply); // TODO let API accept rvalues
+ _distributor_interface.getMessageSender().sendUp(reply); // TODO let API accept rvalues
}
void BucketDBUpdater::update_read_snapshot_before_db_pruning() {
@@ -427,7 +428,7 @@ BucketDBUpdater::onSetSystemState(
_op_ctx.generate_unique_timestamp());
_outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap();
- _distributorComponent.getDistributor().getMetrics().set_cluster_state_processing_time.addValue(
+ _distributor_interface.getMetrics().set_cluster_state_processing_time.addValue(
process_timer.getElapsedTimeAsDouble());
_op_ctx.bucket_space_repo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle());
@@ -742,7 +743,7 @@ BucketDBUpdater::processCompletedPendingClusterState()
// taken effect via activation. External operation handler will keep operations from
// actually being scheduled until state has been activated. The external operation handler
// needs to be explicitly aware of the case where no state has yet to be activated.
- _distributorComponent.getDistributor().getMessageSender().sendDown(
+ _distributor_interface.getMessageSender().sendDown(
_pendingClusterState->getCommand());
_pendingClusterState->clearCommand();
return;
@@ -764,7 +765,7 @@ BucketDBUpdater::activatePendingClusterState()
LOG(debug, "Activating pending cluster state version %u", _pendingClusterState->clusterStateVersion());
enableCurrentClusterStateBundleInDistributor();
if (_pendingClusterState->hasCommand()) {
- _distributorComponent.getDistributor().getMessageSender().sendDown(
+ _distributor_interface.getMessageSender().sendDown(
_pendingClusterState->getCommand());
}
addCurrentStateToClusterStateHistory();
@@ -772,7 +773,7 @@ BucketDBUpdater::activatePendingClusterState()
LOG(debug, "Activating pending distribution config");
// TODO distribution changes cannot currently be deferred as they are not
// initiated by the cluster controller!
- _distributorComponent.getDistributor().notifyDistributionChangeEnabled();
+ _distributor_interface.notifyDistributionChangeEnabled();
}
update_read_snapshot_after_activation(_pendingClusterState->getNewClusterStateBundle());
@@ -783,7 +784,7 @@ BucketDBUpdater::activatePendingClusterState()
completeTransitionTimer();
clearReadOnlyBucketRepoDatabases();
- _distributorComponent.getDistributor().getMetrics().activate_cluster_state_processing_time.addValue(
+ _distributor_interface.getMetrics().activate_cluster_state_processing_time.addValue(
process_timer.getElapsedTimeAsDouble());
}
@@ -797,12 +798,12 @@ BucketDBUpdater::enableCurrentClusterStateBundleInDistributor()
"BucketDBUpdater finished processing state %s",
state.getBaselineClusterState()->toString().c_str());
- _distributorComponent.getDistributor().enableClusterStateBundle(state);
+ _distributor_interface.enableClusterStateBundle(state);
}
void BucketDBUpdater::simulate_cluster_state_bundle_activation(const lib::ClusterStateBundle& activated_state) {
update_read_snapshot_after_activation(activated_state);
- _distributorComponent.getDistributor().enableClusterStateBundle(activated_state);
+ _distributor_interface.enableClusterStateBundle(activated_state);
}
void
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index d706d162050..d6538d9c60d 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -241,6 +241,7 @@ private:
DistributorComponent _distributorComponent;
const DistributorNodeContext& _node_ctx;
DistributorOperationContext& _op_ctx;
+ DistributorInterface& _distributor_interface;
std::deque<std::pair<framework::MilliSecTime, BucketRequest> > _delayedRequests;
std::map<uint64_t, BucketRequest> _sentMessages;
std::unique_ptr<PendingClusterState> _pendingClusterState;