summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2020-11-30 14:36:53 +0000
committerGeir Storli <geirst@verizonmedia.com>2020-11-30 14:38:24 +0000
commitb867154afa45266c747c558f578a2cea68dae5ae (patch)
treea7943f638fe5a4da31acae2e6b33c0e46b61bafc
parentbed35264fe263ac9954cf65f258eae2a20d544af (diff)
Add operation context interface to remove coupling between external operations and DistributorComponent.
-rw-r--r--storage/src/tests/distributor/putoperationtest.cpp1
-rw-r--r--storage/src/tests/distributor/removelocationtest.cpp2
-rw-r--r--storage/src/tests/distributor/removeoperationtest.cpp1
-rw-r--r--storage/src/tests/distributor/statoperationtest.cpp1
-rw-r--r--storage/src/tests/distributor/twophaseupdateoperationtest.cpp2
-rw-r--r--storage/src/tests/distributor/updateoperationtest.cpp2
-rw-r--r--storage/src/tests/distributor/visitoroperationtest.cpp1
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.cpp2
-rw-r--r--storage/src/vespa/storage/config/distributorconfiguration.h6
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h2
-rw-r--r--storage/src/vespa/storage/distributor/distributor_operation_context.h50
-rw-r--r--storage/src/vespa/storage/distributor/distributorcomponent.cpp10
-rw-r--r--storage/src/vespa/storage/distributor/distributorcomponent.h42
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.cpp13
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.cpp29
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.h9
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp11
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h4
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp7
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/removeoperation.h3
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp1
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/statbucketoperation.h3
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp28
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h6
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp8
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/updateoperation.h4
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp11
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/visitoroperation.h7
-rw-r--r--storage/src/vespa/storage/distributor/persistencemessagetracker.cpp23
-rw-r--r--storage/src/vespa/storage/distributor/persistencemessagetracker.h5
30 files changed, 213 insertions, 81 deletions
diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp
index a69948e6dd7..1515eba4bf0 100644
--- a/storage/src/tests/distributor/putoperationtest.cpp
+++ b/storage/src/tests/distributor/putoperationtest.cpp
@@ -73,6 +73,7 @@ public:
void sendPut(std::shared_ptr<api::PutCommand> msg) {
op = std::make_unique<PutOperation>(getExternalOperationHandler(),
+ getExternalOperationHandler(),
getDistributorBucketSpace(),
msg,
getDistributor().getMetrics().
diff --git a/storage/src/tests/distributor/removelocationtest.cpp b/storage/src/tests/distributor/removelocationtest.cpp
index 9874e936e62..47b6e436fb1 100644
--- a/storage/src/tests/distributor/removelocationtest.cpp
+++ b/storage/src/tests/distributor/removelocationtest.cpp
@@ -28,6 +28,8 @@ struct RemoveLocationOperationTest : Test, DistributorTestUtil {
op = std::make_unique<RemoveLocationOperation>(
getExternalOperationHandler(),
+ getExternalOperationHandler(),
+ getExternalOperationHandler(),
getDistributorBucketSpace(),
msg,
getDistributor().getMetrics().
diff --git a/storage/src/tests/distributor/removeoperationtest.cpp b/storage/src/tests/distributor/removeoperationtest.cpp
index 02c2771a78c..49d60292326 100644
--- a/storage/src/tests/distributor/removeoperationtest.cpp
+++ b/storage/src/tests/distributor/removeoperationtest.cpp
@@ -36,6 +36,7 @@ struct RemoveOperationTest : Test, DistributorTestUtil {
op = std::make_unique<RemoveOperation>(
getExternalOperationHandler(),
+ getExternalOperationHandler(),
getDistributorBucketSpace(),
msg,
getDistributor().getMetrics().
diff --git a/storage/src/tests/distributor/statoperationtest.cpp b/storage/src/tests/distributor/statoperationtest.cpp
index 53ea0ec5efa..981247c0753 100644
--- a/storage/src/tests/distributor/statoperationtest.cpp
+++ b/storage/src/tests/distributor/statoperationtest.cpp
@@ -32,7 +32,6 @@ TEST_F(StatOperationTest, bucket_info) {
addNodesToBucketDB(document::BucketId(16, 5), "0=4/2/100,1=4/2/100");
StatBucketOperation op(
- getExternalOperationHandler(),
getDistributorBucketSpace(),
std::make_shared<api::StatBucketCommand>(
makeDocumentBucket(document::BucketId(16, 5)), ""));
diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
index 200a98e352f..6e3837afa43 100644
--- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
+++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
@@ -327,7 +327,7 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState,
ExternalOperationHandler& handler = getExternalOperationHandler();
return std::make_shared<TwoPhaseUpdateOperation>(
- handler, getDistributorBucketSpace(), msg, getDistributor().getMetrics());
+ handler, handler, handler, getDistributorBucketSpace(), msg, getDistributor().getMetrics());
}
TEST_F(TwoPhaseUpdateOperationTest, simple) {
diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp
index 422baf08465..8c5fb6db378 100644
--- a/storage/src/tests/distributor/updateoperationtest.cpp
+++ b/storage/src/tests/distributor/updateoperationtest.cpp
@@ -67,7 +67,7 @@ UpdateOperationTest::sendUpdate(const std::string& bucketState, bool create_if_m
ExternalOperationHandler& handler = getExternalOperationHandler();
return std::make_shared<UpdateOperation>(
- handler, getDistributorBucketSpace(), msg,
+ handler, handler, getDistributorBucketSpace(), msg,
getDistributor().getMetrics().updates);
}
diff --git a/storage/src/tests/distributor/visitoroperationtest.cpp b/storage/src/tests/distributor/visitoroperationtest.cpp
index df5180bbb7c..0af4aff02d5 100644
--- a/storage/src/tests/distributor/visitoroperationtest.cpp
+++ b/storage/src/tests/distributor/visitoroperationtest.cpp
@@ -103,6 +103,7 @@ struct VisitorOperationTest : Test, DistributorTestUtil {
{
return std::make_unique<VisitorOperation>(
getExternalOperationHandler(),
+ getExternalOperationHandler(),
getDistributorBucketSpace(),
msg,
config,
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp
index cc04f1d554c..a87977bec11 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.cpp
+++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp
@@ -46,6 +46,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component)
_use_weak_internal_read_consistency_for_client_gets(false),
_enable_metadata_only_fetch_phase_for_inconsistent_updates(false),
_prioritize_global_bucket_merges(true),
+ _enable_revert(true),
_minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED)
{
}
@@ -164,6 +165,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist
_use_weak_internal_read_consistency_for_client_gets = config.useWeakInternalReadConsistencyForClientGets;
_enable_metadata_only_fetch_phase_for_inconsistent_updates = config.enableMetadataOnlyFetchPhaseForInconsistentUpdates;
_prioritize_global_bucket_merges = config.prioritizeGlobalBucketMerges;
+ _enable_revert = config.enableRevert;
_minimumReplicaCountingMode = config.minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h
index 41a30165f49..5ff1a8b3503 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.h
+++ b/storage/src/vespa/storage/config/distributorconfiguration.h
@@ -253,6 +253,9 @@ public:
bool prioritize_global_bucket_merges() const noexcept {
return _prioritize_global_bucket_merges;
}
+ bool enable_revert() const noexcept {
+ return _enable_revert;
+ }
bool containsTimeStatement(const std::string& documentSelection) const;
@@ -303,9 +306,10 @@ private:
bool _use_weak_internal_read_consistency_for_client_gets;
bool _enable_metadata_only_fetch_phase_for_inconsistent_updates;
bool _prioritize_global_bucket_merges;
+ bool _enable_revert;
DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode;
-
+
friend struct distributor::DistributorTest;
void configureMaintenancePriorities(
const vespa::config::content::core::StorDistributormanagerConfig&);
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index 23c763ce120..005b5771c36 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -68,7 +68,7 @@ public:
}
DistributorMetricSet& getMetrics() override { return *_metrics; }
-
+
PendingMessageTracker& getPendingMessageTracker() override {
return _pendingMessageTracker;
}
diff --git a/storage/src/vespa/storage/distributor/distributor_operation_context.h b/storage/src/vespa/storage/distributor/distributor_operation_context.h
new file mode 100644
index 00000000000..e4832a7db12
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/distributor_operation_context.h
@@ -0,0 +1,50 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "bucketownership.h"
+#include <vespa/document/bucket/bucketspace.h>
+#include <vespa/storage/bucketdb/bucketdatabase.h>
+#include <vespa/storage/common/distributorcomponent.h>
+#include <vespa/storageapi/defs.h>
+
+namespace document { class Bucket; }
+
+namespace storage { class DistributorConfiguration; }
+
+namespace storage::distributor {
+
+class DistributorBucketSpaceRepo;
+
+/**
+ * Interface with functionality that is used when handling distributor operations.
+ */
+class DistributorOperationContext {
+public:
+ virtual ~DistributorOperationContext() {}
+ virtual api::Timestamp generate_unique_timestamp() = 0;
+ virtual BucketOwnership check_ownership_in_pending_and_current_state(const document::Bucket &bucket) const = 0;
+ virtual void update_bucket_database(const document::Bucket& bucket,
+ const BucketCopy& changed_node,
+ uint32_t update_flags = 0) = 0;
+ virtual void update_bucket_database(const document::Bucket& bucket,
+ const std::vector<BucketCopy>& changed_nodes,
+ uint32_t update_flags = 0) = 0;
+ virtual void remove_node_from_bucket_database(const document::Bucket& bucket, uint16_t node_index) = 0;
+ virtual const DistributorBucketSpaceRepo& bucket_space_repo() const = 0;
+
+ virtual void send_inline_split_if_bucket_too_large(document::BucketSpace bucket_space,
+ const BucketDatabase::Entry& entry,
+ uint8_t pri) = 0;
+ virtual const DistributorConfiguration& distributor_config() const = 0;
+ virtual bool has_pending_message(uint16_t node_index,
+ const document::Bucket& bucket,
+ uint32_t message_type) const = 0;
+ virtual const lib::ClusterState* pending_cluster_state_or_null(const document::BucketSpace& bucket_space) const = 0;
+
+ // TODO: Move to being a free function instead.
+ virtual const char* storage_node_up_states() const = 0;
+
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.cpp b/storage/src/vespa/storage/distributor/distributorcomponent.cpp
index 1c5d4506e42..a9247deefe8 100644
--- a/storage/src/vespa/storage/distributor/distributorcomponent.cpp
+++ b/storage/src/vespa/storage/distributor/distributorcomponent.cpp
@@ -2,6 +2,7 @@
#include "distributorcomponent.h"
#include "distributor_bucket_space_repo.h"
#include "distributor_bucket_space.h"
+#include "pendingmessagetracker.h"
#include <vespa/document/select/parser.h>
#include <vespa/storage/common/bucketoperationlogger.h>
#include <vespa/vdslib/state/cluster_state_bundle.h>
@@ -304,6 +305,15 @@ DistributorComponent::initializing() const {
return _distributor.initializing();
}
+bool
+DistributorComponent::has_pending_message(uint16_t node_index,
+ const document::Bucket& bucket,
+ uint32_t message_type) const
+{
+ const auto& sender = static_cast<const DistributorMessageSender&>(getDistributor());
+ return sender.getPendingMessageTracker().hasPendingMessage(node_index, bucket, message_type);
+}
+
std::unique_ptr<document::select::Node>
DistributorComponent::parse_selection(const vespalib::string& selection) const
{
diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.h b/storage/src/vespa/storage/distributor/distributorcomponent.h
index cab89c3fc30..0d0449836bb 100644
--- a/storage/src/vespa/storage/distributor/distributorcomponent.h
+++ b/storage/src/vespa/storage/distributor/distributorcomponent.h
@@ -2,6 +2,7 @@
#pragma once
#include "distributor_node_context.h"
+#include "distributor_operation_context.h"
#include "distributorinterface.h"
#include "document_selection_parser.h"
#include "operationowner.h"
@@ -29,6 +30,7 @@ struct DatabaseUpdate {
*/
class DistributorComponent : public storage::DistributorComponent,
public DistributorNodeContext,
+ public DistributorOperationContext,
public DocumentSelectionParser
{
public:
@@ -179,9 +181,49 @@ public:
const vespalib::string& cluster_name() const noexcept override { return getClusterName(); }
uint16_t node_index() const noexcept override { return getIndex(); }
+ // Implements DistributorOperationContext
+ api::Timestamp generate_unique_timestamp() override { return getUniqueTimestamp(); }
+ BucketOwnership check_ownership_in_pending_and_current_state(const document::Bucket &bucket) const override {
+ return checkOwnershipInPendingAndCurrentState(bucket);
+ }
+ void update_bucket_database(const document::Bucket& bucket,
+ const BucketCopy& changed_node,
+ uint32_t update_flags = 0) override {
+ updateBucketDatabase(bucket, changed_node, update_flags);
+ }
+ virtual void update_bucket_database(const document::Bucket& bucket,
+ const std::vector<BucketCopy>& changed_nodes,
+ uint32_t update_flags = 0) override {
+ updateBucketDatabase(bucket, changed_nodes, update_flags);
+ }
+ void remove_node_from_bucket_database(const document::Bucket& bucket, uint16_t node_index) override {
+ removeNodeFromDB(bucket, node_index);
+ }
+ const DistributorBucketSpaceRepo& bucket_space_repo() const override {
+ return getBucketSpaceRepo();
+ }
+ void send_inline_split_if_bucket_too_large(document::BucketSpace bucket_space,
+ const BucketDatabase::Entry& entry,
+ uint8_t pri) override {
+ getDistributor().checkBucketForSplit(bucket_space, entry, pri);
+ }
+ const DistributorConfiguration& distributor_config() const override {
+ return getDistributor().getConfig();
+ }
+ bool has_pending_message(uint16_t node_index,
+ const document::Bucket& bucket,
+ uint32_t message_type) const override;
+ const lib::ClusterState* pending_cluster_state_or_null(const document::BucketSpace& bucket_space) const override {
+ return getDistributor().pendingClusterStateOrNull(bucket_space);
+ }
+ const char* storage_node_up_states() const override {
+ return getDistributor().getStorageNodeUpStates();
+ }
+
// Implements DocumentSelectionParser
std::unique_ptr<document::select::Node> parse_selection(const vespalib::string& selection) const override;
+
private:
void enumerateUnavailableNodes(
std::vector<uint16_t>& unavailableNodes,
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
index 3d797fb2e28..e3a6056a723 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
@@ -253,7 +253,7 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Put)
auto handle = _mutationSequencer.try_acquire(cmd->getDocumentId());
if (allowMutation(handle)) {
document::BucketSpace bucketSpace = cmd->getBucket().getBucketSpace();
- _op = std::make_shared<PutOperation>(*this,
+ _op = std::make_shared<PutOperation>(*this, *this,
_bucketSpaceRepo.get(bucketSpace),
std::move(cmd), getMetrics().puts, std::move(handle));
} else {
@@ -277,7 +277,7 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Update)
auto handle = _mutationSequencer.try_acquire(cmd->getDocumentId());
if (allowMutation(handle)) {
document::BucketSpace bucketSpace = cmd->getBucket().getBucketSpace();
- _op = std::make_shared<TwoPhaseUpdateOperation>(*this,
+ _op = std::make_shared<TwoPhaseUpdateOperation>(*this, *this, *this,
_bucketSpaceRepo.get(bucketSpace),
std::move(cmd), getMetrics(), std::move(handle));
} else {
@@ -302,7 +302,7 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Remove)
if (allowMutation(handle)) {
auto &distributorBucketSpace(_bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()));
- _op = std::make_shared<RemoveOperation>(*this, distributorBucketSpace, std::move(cmd),
+ _op = std::make_shared<RemoveOperation>(*this, *this, distributorBucketSpace, std::move(cmd),
getMetrics().removes, std::move(handle));
} else {
sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId(), metrics));
@@ -322,7 +322,8 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, RemoveLocation)
return true;
}
- _op = std::make_shared<RemoveLocationOperation>(*this, _bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()),
+ _op = std::make_shared<RemoveLocationOperation>(*this, *this, *this,
+ _bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()),
std::move(cmd), getMetrics().removelocations);
return true;
}
@@ -367,7 +368,7 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, StatBucket)
auto& metrics = getMetrics().stats;
bounce_or_invoke_read_only_op(*cmd, cmd->getBucket(), metrics, [&](auto& bucket_space_repo) {
auto& bucket_space = bucket_space_repo.get(cmd->getBucket().getBucketSpace());
- _op = std::make_shared<StatBucketOperation>(*this, bucket_space, cmd);
+ _op = std::make_shared<StatBucketOperation>(bucket_space, cmd);
});
return true;
}
@@ -389,7 +390,7 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, CreateVisitor)
const DistributorConfiguration& config(getDistributor().getConfig());
VisitorOperation::Config visitorConfig(config.getMinBucketsPerVisitor(), config.getMaxVisitorsPerNodePerClientVisitor());
auto &distributorBucketSpace(_bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()));
- _op = Operation::SP(new VisitorOperation(*this, distributorBucketSpace, cmd, visitorConfig, getMetrics().visits));
+ _op = Operation::SP(new VisitorOperation(*this, *this, distributorBucketSpace, cmd, visitorConfig, getMetrics().visits));
return true;
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
index e9348e8e8e1..8a4f527c43a 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
@@ -19,14 +19,16 @@ using namespace storage::distributor;
using namespace storage;
using document::BucketSpace;
-PutOperation::PutOperation(DistributorComponent& manager, DistributorBucketSpace &bucketSpace,
+PutOperation::PutOperation(DistributorNodeContext& node_ctx,
+ DistributorOperationContext& op_ctx,
+ DistributorBucketSpace &bucketSpace,
std::shared_ptr<api::PutCommand> msg,
PersistenceOperationMetricSet& metric, SequencingHandle sequencingHandle)
: SequencedOperation(std::move(sequencingHandle)),
- _trackerInstance(metric, std::make_shared<api::PutReply>(*msg), manager, msg->getTimestamp()),
+ _trackerInstance(metric, std::make_shared<api::PutReply>(*msg), node_ctx, op_ctx, msg->getTimestamp()),
_tracker(_trackerInstance),
_msg(std::move(msg)),
- _manager(manager),
+ _op_ctx(op_ctx),
_bucketSpace(bucketSpace)
{
}
@@ -97,8 +99,8 @@ PutOperation::insertDatabaseEntryAndScheduleCreateBucket(const OperationTargetLi
// Copy is inserted with timestamp 0 such that any actual bucket info
// subsequently arriving from the storage node will always overwrite it.
BucketCopy copy(BucketCopy::recentlyCreatedCopy(0, copies[i].getNode().getIndex()));
- _manager.updateBucketDatabase(document::Bucket(originalCommand.getBucket().getBucketSpace(), lastBucket), copy,
- DatabaseUpdate::CREATE_IF_NONEXISTING);
+ _op_ctx.update_bucket_database(document::Bucket(originalCommand.getBucket().getBucketSpace(), lastBucket), copy,
+ DatabaseUpdate::CREATE_IF_NONEXISTING);
}
ActiveList active;
if (setOneActive) {
@@ -147,11 +149,11 @@ PutOperation::sendPutToBucketOnNode(document::BucketSpace bucketSpace, const doc
}
bool PutOperation::has_unavailable_targets_in_pending_state(const OperationTargetList& targets) const {
- auto* pending_state = _manager.getDistributor().pendingClusterStateOrNull(_msg->getBucket().getBucketSpace());
+ auto* pending_state = _op_ctx.pending_cluster_state_or_null(_msg->getBucket().getBucketSpace());
if (!pending_state) {
return false;
}
- const char* up_states = _manager.getDistributor().getStorageNodeUpStates();
+ const char* up_states = _op_ctx.storage_node_up_states();
return std::any_of(targets.begin(), targets.end(), [pending_state, up_states](const auto& target){
return !pending_state->getNodeState(target.getNode()).getState().oneOf(up_states);
});
@@ -171,7 +173,7 @@ PutOperation::onStart(DistributorMessageSender& sender)
bool up = false;
for (uint16_t i = 0; i < systemState.getNodeCount(lib::NodeType::STORAGE); i++) {
if (systemState.getNodeState(lib::Node(lib::NodeType::STORAGE, i))
- .getState().oneOf(_manager.getDistributor().getStorageNodeUpStates()))
+ .getState().oneOf(_op_ctx.storage_node_up_states()))
{
up = true;
}
@@ -184,15 +186,14 @@ PutOperation::onStart(DistributorMessageSender& sender)
idealNodeCalculator.setDistribution(_bucketSpace.getDistribution());
idealNodeCalculator.setClusterState(_bucketSpace.getClusterState());
OperationTargetResolverImpl targetResolver(_bucketSpace.getBucketDatabase(), idealNodeCalculator,
- _manager.getDistributor().getConfig().getMinimalBucketSplit(),
+ _op_ctx.distributor_config().getMinimalBucketSplit(),
_bucketSpace.getDistribution().getRedundancy(),
_msg->getBucket().getBucketSpace());
OperationTargetList targets(targetResolver.getTargets(OperationTargetResolver::PUT, bid));
for (size_t i = 0; i < targets.size(); ++i) {
- if (_manager.getDistributor().getPendingMessageTracker().
- hasPendingMessage(targets[i].getNode().getIndex(), targets[i].getBucket(),
- api::MessageType::DELETEBUCKET_ID))
+ if (_op_ctx.has_pending_message(targets[i].getNode().getIndex(), targets[i].getBucket(),
+ api::MessageType::DELETEBUCKET_ID))
{
_tracker.fail(sender, api::ReturnCode(api::ReturnCode::BUCKET_DELETED,
"Bucket was being deleted while we got a PUT, failing operation to be safe"));
@@ -242,7 +243,7 @@ PutOperation::onStart(DistributorMessageSender& sender)
// Check whether buckets are large enough to be split.
// TODO(vekterli): only check entries for sendToExisting?
for (uint32_t i = 0; i < entries.size(); ++i) {
- _manager.getDistributor().checkBucketForSplit(_msg->getBucket().getBucketSpace(),
+ _op_ctx.send_inline_split_if_bucket_too_large(_msg->getBucket().getBucketSpace(),
entries[i], _msg->getPriority());
}
@@ -259,7 +260,7 @@ PutOperation::onStart(DistributorMessageSender& sender)
bool
PutOperation::shouldImplicitlyActivateReplica(const OperationTargetList& targets) const
{
- const auto& config(_manager.getDistributor().getConfig());
+ const auto& config(_op_ctx.distributor_config());
if (config.isBucketActivationDisabled()) {
return false;
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.h b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
index 61149839ed1..6503a5ce438 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
@@ -23,8 +23,11 @@ class OperationTargetList;
class PutOperation : public SequencedOperation
{
public:
- PutOperation(DistributorComponent& manager, DistributorBucketSpace &bucketSpace,
- std::shared_ptr<api::PutCommand> msg, PersistenceOperationMetricSet& metric,
+ PutOperation(DistributorNodeContext& node_ctx,
+ DistributorOperationContext& op_ctx,
+ DistributorBucketSpace &bucketSpace,
+ std::shared_ptr<api::PutCommand> msg,
+ PersistenceOperationMetricSet& metric,
SequencingHandle sequencingHandle = SequencingHandle());
void onStart(DistributorMessageSender& sender) override;
@@ -51,7 +54,7 @@ private:
bool has_unavailable_targets_in_pending_state(const OperationTargetList& targets) const;
std::shared_ptr<api::PutCommand> _msg;
- DistributorComponent& _manager;
+ DistributorOperationContext& _op_ctx;
DistributorBucketSpace &_bucketSpace;
};
diff --git a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp
index da58792642b..c0d06171320 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp
@@ -16,19 +16,22 @@ using namespace storage;
using document::BucketSpace;
RemoveLocationOperation::RemoveLocationOperation(
- DistributorComponent& manager,
+ DistributorNodeContext& node_ctx,
+ DistributorOperationContext& op_ctx,
+ DocumentSelectionParser& parser,
DistributorBucketSpace &bucketSpace,
std::shared_ptr<api::RemoveLocationCommand> msg,
PersistenceOperationMetricSet& metric)
: Operation(),
_trackerInstance(metric,
std::make_shared<api::RemoveLocationReply>(*msg),
- manager,
+ node_ctx,
+ op_ctx,
0),
_tracker(_trackerInstance),
_msg(std::move(msg)),
- _node_ctx(manager),
- _parser(manager),
+ _node_ctx(node_ctx),
+ _parser(parser),
_bucketSpace(bucketSpace)
{}
diff --git a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h
index 66df9f2f06b..5cad43e02ca 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h
@@ -15,7 +15,9 @@ class DistributorBucketSpace;
class RemoveLocationOperation : public Operation
{
public:
- RemoveLocationOperation(DistributorComponent& manager,
+ RemoveLocationOperation(DistributorNodeContext& node_ctx,
+ DistributorOperationContext& op_ctx,
+ DocumentSelectionParser& parser,
DistributorBucketSpace &bucketSpace,
std::shared_ptr<api::RemoveLocationCommand> msg,
PersistenceOperationMetricSet& metric);
diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
index 75163b73c7a..6a7ea8b8f89 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
@@ -11,7 +11,8 @@ using namespace storage::distributor;
using namespace storage;
using document::BucketSpace;
-RemoveOperation::RemoveOperation(DistributorComponent& manager,
+RemoveOperation::RemoveOperation(DistributorNodeContext& node_ctx,
+ DistributorOperationContext& op_ctx,
DistributorBucketSpace &bucketSpace,
std::shared_ptr<api::RemoveCommand> msg,
PersistenceOperationMetricSet& metric,
@@ -19,10 +20,10 @@ RemoveOperation::RemoveOperation(DistributorComponent& manager,
: SequencedOperation(std::move(sequencingHandle)),
_trackerInstance(metric,
std::make_shared<api::RemoveReply>(*msg),
- manager, msg->getTimestamp()),
+ node_ctx, op_ctx, msg->getTimestamp()),
_tracker(_trackerInstance),
_msg(std::move(msg)),
- _node_ctx(manager),
+ _node_ctx(node_ctx),
_bucketSpace(bucketSpace)
{
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h
index 486866e4482..8dc88cb1d65 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h
@@ -15,7 +15,8 @@ class DistributorBucketSpace;
class RemoveOperation : public SequencedOperation
{
public:
- RemoveOperation(DistributorComponent& manager,
+ RemoveOperation(DistributorNodeContext& node_ctx,
+ DistributorOperationContext& op_ctx,
DistributorBucketSpace &bucketSpace,
std::shared_ptr<api::RemoveCommand> msg,
PersistenceOperationMetricSet& metric,
diff --git a/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp
index 60c1137bd6d..ec8e4539ad1 100644
--- a/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp
@@ -11,7 +11,6 @@ LOG_SETUP(".distributor.callback.statbucket");
namespace storage::distributor {
StatBucketOperation::StatBucketOperation(
- [[maybe_unused]] DistributorComponent& manager,
DistributorBucketSpace &bucketSpace,
const std::shared_ptr<api::StatBucketCommand> & cmd)
: Operation(),
diff --git a/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.h b/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.h
index 914e104943a..beb9e9c3445 100644
--- a/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.h
@@ -14,13 +14,12 @@ namespace storage::api { class StatBucketCommand; }
namespace storage::distributor {
-class DistributorComponent;
class DistributorBucketSpace;
class StatBucketOperation : public Operation
{
public:
- StatBucketOperation(DistributorComponent& manager, DistributorBucketSpace &bucketSpace,
+ StatBucketOperation(DistributorBucketSpace &bucketSpace,
const std::shared_ptr<api::StatBucketCommand> & cmd);
~StatBucketOperation();
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
index 951d5010954..72193c99a64 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
@@ -20,8 +20,12 @@ using document::BucketSpace;
namespace storage::distributor {
+
+
TwoPhaseUpdateOperation::TwoPhaseUpdateOperation(
- DistributorComponent& manager,
+ DistributorNodeContext& node_ctx,
+ DistributorOperationContext& op_ctx,
+ DocumentSelectionParser& parser,
DistributorBucketSpace &bucketSpace,
std::shared_ptr<api::UpdateCommand> msg,
DistributorMetricSet& metrics,
@@ -33,9 +37,9 @@ TwoPhaseUpdateOperation::TwoPhaseUpdateOperation(
_metadata_get_metrics(metrics.update_metadata_gets),
_updateCmd(std::move(msg)),
_updateReply(),
- _manager(manager),
- _node_ctx(manager),
- _parser(manager),
+ _node_ctx(node_ctx),
+ _op_ctx(op_ctx),
+ _parser(parser),
_bucketSpace(bucketSpace),
_sendState(SendState::NONE_SENT),
_mode(Mode::FAST_PATH),
@@ -43,7 +47,7 @@ TwoPhaseUpdateOperation::TwoPhaseUpdateOperation(
_single_get_latency_timer(),
_fast_path_repair_source_node(0xffff),
_use_initial_cheap_metadata_fetch_phase(
- _manager.getDistributor().getConfig().enable_metadata_only_fetch_phase_for_inconsistent_updates()),
+ _op_ctx.distributor_config().enable_metadata_only_fetch_phase_for_inconsistent_updates()),
_replySent(false)
{
document::BucketIdFactory idFactory;
@@ -168,7 +172,7 @@ TwoPhaseUpdateOperation::startFastPathUpdate(DistributorMessageSender& sender)
{
_mode = Mode::FAST_PATH;
LOG(debug, "Update(%s) fast path: sending Update commands", update_doc_id().c_str());
- auto updateOperation = std::make_shared<UpdateOperation>(_manager, _bucketSpace, _updateCmd, _updateMetric);
+ auto updateOperation = std::make_shared<UpdateOperation>(_node_ctx, _op_ctx, _bucketSpace, _updateCmd, _updateMetric);
UpdateOperation & op = *updateOperation;
IntermediateMessageSender intermediate(_sentMessageMap, std::move(updateOperation), sender);
op.start(intermediate, _node_ctx.clock().getTimeInMillis());
@@ -223,7 +227,7 @@ TwoPhaseUpdateOperation::create_initial_safe_path_get_operation() {
update_doc_id().c_str(), field_set, api::to_string(read_consistency));
auto& get_metric = (_use_initial_cheap_metadata_fetch_phase ? _metadata_get_metrics : _getMetric);
return std::make_shared<GetOperation>(
- _manager, _bucketSpace, _bucketSpace.getBucketDatabase().acquire_read_guard(),
+ _node_ctx, _bucketSpace, _bucketSpace.getBucketDatabase().acquire_read_guard(),
get, get_metric, read_consistency);
}
@@ -248,7 +252,7 @@ bool
TwoPhaseUpdateOperation::lostBucketOwnershipBetweenPhases() const
{
document::Bucket updateDocBucket(_updateCmd->getBucket().getBucketSpace(), _updateDocBucketId);
- BucketOwnership bo(_manager.checkOwnershipInPendingAndCurrentState(updateDocBucket));
+ BucketOwnership bo(_op_ctx.check_ownership_in_pending_and_current_state(updateDocBucket));
return !bo.isOwned();
}
@@ -273,7 +277,7 @@ TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument(std::shared_ptr<documen
document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), document::BucketId(0));
auto put = std::make_shared<api::PutCommand>(bucket, doc, putTimestamp);
copyMessageSettings(*_updateCmd, *put);
- auto putOperation = std::make_shared<PutOperation>(_manager, _bucketSpace, std::move(put), _putMetric);
+ auto putOperation = std::make_shared<PutOperation>(_node_ctx, _op_ctx, _bucketSpace, std::move(put), _putMetric);
PutOperation & op = *putOperation;
IntermediateMessageSender intermediate(_sentMessageMap, std::move(putOperation), sender);
op.start(intermediate, _node_ctx.clock().getTimeInMillis());
@@ -319,7 +323,7 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorMessageSender& sender,
sendReplyWithResult(sender, api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, ""));
return;
}
- schedulePutsWithUpdatedDocument(getReply.getDocument(), _manager.getUniqueTimestamp(), sender);
+ schedulePutsWithUpdatedDocument(getReply.getDocument(), _op_ctx.generate_unique_timestamp(), sender);
return;
}
@@ -499,7 +503,7 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorMessageSender& sen
}
document::Document::SP docToUpdate;
- api::Timestamp putTimestamp = _manager.getUniqueTimestamp();
+ api::Timestamp putTimestamp = _op_ctx.generate_unique_timestamp();
if (reply.getDocument().get()) {
api::Timestamp receivedTimestamp = reply.getLastModifiedTimestamp();
@@ -534,7 +538,7 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorMessageSender& sen
}
bool TwoPhaseUpdateOperation::may_restart_with_fast_path(const api::GetReply& reply) {
- return (_manager.getDistributor().getConfig().update_fast_path_restart_enabled() &&
+ return (_op_ctx.distributor_config().update_fast_path_restart_enabled() &&
!_replicas_at_get_send_time.empty() && // To ensure we send CreateBucket+Put if no replicas exist.
reply.had_consistent_replicas() &&
replica_set_unchanged_after_get_operation());
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
index 5a936405055..330609b276b 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
@@ -55,7 +55,9 @@ class GetOperation;
class TwoPhaseUpdateOperation : public SequencedOperation
{
public:
- TwoPhaseUpdateOperation(DistributorComponent& manager,
+ TwoPhaseUpdateOperation(DistributorNodeContext& node_ctx,
+ DistributorOperationContext& op_ctx,
+ DocumentSelectionParser& parser,
DistributorBucketSpace &bucketSpace,
std::shared_ptr<api::UpdateCommand> msg,
DistributorMetricSet& metrics,
@@ -141,8 +143,8 @@ private:
PersistenceOperationMetricSet& _metadata_get_metrics;
std::shared_ptr<api::UpdateCommand> _updateCmd;
std::shared_ptr<api::StorageReply> _updateReply;
- DistributorComponent& _manager;
DistributorNodeContext& _node_ctx;
+ DistributorOperationContext& _op_ctx;
DocumentSelectionParser& _parser;
DistributorBucketSpace &_bucketSpace;
SentMessageMap _sentMessageMap;
diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
index 258b95c4952..1fd2ea90258 100644
--- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
@@ -17,19 +17,19 @@ using document::BucketSpace;
namespace storage::distributor {
-UpdateOperation::UpdateOperation(DistributorComponent& manager,
+UpdateOperation::UpdateOperation(DistributorNodeContext& node_ctx,
+ DistributorOperationContext& op_ctx,
DistributorBucketSpace &bucketSpace,
const std::shared_ptr<api::UpdateCommand> & msg,
UpdateMetricSet& metric)
: Operation(),
_trackerInstance(metric, std::make_shared<api::UpdateReply>(*msg),
- manager, msg->getTimestamp()),
+ node_ctx, op_ctx, msg->getTimestamp()),
_tracker(_trackerInstance),
_msg(msg),
_new_timestamp(_msg->getTimestamp()),
_is_auto_create_update(_msg->getUpdate()->getCreateIfNonExistent()),
- _manager(manager),
- _node_ctx(manager),
+ _node_ctx(node_ctx),
_bucketSpace(bucketSpace),
_newestTimestampLocation(),
_infoAtSendTime(),
diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h
index 6fb6e71a68e..3db9979aa13 100644
--- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h
@@ -23,7 +23,8 @@ class DistributorBucketSpace;
class UpdateOperation : public Operation
{
public:
- UpdateOperation(DistributorComponent& manager,
+ UpdateOperation(DistributorNodeContext& node_ctx,
+ DistributorOperationContext& op_ctx,
DistributorBucketSpace &bucketSpace,
const std::shared_ptr<api::UpdateCommand> & msg,
UpdateMetricSet& metric);
@@ -45,7 +46,6 @@ private:
const api::Timestamp _new_timestamp;
const bool _is_auto_create_update;
- DistributorComponent& _manager;
DistributorNodeContext& _node_ctx;
DistributorBucketSpace &_bucketSpace;
std::pair<document::BucketId, uint16_t> _newestTimestampLocation;
diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp
index 9d0ca12ac5c..c35a6671c8d 100644
--- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp
@@ -44,14 +44,15 @@ VisitorOperation::BucketInfo::toString() const
VisitorOperation::SuperBucketInfo::~SuperBucketInfo() = default;
VisitorOperation::VisitorOperation(
- DistributorComponent& owner,
+ DistributorNodeContext& node_ctx,
+ DistributorOperationContext& op_ctx,
DistributorBucketSpace &bucketSpace,
const api::CreateVisitorCommand::SP& m,
const Config& config,
VisitorMetricSet& metrics)
: Operation(),
- _owner(owner),
- _node_ctx(owner),
+ _node_ctx(node_ctx),
+ _op_ctx(op_ctx),
_bucketSpace(bucketSpace),
_msg(m),
_sentReply(false),
@@ -73,7 +74,7 @@ VisitorOperation::VisitorOperation(
_fromTime = m->getFromTime();
_toTime = m->getToTime();
if (_toTime == 0) {
- _toTime = owner.getUniqueTimestamp();
+ _toTime = _op_ctx.generate_unique_timestamp();
}
}
@@ -276,7 +277,7 @@ void
VisitorOperation::verifyDistributorOwnsBucket(const document::BucketId& bid)
{
document::Bucket bucket(_msg->getBucketSpace(), bid);
- BucketOwnership bo(_owner.checkOwnershipInPendingAndCurrentState(bucket));
+ BucketOwnership bo(_op_ctx.check_ownership_in_pending_and_current_state(bucket));
if (!bo.isOwned()) {
verifyDistributorIsNotDown(bo.getNonOwnedState());
std::string systemStateStr = bo.getNonOwnedState().toString();
diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h
index 993c0c0808d..28be7dfe353 100644
--- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h
@@ -16,9 +16,9 @@ namespace storage::lib { class ClusterState; }
namespace storage::distributor {
-class DistributorComponent;
class DistributorBucketSpace;
class DistributorNodeContext;
+class DistributorOperationContext;
class VisitorOperation : public Operation
{
@@ -33,7 +33,8 @@ public:
uint32_t maxVisitorsPerNodePerVisitor;
};
- VisitorOperation(DistributorComponent& manager,
+ VisitorOperation(DistributorNodeContext& node_ctx,
+ DistributorOperationContext& op_ctx,
DistributorBucketSpace &bucketSpace,
const std::shared_ptr<api::CreateVisitorCommand> & msg,
const Config& config,
@@ -140,8 +141,8 @@ private:
*/
vespalib::duration timeLeft() const noexcept;
- DistributorComponent& _owner;
DistributorNodeContext& _node_ctx;
+ DistributorOperationContext& _op_ctx;
DistributorBucketSpace &_bucketSpace;
SentMessagesMap _sentMessages;
diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
index 07342dc3e7f..9e064087c08 100644
--- a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
+++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
@@ -15,15 +15,16 @@ namespace storage::distributor {
PersistenceMessageTrackerImpl::PersistenceMessageTrackerImpl(
PersistenceOperationMetricSet& metric,
std::shared_ptr<api::BucketInfoReply> reply,
- DistributorComponent& link,
+ DistributorNodeContext& node_ctx,
+ DistributorOperationContext& op_ctx,
api::Timestamp revertTimestamp)
- : MessageTracker(link.cluster_name()),
+ : MessageTracker(node_ctx.cluster_name()),
_metric(metric),
_reply(std::move(reply)),
- _manager(link),
+ _op_ctx(op_ctx),
_revertTimestamp(revertTimestamp),
_trace(_reply->getTrace().getLevel()),
- _requestTimer(link.clock()),
+ _requestTimer(node_ctx.clock()),
_n_persistence_replies_total(0),
_n_successful_persistence_replies(0),
_priority(_reply->getPriority()),
@@ -37,11 +38,11 @@ void
PersistenceMessageTrackerImpl::updateDB()
{
for (const auto & entry : _bucketInfo) {
- _manager.updateBucketDatabase(entry.first, entry.second);
+ _op_ctx.update_bucket_database(entry.first, entry.second);
}
for (const auto & entry : _remapBucketInfo){
- _manager.updateBucketDatabase(entry.first, entry.second,DatabaseUpdate::CREATE_IF_NONEXISTING);
+ _op_ctx.update_bucket_database(entry.first, entry.second, DatabaseUpdate::CREATE_IF_NONEXISTING);
}
}
@@ -119,7 +120,7 @@ PersistenceMessageTrackerImpl::canSendReplyEarly() const
LOG(spam, "Can't return early because we have already replied or failed");
return false;
}
- auto &bucketSpaceRepo(_manager.getBucketSpaceRepo());
+ auto &bucketSpaceRepo(_op_ctx.bucket_space_repo());
auto &bucketSpace(bucketSpaceRepo.get(_reply->getBucket().getBucketSpace()));
const lib::Distribution& distribution = bucketSpace.getDistribution();
@@ -164,13 +165,13 @@ PersistenceMessageTrackerImpl::addBucketInfoFromReply(
bucket.toString().c_str(),
bucketInfo.toString().c_str(),
node);
- _remapBucketInfo[bucket].emplace_back(_manager.getUniqueTimestamp(), node, bucketInfo);
+ _remapBucketInfo[bucket].emplace_back(_op_ctx.generate_unique_timestamp(), node, bucketInfo);
} else {
LOG(debug, "Bucket %s: Received bucket info %s from node %d",
bucket.toString().c_str(),
bucketInfo.toString().c_str(),
node);
- _bucketInfo[bucket].emplace_back(_manager.getUniqueTimestamp(), node, bucketInfo);
+ _bucketInfo[bucket].emplace_back(_op_ctx.generate_unique_timestamp(), node, bucketInfo);
}
}
@@ -195,7 +196,7 @@ PersistenceMessageTrackerImpl::logSuccessfulReply(uint16_t node, const api::Buck
bool
PersistenceMessageTrackerImpl::shouldRevert() const
{
- return _manager.getDistributorConfig().enableRevert
+ return _op_ctx.distributor_config().enable_revert()
&& !_revertNodes.empty() && !_success && _reply;
}
@@ -258,7 +259,7 @@ PersistenceMessageTrackerImpl::handleCreateBucketReply(
&& reply.getResult().getResult() != api::ReturnCode::EXISTS)
{
LOG(spam, "Create bucket reply failed, so deleting it from bucket db");
- _manager.removeNodeFromDB(reply.getBucket(), node);
+ _op_ctx.remove_node_from_bucket_database(reply.getBucket(), node);
}
}
diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.h b/storage/src/vespa/storage/distributor/persistencemessagetracker.h
index cf9f4017eda..1421b4c2038 100644
--- a/storage/src/vespa/storage/distributor/persistencemessagetracker.h
+++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.h
@@ -36,7 +36,8 @@ private:
public:
PersistenceMessageTrackerImpl(PersistenceOperationMetricSet& metric,
std::shared_ptr<api::BucketInfoReply> reply,
- DistributorComponent&,
+ DistributorNodeContext& node_ctx,
+ DistributorOperationContext& op_ctx,
api::Timestamp revertTimestamp = 0);
~PersistenceMessageTrackerImpl() override;
@@ -70,7 +71,7 @@ private:
PersistenceOperationMetricSet& _metric;
std::shared_ptr<api::BucketInfoReply> _reply;
- DistributorComponent& _manager;
+ DistributorOperationContext& _op_ctx;
api::Timestamp _revertTimestamp;
std::vector<BucketNodePair> _revertNodes;
mbus::Trace _trace;