diff options
Diffstat (limited to 'searchcore')
4 files changed, 82 insertions, 63 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp index cef73d2975c..28f5e3c7111 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp @@ -107,7 +107,7 @@ BucketMoveJobV2::BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> & _clusterStateChangedNotifier.addClusterStateChangedHandler(this); _bucketStateChangedNotifier.addBucketStateChangedHandler(this); _diskMemUsageNotifier.addDiskMemUsageListener(this); - recompute(); + recompute(_ready.meta_store()->getBucketDB().takeGuard()); } BucketMoveJobV2::~BucketMoveJobV2() @@ -205,9 +205,6 @@ BucketMoveJobV2::failOperation(BucketId bucketId) { _master.execute(makeLambdaTask([this, bucketId]() { if (_stopped.load(std::memory_order_relaxed)) return; cancelBucket(bucketId); - if (_bucketsInFlight.contains(bucketId)) { - handleMoveResult(_bucketsInFlight[bucketId]); - } considerBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucketId); })); } @@ -240,37 +237,42 @@ BucketMoveJobV2::prepareMove(BucketMoverSP mover, std::vector<MoveKey> keys, IDe } void -BucketMoveJobV2::completeMove(BucketMoverSP mover, std::vector<GuardedMoveOp> ops, IDestructorCallbackSP onDone) { - mover->moveDocuments(std::move(ops), std::move(onDone)); - handleMoveResult(std::move(mover)); +BucketMoveJobV2::completeMove(BucketMoverSP mover, GuardedMoveOps ops, IDestructorCallbackSP onDone) { + mover->moveDocuments(std::move(ops.success), std::move(onDone)); + ops.failed.clear(); + if (checkIfMoverComplete(*mover)) { + checkForReschedule(_ready.meta_store()->getBucketDB().takeGuard(), mover->getBucket()); + } } -void -BucketMoveJobV2::handleMoveResult(BucketMoverSP mover) { - bool bucketMoveComplete = mover->allScheduled() && mover->inSync(); - if (bucketMoveComplete || mover->needReschedule()) { - BucketId bucket = mover->getBucket(); - assert(mover->needReschedule() || _bucketsInFlight.contains(bucket)); +bool +BucketMoveJobV2::checkIfMoverComplete(const BucketMover & mover) { + bool bucketMoveComplete = mover.allScheduled() && mover.inSync(); + bool needReschedule = mover.needReschedule(); + if (bucketMoveComplete || needReschedule) { + BucketId bucket = mover.getBucket(); + assert(mover.needReschedule() || _bucketsInFlight.contains(bucket)); _bucketsInFlight.erase(bucket); - if (mover->needReschedule()) { - reconsiderBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucket); - if (!_buckets2Move.contains(bucket)) { - // It failed, but all was moved anyway - _modifiedHandler.notifyBucketModified(bucket); - } + if (needReschedule) { + _postponedUntilSafe.insert(bucket); } else { _modifiedHandler.notifyBucketModified(bucket); } - if (_postponedUntilSafe.contains(bucket)) { - _postponedUntilSafe.erase(bucket); - reconsiderBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucket); - } - if (_bucketsInFlight.empty() && _postponedUntilSafe.contains(RECOMPUTE_TOKEN)) { - _postponedUntilSafe.erase(RECOMPUTE_TOKEN); - recompute(); - } - updatePending(); + return true; } + return _bucketsInFlight.empty(); +} +void +BucketMoveJobV2::checkForReschedule(const bucketdb::Guard & guard, BucketId bucket) { + if (_postponedUntilSafe.contains(bucket)) { + _postponedUntilSafe.erase(bucket); + reconsiderBucket(guard, bucket); + } + if (_bucketsInFlight.empty() && _postponedUntilSafe.contains(RECOMPUTE_TOKEN)) { + _postponedUntilSafe.erase(RECOMPUTE_TOKEN); + recompute(guard); + } + updatePending(); } void @@ -281,7 +283,7 @@ BucketMoveJobV2::cancelBucket(BucketId bucket) { _movers.erase(std::remove_if(_movers.begin(), _movers.end(), [bucket](const BucketMoverSP &mover) { return mover->getBucket() == bucket; }), _movers.end()); - handleMoveResult(inFlight->second); + checkIfMoverComplete(*inFlight->second); } } @@ -315,10 +317,10 @@ BucketMoveJobV2::notifyCreateBucket(const bucketdb::Guard & guard, const BucketI } BucketMoveJobV2::BucketMoveSet -BucketMoveJobV2::computeBuckets2Move() +BucketMoveJobV2::computeBuckets2Move(const bucketdb::Guard & guard) { BucketMoveJobV2::BucketMoveSet toMove; - for (ScanIterator itr(_ready.meta_store()->getBucketDB().takeGuard(), BucketId()); itr.valid(); ++itr) { + for (ScanIterator itr(guard, BucketId()); itr.valid(); ++itr) { auto [mustMove, wantReady] = needMove(itr); if (mustMove) { toMove[itr.getBucket()] = wantReady; @@ -398,7 +400,11 @@ BucketMoveJobV2::run() void BucketMoveJobV2::recompute() { - _buckets2Move = computeBuckets2Move(); + recompute(_ready.meta_store()->getBucketDB().takeGuard()); +} +void +BucketMoveJobV2::recompute(const bucketdb::Guard & guard) { + _buckets2Move = computeBuckets2Move(guard); updatePending(); } @@ -427,7 +433,7 @@ BucketMoveJobV2::notifyClusterStateChanged(const std::shared_ptr<IBucketStateCal std::for_each(_bucketsInFlight.begin(), _bucketsInFlight.end(), [](auto & entry) { entry.second->cancel();}); std::erase_if(_bucketsInFlight, [](auto & entry) { return entry.second->inSync(); }); if (_bucketsInFlight.empty()) { - recompute(); + recompute(_ready.meta_store()->getBucketDB().takeGuard()); } else { _postponedUntilSafe.insert(RECOMPUTE_TOKEN); } diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h index 0f1c567b1b3..d7f44ee37b2 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h @@ -55,7 +55,7 @@ private: using Bucket2Mover = std::map<BucketId, BucketMoverSP>; using Movers = std::vector<BucketMoverSP>; using MoveKey = BucketMover::MoveKey; - using GuardedMoveOp = BucketMover::GuardedMoveOp; + using GuardedMoveOps = BucketMover::GuardedMoveOps; using BucketSet = vespalib::hash_set<BucketId, BucketId::hash>; std::shared_ptr<IBucketStateCalculator> _calc; IDocumentMoveHandler &_moveHandler; @@ -83,19 +83,21 @@ private: void startMove(BucketMoverSP mover, size_t maxDocsToMove); void prepareMove(BucketMoverSP mover, std::vector<MoveKey> keysToMove, IDestructorCallbackSP context); - void completeMove(BucketMoverSP mover, std::vector<GuardedMoveOp> keys, IDestructorCallbackSP context); - void handleMoveResult(BucketMoverSP mover); + void completeMove(BucketMoverSP mover, GuardedMoveOps moveOps, IDestructorCallbackSP context); + bool checkIfMoverComplete(const BucketMover & mover); + void checkForReschedule(const bucketdb::Guard & guard, BucketId bucket); void considerBucket(const bucketdb::Guard & guard, BucketId bucket); void reconsiderBucket(const bucketdb::Guard & guard, BucketId bucket); void updatePending(); void cancelBucket(BucketId bucket); // True if something to cancel NeedResult needMove(const ScanIterator &itr) const; - BucketMoveSet computeBuckets2Move(); + BucketMoveSet computeBuckets2Move(const bucketdb::Guard & guard); BucketMoverSP createMover(BucketId bucket, bool wantReady); BucketMoverSP greedyCreateMover(); void backFillMovers(); void moveDocs(size_t maxDocsToMove); void failOperation(BucketId bucket); + void recompute(const bucketdb::Guard & guard); friend class StartMove; public: BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> &calc, @@ -117,7 +119,7 @@ public: bool scanAndMove(size_t maxBuckets2Move, size_t maxDocsToMovePerBucket); bool done() const; - void recompute(); + void recompute(); // Only for testing bool inSync() const; bool run() override; diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp index e33513df3f1..f08cd4d7ab7 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp @@ -20,8 +20,8 @@ namespace proton::bucketdb { typedef IDocumentMetaStore::Iterator Iterator; -BucketMover::GuardedMoveOp -BucketMover::createMoveOperation(MoveKey &key) { +MoveOperation::UP +BucketMover::createMoveOperation(const MoveKey &key) { if (_source->lidNeedsCommit(key._lid)) return {}; const RawDocumentMetaData &metaNow = _source->meta_store()->getRawMetaData(key._lid); @@ -29,13 +29,13 @@ BucketMover::createMoveOperation(MoveKey &key) { if (metaNow.getTimestamp() != key._timestamp) return {}; Document::SP doc(_source->retriever()->getFullDocument(key._lid)); - if (!doc || doc->getId().getGlobalId() != key._gid) - return {}; // Failed to retrieve document, removed or changed identity + if (!doc || doc->getId().getGlobalId() != key._gid) { + // Failed to retrieve document, removed or changed identity + return {}; + } BucketId bucketId = _bucket.stripUnused(); - return BucketMover::GuardedMoveOp(std::make_unique<MoveOperation>(bucketId, key._timestamp, std::move(doc), - DbDocumentId(_source->sub_db_id(), key._lid), - _targetSubDbId), - std::move(key._guard)); + return std::make_unique<MoveOperation>(bucketId, key._timestamp, std::move(doc), + DbDocumentId(_source->sub_db_id(), key._lid), _targetSubDbId); } void @@ -89,19 +89,26 @@ BucketMover::getKeysToMove(size_t maxDocsToMove) { return result; } -std::vector<BucketMover::GuardedMoveOp> +BucketMover::GuardedMoveOps BucketMover::createMoveOperations(std::vector<MoveKey> toMove) { - std::vector<GuardedMoveOp> successfulReads; - successfulReads.reserve(toMove.size()); + GuardedMoveOps moveOps; + moveOps.success.reserve(toMove.size()); for (MoveKey &key : toMove) { - auto moveOp = createMoveOperation(key); - if (!moveOp.first) { - _needReschedule.store(true, std::memory_order_relaxed); - break; + if (moveOps.failed.empty()) { + auto moveOp = createMoveOperation(key); + if (moveOp) { + moveOps.success.emplace_back(std::move(moveOp), std::move(key._guard)); + } else { + moveOps.failed.push_back(std::move(key._guard)); + } + } else { + moveOps.failed.push_back(std::move(key._guard)); } - successfulReads.push_back(std::move(moveOp)); } - return successfulReads; + if ( ! moveOps.failed.empty()) { + _needReschedule.store(true, std::memory_order_relaxed); + } + return moveOps; } void @@ -149,17 +156,16 @@ DocumentBucketMover::moveDocuments(size_t maxDocsToMove, IMoveOperationLimiter & return true; } auto [keys, done] = _impl->getKeysToMove(maxDocsToMove); - size_t numKeys = keys.size(); auto moveOps = _impl->createMoveOperations(std::move(keys)); - bool allOk = (numKeys == moveOps.size()); + bool allOk = moveOps.failed.empty(); if (done && allOk) { _impl->setAllScheduled(); } - if (moveOps.empty()) return allOk; + if (moveOps.success.empty()) return allOk; - _impl->updateLastValidGid(moveOps.back().first->getDocument()->getId().getGlobalId()); + _impl->updateLastValidGid(moveOps.success.back().first->getDocument()->getId().getGlobalId()); - for (auto & moveOp : moveOps) { + for (auto & moveOp : moveOps.success) { // We cache the bucket for the document we are going to move to avoid getting // inconsistent bucket info (getBucketInfo()) while moving between ready and not-ready // sub dbs as the bucket info is not updated atomically in this case. diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h index 05ceb4c17e6..fc7760a4dc4 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.h @@ -64,6 +64,12 @@ public: MoveGuard _guard; }; + using GuardedMoveOp = std::pair<MoveOperationUP, MoveGuard>; + struct GuardedMoveOps { + std::vector<GuardedMoveOp> success; + std::vector<MoveGuard> failed; + }; + BucketMover(const document::BucketId &bucket, const MaintenanceDocumentSubDB *source, uint32_t targetSubDbId, IDocumentMoveHandler &handler) noexcept; BucketMover(BucketMover &&) noexcept = delete; @@ -72,11 +78,10 @@ public: BucketMover & operator=(const BucketMover &) = delete; ~BucketMover(); - using GuardedMoveOp = std::pair<MoveOperationUP, MoveGuard>; /// Must be called in master thread std::pair<std::vector<MoveKey>, bool> getKeysToMove(size_t maxDocsToMove); /// Call from any thread - std::vector<GuardedMoveOp> createMoveOperations(std::vector<MoveKey> toMove); + GuardedMoveOps createMoveOperations(std::vector<MoveKey> toMove); /// Must be called in master thread void moveDocuments(std::vector<GuardedMoveOp> moveOps, IDestructorCallbackSP onDone); void moveDocument(MoveOperationUP moveOp, IDestructorCallbackSP onDone); @@ -108,7 +113,7 @@ private: bool _allScheduled; // All moves started, or operation has been cancelled bool _lastGidValid; document::GlobalId _lastGid; - GuardedMoveOp createMoveOperation(MoveKey & key); + MoveOperationUP createMoveOperation(const MoveKey & key); size_t pending() const { return _started.load(std::memory_order_relaxed) - _completed.load(std::memory_order_relaxed); } |