diff options
author | Geir Storli <geirst@verizonmedia.com> | 2021-03-17 15:18:08 +0000 |
---|---|---|
committer | Geir Storli <geirst@verizonmedia.com> | 2021-03-17 15:26:11 +0000 |
commit | b5b2b600fa63d9b13f184261e7b5399fac2ca16a (patch) | |
tree | b2361a52cf414cc26c786f276176a991467f3658 /storage | |
parent | 1e5c4c7e5e0d5e3ccd5528abd48de9fda6c35978 (diff) |
Reduce usage of DistributorComponent by using DistributorInterface instead.
Diffstat (limited to 'storage')
-rw-r--r-- | storage/src/vespa/storage/distributor/bucketdbupdater.cpp | 21 | ||||
-rw-r--r-- | storage/src/vespa/storage/distributor/bucketdbupdater.h | 1 |
2 files changed, 12 insertions, 10 deletions
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp index 710471bc167..b214827d7fb 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp @@ -33,6 +33,7 @@ BucketDBUpdater::BucketDBUpdater(Distributor& owner, _distributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "Bucket DB Updater"), _node_ctx(_distributorComponent), _op_ctx(_distributorComponent), + _distributor_interface(_distributorComponent.getDistributor()), _delayedRequests(), _sentMessages(), _pendingClusterState(), @@ -283,7 +284,7 @@ BucketDBUpdater::ensureTransitionTimerStarted() void BucketDBUpdater::completeTransitionTimer() { - _distributorComponent.getDistributor().getMetrics() + _distributor_interface.getMetrics() .stateTransitionTime.addValue(_transitionTimer.getElapsedTimeAsDouble()); } @@ -320,7 +321,7 @@ void BucketDBUpdater::replyToPreviousPendingClusterStateIfAny() { if (_pendingClusterState.get() && _pendingClusterState->hasCommand()) { - _distributorComponent.sendUp( + _distributor_interface.getMessageSender().sendUp( std::make_shared<api::SetSystemStateReply>(*_pendingClusterState->getCommand())); } } @@ -332,7 +333,7 @@ BucketDBUpdater::replyToActivationWithActualVersion( { auto reply = std::make_shared<api::ActivateClusterStateVersionReply>(cmd); reply->setActualVersion(actualVersion); - _distributorComponent.sendUp(reply); // TODO let API accept rvalues + _distributor_interface.getMessageSender().sendUp(reply); // TODO let API accept rvalues } void BucketDBUpdater::update_read_snapshot_before_db_pruning() { @@ -427,7 +428,7 @@ BucketDBUpdater::onSetSystemState( _op_ctx.generate_unique_timestamp()); _outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap(); - _distributorComponent.getDistributor().getMetrics().set_cluster_state_processing_time.addValue( + _distributor_interface.getMetrics().set_cluster_state_processing_time.addValue( process_timer.getElapsedTimeAsDouble()); _op_ctx.bucket_space_repo().set_pending_cluster_state_bundle(_pendingClusterState->getNewClusterStateBundle()); @@ -742,7 +743,7 @@ BucketDBUpdater::processCompletedPendingClusterState() // taken effect via activation. External operation handler will keep operations from // actually being scheduled until state has been activated. The external operation handler // needs to be explicitly aware of the case where no state has yet to be activated. - _distributorComponent.getDistributor().getMessageSender().sendDown( + _distributor_interface.getMessageSender().sendDown( _pendingClusterState->getCommand()); _pendingClusterState->clearCommand(); return; @@ -764,7 +765,7 @@ BucketDBUpdater::activatePendingClusterState() LOG(debug, "Activating pending cluster state version %u", _pendingClusterState->clusterStateVersion()); enableCurrentClusterStateBundleInDistributor(); if (_pendingClusterState->hasCommand()) { - _distributorComponent.getDistributor().getMessageSender().sendDown( + _distributor_interface.getMessageSender().sendDown( _pendingClusterState->getCommand()); } addCurrentStateToClusterStateHistory(); @@ -772,7 +773,7 @@ BucketDBUpdater::activatePendingClusterState() LOG(debug, "Activating pending distribution config"); // TODO distribution changes cannot currently be deferred as they are not // initiated by the cluster controller! - _distributorComponent.getDistributor().notifyDistributionChangeEnabled(); + _distributor_interface.notifyDistributionChangeEnabled(); } update_read_snapshot_after_activation(_pendingClusterState->getNewClusterStateBundle()); @@ -783,7 +784,7 @@ BucketDBUpdater::activatePendingClusterState() completeTransitionTimer(); clearReadOnlyBucketRepoDatabases(); - _distributorComponent.getDistributor().getMetrics().activate_cluster_state_processing_time.addValue( + _distributor_interface.getMetrics().activate_cluster_state_processing_time.addValue( process_timer.getElapsedTimeAsDouble()); } @@ -797,12 +798,12 @@ BucketDBUpdater::enableCurrentClusterStateBundleInDistributor() "BucketDBUpdater finished processing state %s", state.getBaselineClusterState()->toString().c_str()); - _distributorComponent.getDistributor().enableClusterStateBundle(state); + _distributor_interface.enableClusterStateBundle(state); } void BucketDBUpdater::simulate_cluster_state_bundle_activation(const lib::ClusterStateBundle& activated_state) { update_read_snapshot_after_activation(activated_state); - _distributorComponent.getDistributor().enableClusterStateBundle(activated_state); + _distributor_interface.enableClusterStateBundle(activated_state); } void diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h index d706d162050..d6538d9c60d 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.h +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h @@ -241,6 +241,7 @@ private: DistributorComponent _distributorComponent; const DistributorNodeContext& _node_ctx; DistributorOperationContext& _op_ctx; + DistributorInterface& _distributor_interface; std::deque<std::pair<framework::MilliSecTime, BucketRequest> > _delayedRequests; std::map<uint64_t, BucketRequest> _sentMessages; std::unique_ptr<PendingClusterState> _pendingClusterState; |