diff options
author | Geir Storli <geirst@verizonmedia.com> | 2021-03-17 10:20:45 +0000 |
---|---|---|
committer | Geir Storli <geirst@verizonmedia.com> | 2021-03-17 15:26:11 +0000 |
commit | ee112380088e1c50939b47458469de606f5d1b88 (patch) | |
tree | 9e5eba9230ca33d3874614e881c1681d954bd177 /storage | |
parent | 06e15060de09afd11f4af92fcd8b669b58a32b2c (diff) |
Reduce usage of DistributorComponent by using DistributorNodeContext instead.
Diffstat (limited to 'storage')
4 files changed, 20 insertions, 15 deletions
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp index 69f7d67c4d0..e6f44c92dbe 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp @@ -31,6 +31,7 @@ BucketDBUpdater::BucketDBUpdater(Distributor& owner, DistributorComponentRegister& compReg) : framework::StatusReporter("bucketdb", "Bucket DB Updater"), _distributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "Bucket DB Updater"), + _node_ctx(_distributorComponent), _delayedRequests(), _sentMessages(), _pendingClusterState(), @@ -38,7 +39,7 @@ BucketDBUpdater::BucketDBUpdater(Distributor& owner, _sender(sender), _enqueuedRechecks(), _outdatedNodesMap(), - _transitionTimer(_distributorComponent.getClock()), + _transitionTimer(_node_ctx.clock()), _stale_reads_enabled(false), _active_distribution_contexts(), _explicit_transition_read_guard(), @@ -47,7 +48,7 @@ BucketDBUpdater::BucketDBUpdater(Distributor& owner, for (auto& elem : _distributorComponent.getBucketSpaceRepo()) { _active_distribution_contexts.emplace( elem.first, - BucketSpaceDistributionContext::make_not_yet_initialized(_distributorComponent.getIndex())); + BucketSpaceDistributionContext::make_not_yet_initialized(_node_ctx.node_index())); _explicit_transition_read_guard.emplace(elem.first, std::shared_ptr<BucketDatabase::ReadGuard>()); } } @@ -140,7 +141,7 @@ BucketDBUpdater::sendRequestBucketInfo( node); msg->setPriority(50); - msg->setAddress(_distributorComponent.nodeAddress(node)); + msg->setAddress(_node_ctx.node_address(node)); _sentMessages[msg->getMsgId()] = BucketRequest(node, _distributorComponent.getUniqueTimestamp(), @@ -233,7 +234,7 @@ BucketDBUpdater::removeSuperfluousBuckets( MergingNodeRemover proc( oldClusterState, *new_cluster_state, - _distributorComponent.getIndex(), + _node_ctx.node_index(), newDistribution, up_states, move_to_read_only_db); @@ -274,7 +275,7 @@ BucketDBUpdater::ensureTransitionTimerStarted() // that will make transition times appear artificially low. if (!hasPendingClusterState()) { _transitionTimer = framework::MilliSecTimer( - _distributorComponent.getClock()); + _node_ctx.clock()); } } @@ -301,11 +302,11 @@ BucketDBUpdater::storageDistributionChanged() removeSuperfluousBuckets(_distributorComponent.getClusterStateBundle(), true); auto clusterInfo = std::make_shared<const SimpleClusterInformation>( - _distributorComponent.getIndex(), + _node_ctx.node_index(), _distributorComponent.getClusterStateBundle(), _distributorComponent.getDistributor().getStorageNodeUpStates()); _pendingClusterState = PendingClusterState::createForDistributionChange( - _distributorComponent.getClock(), + _node_ctx.clock(), std::move(clusterInfo), _sender, _distributorComponent.getBucketSpaceRepo(), @@ -365,7 +366,7 @@ void BucketDBUpdater::update_read_snapshot_after_db_pruning(const lib::ClusterSt old_default_state, std::move(new_cluster_state), std::move(new_distribution), - _distributorComponent.getIndex())); + _node_ctx.node_index())); // We can now remove the explicit mutable DB snapshot, as the buckets that have been // pruned away are visible in the read-only DB. _explicit_transition_read_guard[elem.first] = std::shared_ptr<BucketDatabase::ReadGuard>(); @@ -384,7 +385,7 @@ void BucketDBUpdater::update_read_snapshot_after_activation(const lib::ClusterSt std::move(new_cluster_state), default_cluster_state, std::move(new_distribution), - _distributorComponent.getIndex())); + _node_ctx.node_index())); } } @@ -404,7 +405,7 @@ BucketDBUpdater::onSetSystemState( } ensureTransitionTimerStarted(); // Separate timer since _transitionTimer might span multiple pending states. - framework::MilliSecTimer process_timer(_distributorComponent.getClock()); + framework::MilliSecTimer process_timer(_node_ctx.clock()); update_read_snapshot_before_db_pruning(); const auto& bundle = cmd->getClusterStateBundle(); removeSuperfluousBuckets(bundle, false); @@ -412,12 +413,12 @@ BucketDBUpdater::onSetSystemState( replyToPreviousPendingClusterStateIfAny(); auto clusterInfo = std::make_shared<const SimpleClusterInformation>( - _distributorComponent.getIndex(), + _node_ctx.node_index(), _distributorComponent.getClusterStateBundle(), _distributorComponent.getDistributor() .getStorageNodeUpStates()); _pendingClusterState = PendingClusterState::createForClusterStateChange( - _distributorComponent.getClock(), + _node_ctx.clock(), std::move(clusterInfo), _sender, _distributorComponent.getBucketSpaceRepo(), @@ -596,7 +597,7 @@ BucketDBUpdater::handleSingleBucketInfoFailure( req.targetNode, repl->getResult().toString().c_str()); if (req.bucket.getBucketId() != document::BucketId(0)) { - framework::MilliSecTime sendTime(_distributorComponent.getClock()); + framework::MilliSecTime sendTime(_node_ctx.clock()); sendTime += framework::MilliSecTime(100); _delayedRequests.emplace_back(sendTime, req); } @@ -611,7 +612,7 @@ BucketDBUpdater::resendDelayedMessages() if (_delayedRequests.empty()) { return; // Don't fetch time if not needed } - framework::MilliSecTime currentTime(_distributorComponent.getClock()); + framework::MilliSecTime currentTime(_node_ctx.clock()); while (!_delayedRequests.empty() && currentTime >= _delayedRequests.front().first) { @@ -754,7 +755,7 @@ BucketDBUpdater::processCompletedPendingClusterState() void BucketDBUpdater::activatePendingClusterState() { - framework::MilliSecTimer process_timer(_distributorComponent.getClock()); + framework::MilliSecTimer process_timer(_node_ctx.clock()); _pendingClusterState->mergeIntoBucketDatabases(); maybe_inject_simulated_db_merging_delay(); diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h index 529bfef4104..5503e6bf22e 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.h +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h @@ -239,6 +239,7 @@ private: }; DistributorComponent _distributorComponent; + const DistributorNodeContext& _node_ctx; std::deque<std::pair<framework::MilliSecTime, BucketRequest> > _delayedRequests; std::map<uint64_t, BucketRequest> _sentMessages; std::unique_ptr<PendingClusterState> _pendingClusterState; diff --git a/storage/src/vespa/storage/distributor/distributor_node_context.h b/storage/src/vespa/storage/distributor/distributor_node_context.h index 3cb0f509ea7..805e54342dc 100644 --- a/storage/src/vespa/storage/distributor/distributor_node_context.h +++ b/storage/src/vespa/storage/distributor/distributor_node_context.h @@ -3,6 +3,7 @@ #pragma once #include <vespa/storage/common/cluster_context.h> +#include <vespa/storageapi/messageapi/storagemessage.h> #include <cstdint> namespace document { class BucketIdFactory; } @@ -20,6 +21,7 @@ public: virtual const framework::Clock& clock() const noexcept = 0; virtual const document::BucketIdFactory& bucket_id_factory() const noexcept = 0; virtual uint16_t node_index() const noexcept = 0; + virtual api::StorageMessageAddress node_address(uint16_t node_index) const noexcept = 0; }; } diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.h b/storage/src/vespa/storage/distributor/distributorcomponent.h index b4a0b00a2e1..c2ab258be35 100644 --- a/storage/src/vespa/storage/distributor/distributorcomponent.h +++ b/storage/src/vespa/storage/distributor/distributorcomponent.h @@ -149,6 +149,7 @@ public: const vespalib::string * cluster_name_ptr() const noexcept override { return cluster_context().cluster_name_ptr(); } const document::BucketIdFactory& bucket_id_factory() const noexcept override { return getBucketIdFactory(); } uint16_t node_index() const noexcept override { return getIndex(); } + api::StorageMessageAddress node_address(uint16_t node_index) const noexcept override { return nodeAddress(node_index); } // Implements DistributorOperationContext api::Timestamp generate_unique_timestamp() override { return getUniqueTimestamp(); } |