diff options
author | Geir Storli <geirst@verizonmedia.com> | 2020-11-30 14:36:53 +0000 |
---|---|---|
committer | Geir Storli <geirst@verizonmedia.com> | 2020-11-30 14:38:24 +0000 |
commit | b867154afa45266c747c558f578a2cea68dae5ae (patch) | |
tree | a7943f638fe5a4da31acae2e6b33c0e46b61bafc | |
parent | bed35264fe263ac9954cf65f258eae2a20d544af (diff) |
Add operation context interface to remove coupling between external operations and DistributorComponent.
30 files changed, 213 insertions, 81 deletions
diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp index a69948e6dd7..1515eba4bf0 100644 --- a/storage/src/tests/distributor/putoperationtest.cpp +++ b/storage/src/tests/distributor/putoperationtest.cpp @@ -73,6 +73,7 @@ public: void sendPut(std::shared_ptr<api::PutCommand> msg) { op = std::make_unique<PutOperation>(getExternalOperationHandler(), + getExternalOperationHandler(), getDistributorBucketSpace(), msg, getDistributor().getMetrics(). diff --git a/storage/src/tests/distributor/removelocationtest.cpp b/storage/src/tests/distributor/removelocationtest.cpp index 9874e936e62..47b6e436fb1 100644 --- a/storage/src/tests/distributor/removelocationtest.cpp +++ b/storage/src/tests/distributor/removelocationtest.cpp @@ -28,6 +28,8 @@ struct RemoveLocationOperationTest : Test, DistributorTestUtil { op = std::make_unique<RemoveLocationOperation>( getExternalOperationHandler(), + getExternalOperationHandler(), + getExternalOperationHandler(), getDistributorBucketSpace(), msg, getDistributor().getMetrics(). diff --git a/storage/src/tests/distributor/removeoperationtest.cpp b/storage/src/tests/distributor/removeoperationtest.cpp index 02c2771a78c..49d60292326 100644 --- a/storage/src/tests/distributor/removeoperationtest.cpp +++ b/storage/src/tests/distributor/removeoperationtest.cpp @@ -36,6 +36,7 @@ struct RemoveOperationTest : Test, DistributorTestUtil { op = std::make_unique<RemoveOperation>( getExternalOperationHandler(), + getExternalOperationHandler(), getDistributorBucketSpace(), msg, getDistributor().getMetrics(). diff --git a/storage/src/tests/distributor/statoperationtest.cpp b/storage/src/tests/distributor/statoperationtest.cpp index 53ea0ec5efa..981247c0753 100644 --- a/storage/src/tests/distributor/statoperationtest.cpp +++ b/storage/src/tests/distributor/statoperationtest.cpp @@ -32,7 +32,6 @@ TEST_F(StatOperationTest, bucket_info) { addNodesToBucketDB(document::BucketId(16, 5), "0=4/2/100,1=4/2/100"); StatBucketOperation op( - getExternalOperationHandler(), getDistributorBucketSpace(), std::make_shared<api::StatBucketCommand>( makeDocumentBucket(document::BucketId(16, 5)), "")); diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp index 200a98e352f..6e3837afa43 100644 --- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp +++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp @@ -327,7 +327,7 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState, ExternalOperationHandler& handler = getExternalOperationHandler(); return std::make_shared<TwoPhaseUpdateOperation>( - handler, getDistributorBucketSpace(), msg, getDistributor().getMetrics()); + handler, handler, handler, getDistributorBucketSpace(), msg, getDistributor().getMetrics()); } TEST_F(TwoPhaseUpdateOperationTest, simple) { diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp index 422baf08465..8c5fb6db378 100644 --- a/storage/src/tests/distributor/updateoperationtest.cpp +++ b/storage/src/tests/distributor/updateoperationtest.cpp @@ -67,7 +67,7 @@ UpdateOperationTest::sendUpdate(const std::string& bucketState, bool create_if_m ExternalOperationHandler& handler = getExternalOperationHandler(); return std::make_shared<UpdateOperation>( - handler, getDistributorBucketSpace(), msg, + handler, handler, getDistributorBucketSpace(), msg, getDistributor().getMetrics().updates); } diff --git a/storage/src/tests/distributor/visitoroperationtest.cpp b/storage/src/tests/distributor/visitoroperationtest.cpp index df5180bbb7c..0af4aff02d5 100644 --- a/storage/src/tests/distributor/visitoroperationtest.cpp +++ b/storage/src/tests/distributor/visitoroperationtest.cpp @@ -103,6 +103,7 @@ struct VisitorOperationTest : Test, DistributorTestUtil { { return std::make_unique<VisitorOperation>( getExternalOperationHandler(), + getExternalOperationHandler(), getDistributorBucketSpace(), msg, config, diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp index cc04f1d554c..a87977bec11 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.cpp +++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp @@ -46,6 +46,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component) _use_weak_internal_read_consistency_for_client_gets(false), _enable_metadata_only_fetch_phase_for_inconsistent_updates(false), _prioritize_global_bucket_merges(true), + _enable_revert(true), _minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED) { } @@ -164,6 +165,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist _use_weak_internal_read_consistency_for_client_gets = config.useWeakInternalReadConsistencyForClientGets; _enable_metadata_only_fetch_phase_for_inconsistent_updates = config.enableMetadataOnlyFetchPhaseForInconsistentUpdates; _prioritize_global_bucket_merges = config.prioritizeGlobalBucketMerges; + _enable_revert = config.enableRevert; _minimumReplicaCountingMode = config.minimumReplicaCountingMode; diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h index 41a30165f49..5ff1a8b3503 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.h +++ b/storage/src/vespa/storage/config/distributorconfiguration.h @@ -253,6 +253,9 @@ public: bool prioritize_global_bucket_merges() const noexcept { return _prioritize_global_bucket_merges; } + bool enable_revert() const noexcept { + return _enable_revert; + } bool containsTimeStatement(const std::string& documentSelection) const; @@ -303,9 +306,10 @@ private: bool _use_weak_internal_read_consistency_for_client_gets; bool _enable_metadata_only_fetch_phase_for_inconsistent_updates; bool _prioritize_global_bucket_merges; + bool _enable_revert; DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode; - + friend struct distributor::DistributorTest; void configureMaintenancePriorities( const vespa::config::content::core::StorDistributormanagerConfig&); diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index 23c763ce120..005b5771c36 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -68,7 +68,7 @@ public: } DistributorMetricSet& getMetrics() override { return *_metrics; } - + PendingMessageTracker& getPendingMessageTracker() override { return _pendingMessageTracker; } diff --git a/storage/src/vespa/storage/distributor/distributor_operation_context.h b/storage/src/vespa/storage/distributor/distributor_operation_context.h new file mode 100644 index 00000000000..e4832a7db12 --- /dev/null +++ b/storage/src/vespa/storage/distributor/distributor_operation_context.h @@ -0,0 +1,50 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "bucketownership.h" +#include <vespa/document/bucket/bucketspace.h> +#include <vespa/storage/bucketdb/bucketdatabase.h> +#include <vespa/storage/common/distributorcomponent.h> +#include <vespa/storageapi/defs.h> + +namespace document { class Bucket; } + +namespace storage { class DistributorConfiguration; } + +namespace storage::distributor { + +class DistributorBucketSpaceRepo; + +/** + * Interface with functionality that is used when handling distributor operations. + */ +class DistributorOperationContext { +public: + virtual ~DistributorOperationContext() {} + virtual api::Timestamp generate_unique_timestamp() = 0; + virtual BucketOwnership check_ownership_in_pending_and_current_state(const document::Bucket &bucket) const = 0; + virtual void update_bucket_database(const document::Bucket& bucket, + const BucketCopy& changed_node, + uint32_t update_flags = 0) = 0; + virtual void update_bucket_database(const document::Bucket& bucket, + const std::vector<BucketCopy>& changed_nodes, + uint32_t update_flags = 0) = 0; + virtual void remove_node_from_bucket_database(const document::Bucket& bucket, uint16_t node_index) = 0; + virtual const DistributorBucketSpaceRepo& bucket_space_repo() const = 0; + + virtual void send_inline_split_if_bucket_too_large(document::BucketSpace bucket_space, + const BucketDatabase::Entry& entry, + uint8_t pri) = 0; + virtual const DistributorConfiguration& distributor_config() const = 0; + virtual bool has_pending_message(uint16_t node_index, + const document::Bucket& bucket, + uint32_t message_type) const = 0; + virtual const lib::ClusterState* pending_cluster_state_or_null(const document::BucketSpace& bucket_space) const = 0; + + // TODO: Move to being a free function instead. + virtual const char* storage_node_up_states() const = 0; + +}; + +} diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.cpp b/storage/src/vespa/storage/distributor/distributorcomponent.cpp index 1c5d4506e42..a9247deefe8 100644 --- a/storage/src/vespa/storage/distributor/distributorcomponent.cpp +++ b/storage/src/vespa/storage/distributor/distributorcomponent.cpp @@ -2,6 +2,7 @@ #include "distributorcomponent.h" #include "distributor_bucket_space_repo.h" #include "distributor_bucket_space.h" +#include "pendingmessagetracker.h" #include <vespa/document/select/parser.h> #include <vespa/storage/common/bucketoperationlogger.h> #include <vespa/vdslib/state/cluster_state_bundle.h> @@ -304,6 +305,15 @@ DistributorComponent::initializing() const { return _distributor.initializing(); } +bool +DistributorComponent::has_pending_message(uint16_t node_index, + const document::Bucket& bucket, + uint32_t message_type) const +{ + const auto& sender = static_cast<const DistributorMessageSender&>(getDistributor()); + return sender.getPendingMessageTracker().hasPendingMessage(node_index, bucket, message_type); +} + std::unique_ptr<document::select::Node> DistributorComponent::parse_selection(const vespalib::string& selection) const { diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.h b/storage/src/vespa/storage/distributor/distributorcomponent.h index cab89c3fc30..0d0449836bb 100644 --- a/storage/src/vespa/storage/distributor/distributorcomponent.h +++ b/storage/src/vespa/storage/distributor/distributorcomponent.h @@ -2,6 +2,7 @@ #pragma once #include "distributor_node_context.h" +#include "distributor_operation_context.h" #include "distributorinterface.h" #include "document_selection_parser.h" #include "operationowner.h" @@ -29,6 +30,7 @@ struct DatabaseUpdate { */ class DistributorComponent : public storage::DistributorComponent, public DistributorNodeContext, + public DistributorOperationContext, public DocumentSelectionParser { public: @@ -179,9 +181,49 @@ public: const vespalib::string& cluster_name() const noexcept override { return getClusterName(); } uint16_t node_index() const noexcept override { return getIndex(); } + // Implements DistributorOperationContext + api::Timestamp generate_unique_timestamp() override { return getUniqueTimestamp(); } + BucketOwnership check_ownership_in_pending_and_current_state(const document::Bucket &bucket) const override { + return checkOwnershipInPendingAndCurrentState(bucket); + } + void update_bucket_database(const document::Bucket& bucket, + const BucketCopy& changed_node, + uint32_t update_flags = 0) override { + updateBucketDatabase(bucket, changed_node, update_flags); + } + virtual void update_bucket_database(const document::Bucket& bucket, + const std::vector<BucketCopy>& changed_nodes, + uint32_t update_flags = 0) override { + updateBucketDatabase(bucket, changed_nodes, update_flags); + } + void remove_node_from_bucket_database(const document::Bucket& bucket, uint16_t node_index) override { + removeNodeFromDB(bucket, node_index); + } + const DistributorBucketSpaceRepo& bucket_space_repo() const override { + return getBucketSpaceRepo(); + } + void send_inline_split_if_bucket_too_large(document::BucketSpace bucket_space, + const BucketDatabase::Entry& entry, + uint8_t pri) override { + getDistributor().checkBucketForSplit(bucket_space, entry, pri); + } + const DistributorConfiguration& distributor_config() const override { + return getDistributor().getConfig(); + } + bool has_pending_message(uint16_t node_index, + const document::Bucket& bucket, + uint32_t message_type) const override; + const lib::ClusterState* pending_cluster_state_or_null(const document::BucketSpace& bucket_space) const override { + return getDistributor().pendingClusterStateOrNull(bucket_space); + } + const char* storage_node_up_states() const override { + return getDistributor().getStorageNodeUpStates(); + } + // Implements DocumentSelectionParser std::unique_ptr<document::select::Node> parse_selection(const vespalib::string& selection) const override; + private: void enumerateUnavailableNodes( std::vector<uint16_t>& unavailableNodes, diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp index 3d797fb2e28..e3a6056a723 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp @@ -253,7 +253,7 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Put) auto handle = _mutationSequencer.try_acquire(cmd->getDocumentId()); if (allowMutation(handle)) { document::BucketSpace bucketSpace = cmd->getBucket().getBucketSpace(); - _op = std::make_shared<PutOperation>(*this, + _op = std::make_shared<PutOperation>(*this, *this, _bucketSpaceRepo.get(bucketSpace), std::move(cmd), getMetrics().puts, std::move(handle)); } else { @@ -277,7 +277,7 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Update) auto handle = _mutationSequencer.try_acquire(cmd->getDocumentId()); if (allowMutation(handle)) { document::BucketSpace bucketSpace = cmd->getBucket().getBucketSpace(); - _op = std::make_shared<TwoPhaseUpdateOperation>(*this, + _op = std::make_shared<TwoPhaseUpdateOperation>(*this, *this, *this, _bucketSpaceRepo.get(bucketSpace), std::move(cmd), getMetrics(), std::move(handle)); } else { @@ -302,7 +302,7 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Remove) if (allowMutation(handle)) { auto &distributorBucketSpace(_bucketSpaceRepo.get(cmd->getBucket().getBucketSpace())); - _op = std::make_shared<RemoveOperation>(*this, distributorBucketSpace, std::move(cmd), + _op = std::make_shared<RemoveOperation>(*this, *this, distributorBucketSpace, std::move(cmd), getMetrics().removes, std::move(handle)); } else { sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId(), metrics)); @@ -322,7 +322,8 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, RemoveLocation) return true; } - _op = std::make_shared<RemoveLocationOperation>(*this, _bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()), + _op = std::make_shared<RemoveLocationOperation>(*this, *this, *this, + _bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()), std::move(cmd), getMetrics().removelocations); return true; } @@ -367,7 +368,7 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, StatBucket) auto& metrics = getMetrics().stats; bounce_or_invoke_read_only_op(*cmd, cmd->getBucket(), metrics, [&](auto& bucket_space_repo) { auto& bucket_space = bucket_space_repo.get(cmd->getBucket().getBucketSpace()); - _op = std::make_shared<StatBucketOperation>(*this, bucket_space, cmd); + _op = std::make_shared<StatBucketOperation>(bucket_space, cmd); }); return true; } @@ -389,7 +390,7 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, CreateVisitor) const DistributorConfiguration& config(getDistributor().getConfig()); VisitorOperation::Config visitorConfig(config.getMinBucketsPerVisitor(), config.getMaxVisitorsPerNodePerClientVisitor()); auto &distributorBucketSpace(_bucketSpaceRepo.get(cmd->getBucket().getBucketSpace())); - _op = Operation::SP(new VisitorOperation(*this, distributorBucketSpace, cmd, visitorConfig, getMetrics().visits)); + _op = Operation::SP(new VisitorOperation(*this, *this, distributorBucketSpace, cmd, visitorConfig, getMetrics().visits)); return true; } diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp index e9348e8e8e1..8a4f527c43a 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp @@ -19,14 +19,16 @@ using namespace storage::distributor; using namespace storage; using document::BucketSpace; -PutOperation::PutOperation(DistributorComponent& manager, DistributorBucketSpace &bucketSpace, +PutOperation::PutOperation(DistributorNodeContext& node_ctx, + DistributorOperationContext& op_ctx, + DistributorBucketSpace &bucketSpace, std::shared_ptr<api::PutCommand> msg, PersistenceOperationMetricSet& metric, SequencingHandle sequencingHandle) : SequencedOperation(std::move(sequencingHandle)), - _trackerInstance(metric, std::make_shared<api::PutReply>(*msg), manager, msg->getTimestamp()), + _trackerInstance(metric, std::make_shared<api::PutReply>(*msg), node_ctx, op_ctx, msg->getTimestamp()), _tracker(_trackerInstance), _msg(std::move(msg)), - _manager(manager), + _op_ctx(op_ctx), _bucketSpace(bucketSpace) { } @@ -97,8 +99,8 @@ PutOperation::insertDatabaseEntryAndScheduleCreateBucket(const OperationTargetLi // Copy is inserted with timestamp 0 such that any actual bucket info // subsequently arriving from the storage node will always overwrite it. BucketCopy copy(BucketCopy::recentlyCreatedCopy(0, copies[i].getNode().getIndex())); - _manager.updateBucketDatabase(document::Bucket(originalCommand.getBucket().getBucketSpace(), lastBucket), copy, - DatabaseUpdate::CREATE_IF_NONEXISTING); + _op_ctx.update_bucket_database(document::Bucket(originalCommand.getBucket().getBucketSpace(), lastBucket), copy, + DatabaseUpdate::CREATE_IF_NONEXISTING); } ActiveList active; if (setOneActive) { @@ -147,11 +149,11 @@ PutOperation::sendPutToBucketOnNode(document::BucketSpace bucketSpace, const doc } bool PutOperation::has_unavailable_targets_in_pending_state(const OperationTargetList& targets) const { - auto* pending_state = _manager.getDistributor().pendingClusterStateOrNull(_msg->getBucket().getBucketSpace()); + auto* pending_state = _op_ctx.pending_cluster_state_or_null(_msg->getBucket().getBucketSpace()); if (!pending_state) { return false; } - const char* up_states = _manager.getDistributor().getStorageNodeUpStates(); + const char* up_states = _op_ctx.storage_node_up_states(); return std::any_of(targets.begin(), targets.end(), [pending_state, up_states](const auto& target){ return !pending_state->getNodeState(target.getNode()).getState().oneOf(up_states); }); @@ -171,7 +173,7 @@ PutOperation::onStart(DistributorMessageSender& sender) bool up = false; for (uint16_t i = 0; i < systemState.getNodeCount(lib::NodeType::STORAGE); i++) { if (systemState.getNodeState(lib::Node(lib::NodeType::STORAGE, i)) - .getState().oneOf(_manager.getDistributor().getStorageNodeUpStates())) + .getState().oneOf(_op_ctx.storage_node_up_states())) { up = true; } @@ -184,15 +186,14 @@ PutOperation::onStart(DistributorMessageSender& sender) idealNodeCalculator.setDistribution(_bucketSpace.getDistribution()); idealNodeCalculator.setClusterState(_bucketSpace.getClusterState()); OperationTargetResolverImpl targetResolver(_bucketSpace.getBucketDatabase(), idealNodeCalculator, - _manager.getDistributor().getConfig().getMinimalBucketSplit(), + _op_ctx.distributor_config().getMinimalBucketSplit(), _bucketSpace.getDistribution().getRedundancy(), _msg->getBucket().getBucketSpace()); OperationTargetList targets(targetResolver.getTargets(OperationTargetResolver::PUT, bid)); for (size_t i = 0; i < targets.size(); ++i) { - if (_manager.getDistributor().getPendingMessageTracker(). - hasPendingMessage(targets[i].getNode().getIndex(), targets[i].getBucket(), - api::MessageType::DELETEBUCKET_ID)) + if (_op_ctx.has_pending_message(targets[i].getNode().getIndex(), targets[i].getBucket(), + api::MessageType::DELETEBUCKET_ID)) { _tracker.fail(sender, api::ReturnCode(api::ReturnCode::BUCKET_DELETED, "Bucket was being deleted while we got a PUT, failing operation to be safe")); @@ -242,7 +243,7 @@ PutOperation::onStart(DistributorMessageSender& sender) // Check whether buckets are large enough to be split. // TODO(vekterli): only check entries for sendToExisting? for (uint32_t i = 0; i < entries.size(); ++i) { - _manager.getDistributor().checkBucketForSplit(_msg->getBucket().getBucketSpace(), + _op_ctx.send_inline_split_if_bucket_too_large(_msg->getBucket().getBucketSpace(), entries[i], _msg->getPriority()); } @@ -259,7 +260,7 @@ PutOperation::onStart(DistributorMessageSender& sender) bool PutOperation::shouldImplicitlyActivateReplica(const OperationTargetList& targets) const { - const auto& config(_manager.getDistributor().getConfig()); + const auto& config(_op_ctx.distributor_config()); if (config.isBucketActivationDisabled()) { return false; } diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.h b/storage/src/vespa/storage/distributor/operations/external/putoperation.h index 61149839ed1..6503a5ce438 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.h @@ -23,8 +23,11 @@ class OperationTargetList; class PutOperation : public SequencedOperation { public: - PutOperation(DistributorComponent& manager, DistributorBucketSpace &bucketSpace, - std::shared_ptr<api::PutCommand> msg, PersistenceOperationMetricSet& metric, + PutOperation(DistributorNodeContext& node_ctx, + DistributorOperationContext& op_ctx, + DistributorBucketSpace &bucketSpace, + std::shared_ptr<api::PutCommand> msg, + PersistenceOperationMetricSet& metric, SequencingHandle sequencingHandle = SequencingHandle()); void onStart(DistributorMessageSender& sender) override; @@ -51,7 +54,7 @@ private: bool has_unavailable_targets_in_pending_state(const OperationTargetList& targets) const; std::shared_ptr<api::PutCommand> _msg; - DistributorComponent& _manager; + DistributorOperationContext& _op_ctx; DistributorBucketSpace &_bucketSpace; }; diff --git a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp index da58792642b..c0d06171320 100644 --- a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp @@ -16,19 +16,22 @@ using namespace storage; using document::BucketSpace; RemoveLocationOperation::RemoveLocationOperation( - DistributorComponent& manager, + DistributorNodeContext& node_ctx, + DistributorOperationContext& op_ctx, + DocumentSelectionParser& parser, DistributorBucketSpace &bucketSpace, std::shared_ptr<api::RemoveLocationCommand> msg, PersistenceOperationMetricSet& metric) : Operation(), _trackerInstance(metric, std::make_shared<api::RemoveLocationReply>(*msg), - manager, + node_ctx, + op_ctx, 0), _tracker(_trackerInstance), _msg(std::move(msg)), - _node_ctx(manager), - _parser(manager), + _node_ctx(node_ctx), + _parser(parser), _bucketSpace(bucketSpace) {} diff --git a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h index 66df9f2f06b..5cad43e02ca 100644 --- a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.h @@ -15,7 +15,9 @@ class DistributorBucketSpace; class RemoveLocationOperation : public Operation { public: - RemoveLocationOperation(DistributorComponent& manager, + RemoveLocationOperation(DistributorNodeContext& node_ctx, + DistributorOperationContext& op_ctx, + DocumentSelectionParser& parser, DistributorBucketSpace &bucketSpace, std::shared_ptr<api::RemoveLocationCommand> msg, PersistenceOperationMetricSet& metric); diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp index 75163b73c7a..6a7ea8b8f89 100644 --- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp @@ -11,7 +11,8 @@ using namespace storage::distributor; using namespace storage; using document::BucketSpace; -RemoveOperation::RemoveOperation(DistributorComponent& manager, +RemoveOperation::RemoveOperation(DistributorNodeContext& node_ctx, + DistributorOperationContext& op_ctx, DistributorBucketSpace &bucketSpace, std::shared_ptr<api::RemoveCommand> msg, PersistenceOperationMetricSet& metric, @@ -19,10 +20,10 @@ RemoveOperation::RemoveOperation(DistributorComponent& manager, : SequencedOperation(std::move(sequencingHandle)), _trackerInstance(metric, std::make_shared<api::RemoveReply>(*msg), - manager, msg->getTimestamp()), + node_ctx, op_ctx, msg->getTimestamp()), _tracker(_trackerInstance), _msg(std::move(msg)), - _node_ctx(manager), + _node_ctx(node_ctx), _bucketSpace(bucketSpace) { } diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h index 486866e4482..8dc88cb1d65 100644 --- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h @@ -15,7 +15,8 @@ class DistributorBucketSpace; class RemoveOperation : public SequencedOperation { public: - RemoveOperation(DistributorComponent& manager, + RemoveOperation(DistributorNodeContext& node_ctx, + DistributorOperationContext& op_ctx, DistributorBucketSpace &bucketSpace, std::shared_ptr<api::RemoveCommand> msg, PersistenceOperationMetricSet& metric, diff --git a/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp index 60c1137bd6d..ec8e4539ad1 100644 --- a/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp @@ -11,7 +11,6 @@ LOG_SETUP(".distributor.callback.statbucket"); namespace storage::distributor { StatBucketOperation::StatBucketOperation( - [[maybe_unused]] DistributorComponent& manager, DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::StatBucketCommand> & cmd) : Operation(), diff --git a/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.h b/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.h index 914e104943a..beb9e9c3445 100644 --- a/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.h @@ -14,13 +14,12 @@ namespace storage::api { class StatBucketCommand; } namespace storage::distributor { -class DistributorComponent; class DistributorBucketSpace; class StatBucketOperation : public Operation { public: - StatBucketOperation(DistributorComponent& manager, DistributorBucketSpace &bucketSpace, + StatBucketOperation(DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::StatBucketCommand> & cmd); ~StatBucketOperation(); diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp index 951d5010954..72193c99a64 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp @@ -20,8 +20,12 @@ using document::BucketSpace; namespace storage::distributor { + + TwoPhaseUpdateOperation::TwoPhaseUpdateOperation( - DistributorComponent& manager, + DistributorNodeContext& node_ctx, + DistributorOperationContext& op_ctx, + DocumentSelectionParser& parser, DistributorBucketSpace &bucketSpace, std::shared_ptr<api::UpdateCommand> msg, DistributorMetricSet& metrics, @@ -33,9 +37,9 @@ TwoPhaseUpdateOperation::TwoPhaseUpdateOperation( _metadata_get_metrics(metrics.update_metadata_gets), _updateCmd(std::move(msg)), _updateReply(), - _manager(manager), - _node_ctx(manager), - _parser(manager), + _node_ctx(node_ctx), + _op_ctx(op_ctx), + _parser(parser), _bucketSpace(bucketSpace), _sendState(SendState::NONE_SENT), _mode(Mode::FAST_PATH), @@ -43,7 +47,7 @@ TwoPhaseUpdateOperation::TwoPhaseUpdateOperation( _single_get_latency_timer(), _fast_path_repair_source_node(0xffff), _use_initial_cheap_metadata_fetch_phase( - _manager.getDistributor().getConfig().enable_metadata_only_fetch_phase_for_inconsistent_updates()), + _op_ctx.distributor_config().enable_metadata_only_fetch_phase_for_inconsistent_updates()), _replySent(false) { document::BucketIdFactory idFactory; @@ -168,7 +172,7 @@ TwoPhaseUpdateOperation::startFastPathUpdate(DistributorMessageSender& sender) { _mode = Mode::FAST_PATH; LOG(debug, "Update(%s) fast path: sending Update commands", update_doc_id().c_str()); - auto updateOperation = std::make_shared<UpdateOperation>(_manager, _bucketSpace, _updateCmd, _updateMetric); + auto updateOperation = std::make_shared<UpdateOperation>(_node_ctx, _op_ctx, _bucketSpace, _updateCmd, _updateMetric); UpdateOperation & op = *updateOperation; IntermediateMessageSender intermediate(_sentMessageMap, std::move(updateOperation), sender); op.start(intermediate, _node_ctx.clock().getTimeInMillis()); @@ -223,7 +227,7 @@ TwoPhaseUpdateOperation::create_initial_safe_path_get_operation() { update_doc_id().c_str(), field_set, api::to_string(read_consistency)); auto& get_metric = (_use_initial_cheap_metadata_fetch_phase ? _metadata_get_metrics : _getMetric); return std::make_shared<GetOperation>( - _manager, _bucketSpace, _bucketSpace.getBucketDatabase().acquire_read_guard(), + _node_ctx, _bucketSpace, _bucketSpace.getBucketDatabase().acquire_read_guard(), get, get_metric, read_consistency); } @@ -248,7 +252,7 @@ bool TwoPhaseUpdateOperation::lostBucketOwnershipBetweenPhases() const { document::Bucket updateDocBucket(_updateCmd->getBucket().getBucketSpace(), _updateDocBucketId); - BucketOwnership bo(_manager.checkOwnershipInPendingAndCurrentState(updateDocBucket)); + BucketOwnership bo(_op_ctx.check_ownership_in_pending_and_current_state(updateDocBucket)); return !bo.isOwned(); } @@ -273,7 +277,7 @@ TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument(std::shared_ptr<documen document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), document::BucketId(0)); auto put = std::make_shared<api::PutCommand>(bucket, doc, putTimestamp); copyMessageSettings(*_updateCmd, *put); - auto putOperation = std::make_shared<PutOperation>(_manager, _bucketSpace, std::move(put), _putMetric); + auto putOperation = std::make_shared<PutOperation>(_node_ctx, _op_ctx, _bucketSpace, std::move(put), _putMetric); PutOperation & op = *putOperation; IntermediateMessageSender intermediate(_sentMessageMap, std::move(putOperation), sender); op.start(intermediate, _node_ctx.clock().getTimeInMillis()); @@ -319,7 +323,7 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorMessageSender& sender, sendReplyWithResult(sender, api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, "")); return; } - schedulePutsWithUpdatedDocument(getReply.getDocument(), _manager.getUniqueTimestamp(), sender); + schedulePutsWithUpdatedDocument(getReply.getDocument(), _op_ctx.generate_unique_timestamp(), sender); return; } @@ -499,7 +503,7 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorMessageSender& sen } document::Document::SP docToUpdate; - api::Timestamp putTimestamp = _manager.getUniqueTimestamp(); + api::Timestamp putTimestamp = _op_ctx.generate_unique_timestamp(); if (reply.getDocument().get()) { api::Timestamp receivedTimestamp = reply.getLastModifiedTimestamp(); @@ -534,7 +538,7 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorMessageSender& sen } bool TwoPhaseUpdateOperation::may_restart_with_fast_path(const api::GetReply& reply) { - return (_manager.getDistributor().getConfig().update_fast_path_restart_enabled() && + return (_op_ctx.distributor_config().update_fast_path_restart_enabled() && !_replicas_at_get_send_time.empty() && // To ensure we send CreateBucket+Put if no replicas exist. reply.had_consistent_replicas() && replica_set_unchanged_after_get_operation()); diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h index 5a936405055..330609b276b 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h @@ -55,7 +55,9 @@ class GetOperation; class TwoPhaseUpdateOperation : public SequencedOperation { public: - TwoPhaseUpdateOperation(DistributorComponent& manager, + TwoPhaseUpdateOperation(DistributorNodeContext& node_ctx, + DistributorOperationContext& op_ctx, + DocumentSelectionParser& parser, DistributorBucketSpace &bucketSpace, std::shared_ptr<api::UpdateCommand> msg, DistributorMetricSet& metrics, @@ -141,8 +143,8 @@ private: PersistenceOperationMetricSet& _metadata_get_metrics; std::shared_ptr<api::UpdateCommand> _updateCmd; std::shared_ptr<api::StorageReply> _updateReply; - DistributorComponent& _manager; DistributorNodeContext& _node_ctx; + DistributorOperationContext& _op_ctx; DocumentSelectionParser& _parser; DistributorBucketSpace &_bucketSpace; SentMessageMap _sentMessageMap; diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp index 258b95c4952..1fd2ea90258 100644 --- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp @@ -17,19 +17,19 @@ using document::BucketSpace; namespace storage::distributor { -UpdateOperation::UpdateOperation(DistributorComponent& manager, +UpdateOperation::UpdateOperation(DistributorNodeContext& node_ctx, + DistributorOperationContext& op_ctx, DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::UpdateCommand> & msg, UpdateMetricSet& metric) : Operation(), _trackerInstance(metric, std::make_shared<api::UpdateReply>(*msg), - manager, msg->getTimestamp()), + node_ctx, op_ctx, msg->getTimestamp()), _tracker(_trackerInstance), _msg(msg), _new_timestamp(_msg->getTimestamp()), _is_auto_create_update(_msg->getUpdate()->getCreateIfNonExistent()), - _manager(manager), - _node_ctx(manager), + _node_ctx(node_ctx), _bucketSpace(bucketSpace), _newestTimestampLocation(), _infoAtSendTime(), diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h index 6fb6e71a68e..3db9979aa13 100644 --- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h @@ -23,7 +23,8 @@ class DistributorBucketSpace; class UpdateOperation : public Operation { public: - UpdateOperation(DistributorComponent& manager, + UpdateOperation(DistributorNodeContext& node_ctx, + DistributorOperationContext& op_ctx, DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::UpdateCommand> & msg, UpdateMetricSet& metric); @@ -45,7 +46,6 @@ private: const api::Timestamp _new_timestamp; const bool _is_auto_create_update; - DistributorComponent& _manager; DistributorNodeContext& _node_ctx; DistributorBucketSpace &_bucketSpace; std::pair<document::BucketId, uint16_t> _newestTimestampLocation; diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp index 9d0ca12ac5c..c35a6671c8d 100644 --- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp @@ -44,14 +44,15 @@ VisitorOperation::BucketInfo::toString() const VisitorOperation::SuperBucketInfo::~SuperBucketInfo() = default; VisitorOperation::VisitorOperation( - DistributorComponent& owner, + DistributorNodeContext& node_ctx, + DistributorOperationContext& op_ctx, DistributorBucketSpace &bucketSpace, const api::CreateVisitorCommand::SP& m, const Config& config, VisitorMetricSet& metrics) : Operation(), - _owner(owner), - _node_ctx(owner), + _node_ctx(node_ctx), + _op_ctx(op_ctx), _bucketSpace(bucketSpace), _msg(m), _sentReply(false), @@ -73,7 +74,7 @@ VisitorOperation::VisitorOperation( _fromTime = m->getFromTime(); _toTime = m->getToTime(); if (_toTime == 0) { - _toTime = owner.getUniqueTimestamp(); + _toTime = _op_ctx.generate_unique_timestamp(); } } @@ -276,7 +277,7 @@ void VisitorOperation::verifyDistributorOwnsBucket(const document::BucketId& bid) { document::Bucket bucket(_msg->getBucketSpace(), bid); - BucketOwnership bo(_owner.checkOwnershipInPendingAndCurrentState(bucket)); + BucketOwnership bo(_op_ctx.check_ownership_in_pending_and_current_state(bucket)); if (!bo.isOwned()) { verifyDistributorIsNotDown(bo.getNonOwnedState()); std::string systemStateStr = bo.getNonOwnedState().toString(); diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h index 993c0c0808d..28be7dfe353 100644 --- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.h @@ -16,9 +16,9 @@ namespace storage::lib { class ClusterState; } namespace storage::distributor { -class DistributorComponent; class DistributorBucketSpace; class DistributorNodeContext; +class DistributorOperationContext; class VisitorOperation : public Operation { @@ -33,7 +33,8 @@ public: uint32_t maxVisitorsPerNodePerVisitor; }; - VisitorOperation(DistributorComponent& manager, + VisitorOperation(DistributorNodeContext& node_ctx, + DistributorOperationContext& op_ctx, DistributorBucketSpace &bucketSpace, const std::shared_ptr<api::CreateVisitorCommand> & msg, const Config& config, @@ -140,8 +141,8 @@ private: */ vespalib::duration timeLeft() const noexcept; - DistributorComponent& _owner; DistributorNodeContext& _node_ctx; + DistributorOperationContext& _op_ctx; DistributorBucketSpace &_bucketSpace; SentMessagesMap _sentMessages; diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp index 07342dc3e7f..9e064087c08 100644 --- a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp +++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp @@ -15,15 +15,16 @@ namespace storage::distributor { PersistenceMessageTrackerImpl::PersistenceMessageTrackerImpl( PersistenceOperationMetricSet& metric, std::shared_ptr<api::BucketInfoReply> reply, - DistributorComponent& link, + DistributorNodeContext& node_ctx, + DistributorOperationContext& op_ctx, api::Timestamp revertTimestamp) - : MessageTracker(link.cluster_name()), + : MessageTracker(node_ctx.cluster_name()), _metric(metric), _reply(std::move(reply)), - _manager(link), + _op_ctx(op_ctx), _revertTimestamp(revertTimestamp), _trace(_reply->getTrace().getLevel()), - _requestTimer(link.clock()), + _requestTimer(node_ctx.clock()), _n_persistence_replies_total(0), _n_successful_persistence_replies(0), _priority(_reply->getPriority()), @@ -37,11 +38,11 @@ void PersistenceMessageTrackerImpl::updateDB() { for (const auto & entry : _bucketInfo) { - _manager.updateBucketDatabase(entry.first, entry.second); + _op_ctx.update_bucket_database(entry.first, entry.second); } for (const auto & entry : _remapBucketInfo){ - _manager.updateBucketDatabase(entry.first, entry.second,DatabaseUpdate::CREATE_IF_NONEXISTING); + _op_ctx.update_bucket_database(entry.first, entry.second, DatabaseUpdate::CREATE_IF_NONEXISTING); } } @@ -119,7 +120,7 @@ PersistenceMessageTrackerImpl::canSendReplyEarly() const LOG(spam, "Can't return early because we have already replied or failed"); return false; } - auto &bucketSpaceRepo(_manager.getBucketSpaceRepo()); + auto &bucketSpaceRepo(_op_ctx.bucket_space_repo()); auto &bucketSpace(bucketSpaceRepo.get(_reply->getBucket().getBucketSpace())); const lib::Distribution& distribution = bucketSpace.getDistribution(); @@ -164,13 +165,13 @@ PersistenceMessageTrackerImpl::addBucketInfoFromReply( bucket.toString().c_str(), bucketInfo.toString().c_str(), node); - _remapBucketInfo[bucket].emplace_back(_manager.getUniqueTimestamp(), node, bucketInfo); + _remapBucketInfo[bucket].emplace_back(_op_ctx.generate_unique_timestamp(), node, bucketInfo); } else { LOG(debug, "Bucket %s: Received bucket info %s from node %d", bucket.toString().c_str(), bucketInfo.toString().c_str(), node); - _bucketInfo[bucket].emplace_back(_manager.getUniqueTimestamp(), node, bucketInfo); + _bucketInfo[bucket].emplace_back(_op_ctx.generate_unique_timestamp(), node, bucketInfo); } } @@ -195,7 +196,7 @@ PersistenceMessageTrackerImpl::logSuccessfulReply(uint16_t node, const api::Buck bool PersistenceMessageTrackerImpl::shouldRevert() const { - return _manager.getDistributorConfig().enableRevert + return _op_ctx.distributor_config().enable_revert() && !_revertNodes.empty() && !_success && _reply; } @@ -258,7 +259,7 @@ PersistenceMessageTrackerImpl::handleCreateBucketReply( && reply.getResult().getResult() != api::ReturnCode::EXISTS) { LOG(spam, "Create bucket reply failed, so deleting it from bucket db"); - _manager.removeNodeFromDB(reply.getBucket(), node); + _op_ctx.remove_node_from_bucket_database(reply.getBucket(), node); } } diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.h b/storage/src/vespa/storage/distributor/persistencemessagetracker.h index cf9f4017eda..1421b4c2038 100644 --- a/storage/src/vespa/storage/distributor/persistencemessagetracker.h +++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.h @@ -36,7 +36,8 @@ private: public: PersistenceMessageTrackerImpl(PersistenceOperationMetricSet& metric, std::shared_ptr<api::BucketInfoReply> reply, - DistributorComponent&, + DistributorNodeContext& node_ctx, + DistributorOperationContext& op_ctx, api::Timestamp revertTimestamp = 0); ~PersistenceMessageTrackerImpl() override; @@ -70,7 +71,7 @@ private: PersistenceOperationMetricSet& _metric; std::shared_ptr<api::BucketInfoReply> _reply; - DistributorComponent& _manager; + DistributorOperationContext& _op_ctx; api::Timestamp _revertTimestamp; std::vector<BucketNodePair> _revertNodes; mbus::Trace _trace; |