diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2023-10-24 12:19:43 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2023-10-30 08:46:13 +0000 |
commit | eb6c18afb1fe2637545ac4fec163f7d043682108 (patch) | |
tree | 33daad971e841dd8d987249e5768658ae4872f54 | |
parent | 333b44e0d7a2f1e4c251fe1c95103ad287cfbf5f (diff) |
BucketMove is 3 phased, startMove, createMoveOps, completeMove.
First and last happens in master, while second happens in bucket executor.
As feed might happen inbetween these phases, care must be taken to check that the state
for a document is the same in complete as it was in first phase.
If not the move must be retried.
13 files changed, 38 insertions, 11 deletions
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.cpp index 0359c13a5f7..ff44d898997 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.cpp +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.cpp @@ -28,7 +28,7 @@ MyMoveHandler::MyMoveHandler(bucketdb::BucketDBOwner &bucketDb, bool storeMoveDo MyMoveHandler::~MyMoveHandler() = default; -void +IDocumentMoveHandler::MoveResult MyMoveHandler::handleMove(MoveOperation &op, IDestructorCallback::SP moveDoneCtx) { _moves.push_back(op); if (_bucketDb.takeGuard()->isCachedBucket(op.getBucketId())) { @@ -37,6 +37,7 @@ MyMoveHandler::handleMove(MoveOperation &op, IDestructorCallback::SP moveDoneCtx if (_storeMoveDoneContexts) { _moveDoneContexts.push_back(std::move(moveDoneCtx)); } + return MoveResult::SUCCESS; } MySubDb::MySubDb(const std::shared_ptr<const DocumentTypeRepo> &repo, std::shared_ptr<bucketdb::BucketDBOwner> bucketDB, diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h b/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h index 9d5c4e0e4ec..ded00589f60 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/bucketmover_common.h @@ -43,7 +43,7 @@ struct MyMoveHandler : public IDocumentMoveHandler { MyMoveHandler(bucketdb::BucketDBOwner &bucketDb, bool storeMoveDoneContext = false); ~MyMoveHandler() override; - void handleMove(MoveOperation &op, vespalib::IDestructorCallback::SP moveDoneCtx) override; + MoveResult handleMove(MoveOperation &op, vespalib::IDestructorCallback::SP moveDoneCtx) override; void reset() { _moves.clear(); diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp index 2a074a7404c..509210679da 100644 --- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp +++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp @@ -40,7 +40,6 @@ #include <vespa/vespalib/util/gate.h> #include <vespa/vespalib/util/lambdatask.h> #include <vespa/vespalib/util/monitored_refcount.h> -#include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/util/threadstackexecutor.h> #include <unistd.h> #include <thread> @@ -216,7 +215,8 @@ public: ~MyFeedHandler() override; bool isExecutorThread() const; - void handleMove(MoveOperation &op, IDestructorCallback::SP moveDoneCtx) override; + + MoveResult handleMove(MoveOperation &op, IDestructorCallback::SP moveDoneCtx) override; void performPruneRemovedDocuments(PruneRemovedDocumentsOperation &op) override; void heartBeat() override; @@ -624,7 +624,7 @@ MyFeedHandler::isExecutorThread() const } -void +IDocumentMoveHandler::MoveResult MyFeedHandler::handleMove(MoveOperation &op, IDestructorCallback::SP moveDoneCtx) { assert(isExecutorThread()); @@ -640,6 +640,7 @@ MyFeedHandler::handleMove(MoveOperation &op, IDestructorCallback::SP moveDoneCtx appendOperation(op, std::move(moveDoneCtx)); _subDBs[op.getSubDbId()]->handleMove(op); _subDBs[op.getPrevSubDbId()]->handleMove(op); + return MoveResult::SUCCESS; } diff --git a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp index eadeb6b1e75..612b31b0123 100644 --- a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.cpp @@ -193,6 +193,13 @@ CombiningFeedView::handleDeleteBucket(const DeleteBucketOperation &delOp, DoneCa } } +bool +CombiningFeedView::isMoveStillValid(const MoveOperation & moveOp) const { + uint32_t subDbId = moveOp.getPrevSubDbId(); + assert(subDbId < _views.size()); + return _views[subDbId]->isMoveStillValid(moveOp); +} + void CombiningFeedView::prepareMove(MoveOperation &moveOp) { diff --git a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h index 0b6f6039e36..2890aa7ac0f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/combiningfeedview.h @@ -72,6 +72,7 @@ public: void prepareRemove(RemoveOperation &rmOp) override; void handleRemove(FeedToken token, const RemoveOperation &rmOp) override; void prepareDeleteBucket(DeleteBucketOperation &delOp) override; + bool isMoveStillValid(const MoveOperation & moveOp) const override; void prepareMove(MoveOperation &putOp) override; void handleDeleteBucket(const DeleteBucketOperation &delOp, DoneCallback onDone) override; void handleMove(const MoveOperation &moveOp, DoneCallback onDone) override; diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp index 4d410282b42..8506fcfefdf 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp @@ -38,7 +38,10 @@ BucketMover::createMoveOperation(const MoveKey &key) { void BucketMover::moveDocument(MoveOperationUP moveOp, IDestructorCallbackSP onDone) { - _handler->handleMove(*moveOp, std::move(onDone)); + auto result = _handler->handleMove(*moveOp, std::move(onDone)); + if (result == IDocumentMoveHandler::MoveResult::FAILURE) { + _needReschedule.store(true, std::memory_order_relaxed); + } } BucketMover::MoveKey::MoveKey(uint32_t lid, const document::GlobalId &gid, Timestamp timestamp, MoveGuard guard) noexcept diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index f5774734949..3c957fc3cf2 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -756,10 +756,12 @@ FeedHandler::handleOperation(FeedToken token, FeedOperation::UP op) })); } -void +IDocumentMoveHandler::MoveResult FeedHandler::handleMove(MoveOperation &op, vespalib::IDestructorCallback::SP moveDoneCtx) { assert(_writeService.master().isCurrentThread()); + if ( ! _activeFeedView->isMoveStillValid(op)) return MoveResult::FAILURE; + op.set_prepare_serial_num(inc_prepare_serial_num()); _activeFeedView->prepareMove(op); assert(op.getValidDbdId()); @@ -767,6 +769,7 @@ FeedHandler::handleMove(MoveOperation &op, vespalib::IDestructorCallback::SP mov assert(op.getSubDbId() != op.getPrevSubDbId()); appendOperation(op, moveDoneCtx); _activeFeedView->handleMove(op, std::move(moveDoneCtx)); + return MoveResult::SUCCESS; } void diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h index dddc032ba04..06fca28a55c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h @@ -239,7 +239,7 @@ public: void performOperation(FeedToken token, FeedOperationUP op); void handleOperation(FeedToken token, FeedOperationUP op); - void handleMove(MoveOperation &op, std::shared_ptr<vespalib::IDestructorCallback> moveDoneCtx) override; + MoveResult handleMove(MoveOperation &op, std::shared_ptr<vespalib::IDestructorCallback> moveDoneCtx) override; void heartBeat() override; RPC::Result receive(const Packet &packet) override; diff --git a/searchcore/src/vespa/searchcore/proton/server/idocumentmovehandler.h b/searchcore/src/vespa/searchcore/proton/server/idocumentmovehandler.h index f5ad1cd596f..98bcc7c1265 100644 --- a/searchcore/src/vespa/searchcore/proton/server/idocumentmovehandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/idocumentmovehandler.h @@ -15,7 +15,8 @@ class MoveOperation; */ struct IDocumentMoveHandler { - virtual void handleMove(MoveOperation &op, std::shared_ptr<vespalib::IDestructorCallback> moveDoneCtx) = 0; + enum class MoveResult { SUCCESS, FAILURE}; + [[nodiscard]] virtual MoveResult handleMove(MoveOperation &op, std::shared_ptr<vespalib::IDestructorCallback> moveDoneCtx) = 0; virtual ~IDocumentMoveHandler() = default; }; diff --git a/searchcore/src/vespa/searchcore/proton/server/ifeedview.h b/searchcore/src/vespa/searchcore/proton/server/ifeedview.h index 946d3a9f2f1..d3a43b535da 100644 --- a/searchcore/src/vespa/searchcore/proton/server/ifeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/ifeedview.h @@ -56,6 +56,7 @@ public: virtual void prepareRemove(RemoveOperation &rmOp) = 0; virtual void handleRemove(FeedToken token, const RemoveOperation &rmOp) = 0; virtual void prepareDeleteBucket(DeleteBucketOperation &delOp) = 0; + [[nodiscard]] virtual bool isMoveStillValid(const MoveOperation & moveOp) const = 0; virtual void prepareMove(MoveOperation &putOp) = 0; virtual void handleDeleteBucket(const DeleteBucketOperation &delOp, DoneCallback onDone) = 0; virtual void handleMove(const MoveOperation &putOp, DoneCallback onDone) = 0; diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp index 4c43c11691c..a426fa7d682 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp @@ -2,13 +2,11 @@ #include "storeonlyfeedview.h" #include "forcecommitcontext.h" -#include "ireplayconfig.h" #include "operationdonecontext.h" #include "putdonecontext.h" #include "removedonecontext.h" #include "updatedonecontext.h" #include <vespa/searchcore/proton/attribute/ifieldupdatecallback.h> -#include <vespa/searchcore/proton/common/feedtoken.h> #include <vespa/searchcore/proton/feedoperation/operations.h> #include <vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h> #include <vespa/searchcore/proton/reference/i_pending_gid_to_lid_changes.h> @@ -712,6 +710,15 @@ StoreOnlyFeedView::internalDeleteBucket(const DeleteBucketOperation &delOp, Done _params._docTypeName.toString().c_str(), delOp.getBucketId().toString().c_str(), rm_count); } +bool +StoreOnlyFeedView::isMoveStillValid(const MoveOperation & moveOp) const { + uint32_t lid = moveOp.getPrevLid(); + if ( ! _metaStore.validLid(lid)) return false; + + const RawDocumentMetaData & meta = _metaStore.getRawMetaData(lid); + return meta.getTimestamp() == moveOp.getTimestamp(); +} + // CombiningFeedView calls this only for the subdb we're moving to. void StoreOnlyFeedView::prepareMove(MoveOperation &moveOp) diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h index 5e9ebf4d3a6..0e1a4e09482 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h @@ -215,6 +215,7 @@ public: void handleRemove(FeedToken token, const RemoveOperation &rmOp) override; void prepareDeleteBucket(DeleteBucketOperation &delOp) override; void handleDeleteBucket(const DeleteBucketOperation &delOp, DoneCallback onDone) override; + bool isMoveStillValid(const MoveOperation & moveOp) const override; void prepareMove(MoveOperation &putOp) override; void handleMove(const MoveOperation &putOp, DoneCallback doneCtx) override; void heartBeat(search::SerialNum serialNum, DoneCallback onDone) override; diff --git a/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h b/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h index f8713b3b691..751d4464ca7 100644 --- a/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h +++ b/searchcore/src/vespa/searchcore/proton/test/dummy_feed_view.h @@ -26,6 +26,7 @@ struct DummyFeedView : public IFeedView void handleRemove(FeedToken, const RemoveOperation &) override {} void prepareDeleteBucket(DeleteBucketOperation &) override {} void handleDeleteBucket(const DeleteBucketOperation &, DoneCallback) override {} + bool isMoveStillValid(const MoveOperation &) const override { return true; } void prepareMove(MoveOperation &) override {} void handleMove(const MoveOperation &, DoneCallback) override {} void heartBeat(search::SerialNum, DoneCallback) override {} |