diff options
Diffstat (limited to 'storage')
5 files changed, 35 insertions, 25 deletions
diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp index 00e1e832b3c..d3a8b270ad0 100644 --- a/storage/src/tests/distributor/updateoperationtest.cpp +++ b/storage/src/tests/distributor/updateoperationtest.cpp @@ -67,7 +67,7 @@ UpdateOperationTest::sendUpdate(const std::string& bucketState, bool create_if_m auto& comp = distributor_component(); return std::make_shared<UpdateOperation>( - comp, comp, getDistributorBucketSpace(), msg, + comp, comp, getDistributorBucketSpace(), msg, std::vector<BucketDatabase::Entry>(), getDistributor().getMetrics().updates); } diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp index 2d6e3bd30e2..454aa808bad 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp @@ -159,13 +159,18 @@ TwoPhaseUpdateOperation::sendReplyWithResult( sendReply(sender, _updateReply); } -bool -TwoPhaseUpdateOperation::isFastPathPossible() const +std::vector<BucketDatabase::Entry> +TwoPhaseUpdateOperation::get_bucket_database_entries() const { - // Fast path iff bucket exists AND is consistent (split and copies). std::vector<BucketDatabase::Entry> entries; _bucketSpace.getBucketDatabase().getParents(_updateDocBucketId, entries); + return entries; +} +bool +TwoPhaseUpdateOperation::isFastPathPossible(const std::vector<BucketDatabase::Entry>& entries) const +{ + // Fast path iff bucket exists AND is consistent (split and copies). if (entries.size() != 1) { return false; } @@ -173,11 +178,12 @@ TwoPhaseUpdateOperation::isFastPathPossible() const } void -TwoPhaseUpdateOperation::startFastPathUpdate(DistributorMessageSender& sender) +TwoPhaseUpdateOperation::startFastPathUpdate(DistributorMessageSender& sender, std::vector<BucketDatabase::Entry> entries) { _mode = Mode::FAST_PATH; LOG(debug, "Update(%s) fast path: sending Update commands", update_doc_id().c_str()); - auto updateOperation = std::make_shared<UpdateOperation>(_node_ctx, _op_ctx, _bucketSpace, _updateCmd, _updateMetric); + auto updateOperation = std::make_shared<UpdateOperation> + (_node_ctx, _op_ctx, _bucketSpace, _updateCmd, std::move(entries), _updateMetric); UpdateOperation & op = *updateOperation; IntermediateMessageSender intermediate(_sentMessageMap, std::move(updateOperation), sender); op.start(intermediate, _node_ctx.clock().getTimeInMillis()); @@ -238,8 +244,9 @@ TwoPhaseUpdateOperation::create_initial_safe_path_get_operation() { void TwoPhaseUpdateOperation::onStart(DistributorMessageSender& sender) { - if (isFastPathPossible()) { - startFastPathUpdate(sender); + auto entries = get_bucket_database_entries(); + if (isFastPathPossible(entries)) { + startFastPathUpdate(sender, std::move(entries)); } else { startSafePathUpdate(sender); } @@ -575,7 +582,7 @@ void TwoPhaseUpdateOperation::restart_with_fast_path_due_to_consistent_get_times // Must not be any other messages in flight, or we might mis-interpret them when we // have switched back to fast-path mode. assert(_sentMessageMap.empty()); - startFastPathUpdate(sender); + startFastPathUpdate(sender, {}); } bool diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h index 330609b276b..af45932b530 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h @@ -98,8 +98,9 @@ private: void sendReplyWithResult(DistributorMessageSender&, const api::ReturnCode&); void ensureUpdateReplyCreated(); - bool isFastPathPossible() const; - void startFastPathUpdate(DistributorMessageSender&); + std::vector<BucketDatabase::Entry> get_bucket_database_entries() const; + bool isFastPathPossible(const std::vector<BucketDatabase::Entry>& entries) const; + void startFastPathUpdate(DistributorMessageSender& sender, std::vector<BucketDatabase::Entry> entries); void startSafePathUpdate(DistributorMessageSender&); bool lostBucketOwnershipBetweenPhases() const; void sendLostOwnershipTransientErrorReply(DistributorMessageSender&); diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp index 1fd2ea90258..f0cd53c15f5 100644 --- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp @@ -19,14 +19,16 @@ namespace storage::distributor { UpdateOperation::UpdateOperation(DistributorNodeContext& node_ctx, DistributorOperationContext& op_ctx, - DistributorBucketSpace &bucketSpace, - const std::shared_ptr<api::UpdateCommand> & msg, + DistributorBucketSpace& bucketSpace, + const std::shared_ptr<api::UpdateCommand>& msg, + std::vector<BucketDatabase::Entry> entries, UpdateMetricSet& metric) : Operation(), _trackerInstance(metric, std::make_shared<api::UpdateReply>(*msg), node_ctx, op_ctx, msg->getTimestamp()), _tracker(_trackerInstance), _msg(msg), + _entries(std::move(entries)), _new_timestamp(_msg->getTimestamp()), _is_auto_create_update(_msg->getUpdate()->getCreateIfNonExistent()), _node_ctx(node_ctx), @@ -73,14 +75,12 @@ UpdateOperation::onStart(DistributorMessageSender& sender) return; } - document::BucketId bucketId( - _node_ctx.bucket_id_factory().getBucketId( - _msg->getDocumentId())); - - std::vector<BucketDatabase::Entry> entries; - _bucketSpace.getBucketDatabase().getParents(bucketId, entries); + if (_entries.empty()) { + document::BucketId bucketId(_node_ctx.bucket_id_factory().getBucketId(_msg->getDocumentId())); + _bucketSpace.getBucketDatabase().getParents(bucketId, _entries); + } - if (entries.empty()) { + if (_entries.empty()) { _tracker.fail(sender, api::ReturnCode(api::ReturnCode::OK, "No buckets found for given document update")); @@ -89,14 +89,14 @@ UpdateOperation::onStart(DistributorMessageSender& sender) // An UpdateOperation should only be started iff all replicas are consistent // with each other, so sampling a single replica should be equal to sampling them all. - assert(entries[0].getBucketInfo().getNodeCount() > 0); // Empty buckets are not allowed - _infoAtSendTime = entries[0].getBucketInfo().getNodeRef(0).getBucketInfo(); + assert(_entries[0].getBucketInfo().getNodeCount() > 0); // Empty buckets are not allowed + _infoAtSendTime = _entries[0].getBucketInfo().getNodeRef(0).getBucketInfo(); // FIXME(vekterli): this loop will happily update all replicas in the // bucket sub-tree, but there is nothing here at all which will fail the // update if we cannot satisfy a desired replication level (not even for // n-of-m operations). - for (const auto& entry : entries) { + for (const auto& entry : _entries) { LOG(spam, "Found bucket %s", entry.toString().c_str()); const std::vector<uint16_t>& nodes = entry->getNodes(); diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h index 3db9979aa13..9f0796e71d2 100644 --- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.h @@ -25,8 +25,9 @@ class UpdateOperation : public Operation public: UpdateOperation(DistributorNodeContext& node_ctx, DistributorOperationContext& op_ctx, - DistributorBucketSpace &bucketSpace, - const std::shared_ptr<api::UpdateCommand> & msg, + DistributorBucketSpace& bucketSpace, + const std::shared_ptr<api::UpdateCommand>& msg, + std::vector<BucketDatabase::Entry> entries, UpdateMetricSet& metric); void onStart(DistributorMessageSender& sender) override; @@ -43,6 +44,7 @@ private: PersistenceMessageTrackerImpl _trackerInstance; PersistenceMessageTracker& _tracker; std::shared_ptr<api::UpdateCommand> _msg; + std::vector<BucketDatabase::Entry> _entries; const api::Timestamp _new_timestamp; const bool _is_auto_create_update; |