summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2021-03-17 10:20:45 +0000
committerGeir Storli <geirst@verizonmedia.com>2021-03-17 15:26:11 +0000
commitee112380088e1c50939b47458469de606f5d1b88 (patch)
tree9e5eba9230ca33d3874614e881c1681d954bd177 /storage
parent06e15060de09afd11f4af92fcd8b669b58a32b2c (diff)
Reduce usage of DistributorComponent by using DistributorNodeContext instead.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.cpp31
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.h1
-rw-r--r--storage/src/vespa/storage/distributor/distributor_node_context.h2
-rw-r--r--storage/src/vespa/storage/distributor/distributorcomponent.h1
4 files changed, 20 insertions, 15 deletions
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index 69f7d67c4d0..e6f44c92dbe 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -31,6 +31,7 @@ BucketDBUpdater::BucketDBUpdater(Distributor& owner,
DistributorComponentRegister& compReg)
: framework::StatusReporter("bucketdb", "Bucket DB Updater"),
_distributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "Bucket DB Updater"),
+ _node_ctx(_distributorComponent),
_delayedRequests(),
_sentMessages(),
_pendingClusterState(),
@@ -38,7 +39,7 @@ BucketDBUpdater::BucketDBUpdater(Distributor& owner,
_sender(sender),
_enqueuedRechecks(),
_outdatedNodesMap(),
- _transitionTimer(_distributorComponent.getClock()),
+ _transitionTimer(_node_ctx.clock()),
_stale_reads_enabled(false),
_active_distribution_contexts(),
_explicit_transition_read_guard(),
@@ -47,7 +48,7 @@ BucketDBUpdater::BucketDBUpdater(Distributor& owner,
for (auto& elem : _distributorComponent.getBucketSpaceRepo()) {
_active_distribution_contexts.emplace(
elem.first,
- BucketSpaceDistributionContext::make_not_yet_initialized(_distributorComponent.getIndex()));
+ BucketSpaceDistributionContext::make_not_yet_initialized(_node_ctx.node_index()));
_explicit_transition_read_guard.emplace(elem.first, std::shared_ptr<BucketDatabase::ReadGuard>());
}
}
@@ -140,7 +141,7 @@ BucketDBUpdater::sendRequestBucketInfo(
node);
msg->setPriority(50);
- msg->setAddress(_distributorComponent.nodeAddress(node));
+ msg->setAddress(_node_ctx.node_address(node));
_sentMessages[msg->getMsgId()] =
BucketRequest(node, _distributorComponent.getUniqueTimestamp(),
@@ -233,7 +234,7 @@ BucketDBUpdater::removeSuperfluousBuckets(
MergingNodeRemover proc(
oldClusterState,
*new_cluster_state,
- _distributorComponent.getIndex(),
+ _node_ctx.node_index(),
newDistribution,
up_states,
move_to_read_only_db);
@@ -274,7 +275,7 @@ BucketDBUpdater::ensureTransitionTimerStarted()
// that will make transition times appear artificially low.
if (!hasPendingClusterState()) {
_transitionTimer = framework::MilliSecTimer(
- _distributorComponent.getClock());
+ _node_ctx.clock());
}
}
@@ -301,11 +302,11 @@ BucketDBUpdater::storageDistributionChanged()
removeSuperfluousBuckets(_distributorComponent.getClusterStateBundle(), true);
auto clusterInfo = std::make_shared<const SimpleClusterInformation>(
- _distributorComponent.getIndex(),
+ _node_ctx.node_index(),
_distributorComponent.getClusterStateBundle(),
_distributorComponent.getDistributor().getStorageNodeUpStates());
_pendingClusterState = PendingClusterState::createForDistributionChange(
- _distributorComponent.getClock(),
+ _node_ctx.clock(),
std::move(clusterInfo),
_sender,
_distributorComponent.getBucketSpaceRepo(),
@@ -365,7 +366,7 @@ void BucketDBUpdater::update_read_snapshot_after_db_pruning(const lib::ClusterSt
old_default_state,
std::move(new_cluster_state),
std::move(new_distribution),
- _distributorComponent.getIndex()));
+ _node_ctx.node_index()));
// We can now remove the explicit mutable DB snapshot, as the buckets that have been
// pruned away are visible in the read-only DB.
_explicit_transition_read_guard[elem.first] = std::shared_ptr<BucketDatabase::ReadGuard>();
@@ -384,7 +385,7 @@ void BucketDBUpdater::update_read_snapshot_after_activation(const lib::ClusterSt
std::move(new_cluster_state),
default_cluster_state,
std::move(new_distribution),
- _distributorComponent.getIndex()));
+ _node_ctx.node_index()));
}
}
@@ -404,7 +405,7 @@ BucketDBUpdater::onSetSystemState(
}
ensureTransitionTimerStarted();
// Separate timer since _transitionTimer might span multiple pending states.
- framework::MilliSecTimer process_timer(_distributorComponent.getClock());
+ framework::MilliSecTimer process_timer(_node_ctx.clock());
update_read_snapshot_before_db_pruning();
const auto& bundle = cmd->getClusterStateBundle();
removeSuperfluousBuckets(bundle, false);
@@ -412,12 +413,12 @@ BucketDBUpdater::onSetSystemState(
replyToPreviousPendingClusterStateIfAny();
auto clusterInfo = std::make_shared<const SimpleClusterInformation>(
- _distributorComponent.getIndex(),
+ _node_ctx.node_index(),
_distributorComponent.getClusterStateBundle(),
_distributorComponent.getDistributor()
.getStorageNodeUpStates());
_pendingClusterState = PendingClusterState::createForClusterStateChange(
- _distributorComponent.getClock(),
+ _node_ctx.clock(),
std::move(clusterInfo),
_sender,
_distributorComponent.getBucketSpaceRepo(),
@@ -596,7 +597,7 @@ BucketDBUpdater::handleSingleBucketInfoFailure(
req.targetNode, repl->getResult().toString().c_str());
if (req.bucket.getBucketId() != document::BucketId(0)) {
- framework::MilliSecTime sendTime(_distributorComponent.getClock());
+ framework::MilliSecTime sendTime(_node_ctx.clock());
sendTime += framework::MilliSecTime(100);
_delayedRequests.emplace_back(sendTime, req);
}
@@ -611,7 +612,7 @@ BucketDBUpdater::resendDelayedMessages()
if (_delayedRequests.empty()) {
return; // Don't fetch time if not needed
}
- framework::MilliSecTime currentTime(_distributorComponent.getClock());
+ framework::MilliSecTime currentTime(_node_ctx.clock());
while (!_delayedRequests.empty()
&& currentTime >= _delayedRequests.front().first)
{
@@ -754,7 +755,7 @@ BucketDBUpdater::processCompletedPendingClusterState()
void
BucketDBUpdater::activatePendingClusterState()
{
- framework::MilliSecTimer process_timer(_distributorComponent.getClock());
+ framework::MilliSecTimer process_timer(_node_ctx.clock());
_pendingClusterState->mergeIntoBucketDatabases();
maybe_inject_simulated_db_merging_delay();
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index 529bfef4104..5503e6bf22e 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -239,6 +239,7 @@ private:
};
DistributorComponent _distributorComponent;
+ const DistributorNodeContext& _node_ctx;
std::deque<std::pair<framework::MilliSecTime, BucketRequest> > _delayedRequests;
std::map<uint64_t, BucketRequest> _sentMessages;
std::unique_ptr<PendingClusterState> _pendingClusterState;
diff --git a/storage/src/vespa/storage/distributor/distributor_node_context.h b/storage/src/vespa/storage/distributor/distributor_node_context.h
index 3cb0f509ea7..805e54342dc 100644
--- a/storage/src/vespa/storage/distributor/distributor_node_context.h
+++ b/storage/src/vespa/storage/distributor/distributor_node_context.h
@@ -3,6 +3,7 @@
#pragma once
#include <vespa/storage/common/cluster_context.h>
+#include <vespa/storageapi/messageapi/storagemessage.h>
#include <cstdint>
namespace document { class BucketIdFactory; }
@@ -20,6 +21,7 @@ public:
virtual const framework::Clock& clock() const noexcept = 0;
virtual const document::BucketIdFactory& bucket_id_factory() const noexcept = 0;
virtual uint16_t node_index() const noexcept = 0;
+ virtual api::StorageMessageAddress node_address(uint16_t node_index) const noexcept = 0;
};
}
diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.h b/storage/src/vespa/storage/distributor/distributorcomponent.h
index b4a0b00a2e1..c2ab258be35 100644
--- a/storage/src/vespa/storage/distributor/distributorcomponent.h
+++ b/storage/src/vespa/storage/distributor/distributorcomponent.h
@@ -149,6 +149,7 @@ public:
const vespalib::string * cluster_name_ptr() const noexcept override { return cluster_context().cluster_name_ptr(); }
const document::BucketIdFactory& bucket_id_factory() const noexcept override { return getBucketIdFactory(); }
uint16_t node_index() const noexcept override { return getIndex(); }
+ api::StorageMessageAddress node_address(uint16_t node_index) const noexcept override { return nodeAddress(node_index); }
// Implements DistributorOperationContext
api::Timestamp generate_unique_timestamp() override { return getUniqueTimestamp(); }