diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2023-04-21 12:50:00 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-04-21 12:50:00 +0200 |
commit | 696d1fd36cf2c3729c134446865b432c69aaad3e (patch) | |
tree | 068a81c70be4c352a5e82a49045cb61a55546d42 /storage | |
parent | a24aff8b8ba062521d52dfd66a9e48bb9eebeed7 (diff) | |
parent | a43999fa18acc9ae7aa1ed65d0d839670087b5d1 (diff) |
Merge pull request #26800 from vespa-engine/vekterli/minor-put-remove-cleanups
Minor cleanups in Put/Remove operation code
Diffstat (limited to 'storage')
3 files changed, 34 insertions, 30 deletions
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp index dea215f47dc..aa5ad217ae9 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp @@ -15,13 +15,12 @@ #include <algorithm> #include <vespa/log/log.h> -LOG_SETUP(".distributor.callback.doc.put"); +LOG_SETUP(".distributor.operations.external.put"); - -using namespace storage::distributor; -using namespace storage; using document::BucketSpace; +namespace storage::distributor { + PutOperation::PutOperation(const DistributorNodeContext& node_ctx, DistributorStripeOperationContext& op_ctx, DistributorBucketSpace& bucketSpace, @@ -116,6 +115,20 @@ bool PutOperation::has_unavailable_targets_in_pending_state(const OperationTarge }); } +bool PutOperation::at_least_one_storage_node_is_available() const { + const lib::ClusterState& cluster_state = _bucketSpace.getClusterState(); + + const uint16_t storage_node_index_ubound = cluster_state.getNodeCount(lib::NodeType::STORAGE); + for (uint16_t i = 0; i < storage_node_index_ubound; i++) { + if (cluster_state.getNodeState(lib::Node(lib::NodeType::STORAGE, i)) + .getState().oneOf(storage_node_up_states())) + { + return true; + } + } + return false; +} + void PutOperation::onStart(DistributorStripeMessageSender& sender) { @@ -124,19 +137,7 @@ PutOperation::onStart(DistributorStripeMessageSender& sender) LOG(debug, "Received PUT %s for bucket %s", _msg->getDocumentId().toString().c_str(), bid.toString().c_str()); - lib::ClusterState systemState = _bucketSpace.getClusterState(); - - // Don't do anything if all nodes are down. - bool up = false; - for (uint16_t i = 0; i < systemState.getNodeCount(lib::NodeType::STORAGE); i++) { - if (systemState.getNodeState(lib::Node(lib::NodeType::STORAGE, i)) - .getState().oneOf(storage_node_up_states())) - { - up = true; - } - } - - if (up) { + if (at_least_one_storage_node_is_available()) { std::vector<document::BucketId> bucketsToCheckForSplit; OperationTargetResolverImpl targetResolver(_bucketSpace, _bucketSpace.getBucketDatabase(), @@ -145,8 +146,8 @@ PutOperation::onStart(DistributorStripeMessageSender& sender) _msg->getBucket().getBucketSpace()); OperationTargetList targets(targetResolver.getTargets(OperationTargetResolver::PUT, bid)); - for (size_t i = 0; i < targets.size(); ++i) { - if (_op_ctx.has_pending_message(targets[i].getNode().getIndex(), targets[i].getBucket(), + for (const auto& target : targets) { + if (_op_ctx.has_pending_message(target.getNode().getIndex(), target.getBucket(), api::MessageType::DELETEBUCKET_ID)) { _tracker.fail(sender, api::ReturnCode(api::ReturnCode::BUCKET_DELETED, @@ -179,13 +180,12 @@ PutOperation::onStart(DistributorStripeMessageSender& sender) std::vector<PersistenceMessageTracker::ToSend> putBatch; // Now send PUTs - for (uint32_t i = 0; i < targets.size(); i++) { - const OperationTarget& target(targets[i]); + for (const auto& target : targets) { sendPutToBucketOnNode(_msg->getBucket().getBucketSpace(), target.getBucketId(), target.getNode().getIndex(), putBatch); } - if (putBatch.size()) { + if (!putBatch.empty()) { _tracker.queueMessageBatch(putBatch); } else { const char* error = "Can't store document: No storage nodes available"; @@ -196,9 +196,9 @@ PutOperation::onStart(DistributorStripeMessageSender& sender) // Check whether buckets are large enough to be split. // TODO(vekterli): only check entries for sendToExisting? - for (uint32_t i = 0; i < entries.size(); ++i) { + for (const auto& entry : entries) { _op_ctx.send_inline_split_if_bucket_too_large(_msg->getBucket().getBucketSpace(), - entries[i], _msg->getPriority()); + entry, _msg->getPriority()); } _tracker.flushQueue(sender); @@ -235,3 +235,5 @@ PutOperation::onClose(DistributorStripeMessageSender& sender) LOG(debug, "%s", error); _tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, error)); } + +} diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.h b/storage/src/vespa/storage/distributor/operations/external/putoperation.h index 9801fed0c99..283395f1406 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.h @@ -47,9 +47,10 @@ private: void sendPutToBucketOnNode(document::BucketSpace bucketSpace, const document::BucketId& bucketId, const uint16_t node, std::vector<PersistenceMessageTracker::ToSend>& putBatch); - bool shouldImplicitlyActivateReplica(const OperationTargetList& targets) const; + [[nodiscard]] bool shouldImplicitlyActivateReplica(const OperationTargetList& targets) const; - bool has_unavailable_targets_in_pending_state(const OperationTargetList& targets) const; + [[nodiscard]] bool has_unavailable_targets_in_pending_state(const OperationTargetList& targets) const; + [[nodiscard]] bool at_least_one_storage_node_is_available() const; std::shared_ptr<api::PutCommand> _msg; DistributorStripeOperationContext& _op_ctx; diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp index 584ad9de2ce..b752a7fadde 100644 --- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp @@ -5,13 +5,12 @@ #include <vespa/vdslib/state/clusterstate.h> #include <vespa/log/log.h> -LOG_SETUP(".distributor.operation.external.remove"); +LOG_SETUP(".distributor.operations.external.remove"); - -using namespace storage::distributor; -using namespace storage; using document::BucketSpace; +namespace storage::distributor { + RemoveOperation::RemoveOperation(const DistributorNodeContext& node_ctx, DistributorStripeOperationContext& op_ctx, DistributorBucketSpace& bucketSpace, @@ -100,3 +99,5 @@ RemoveOperation::onClose(DistributorStripeMessageSender& sender) { _tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down")); } + +} |