diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-03-01 16:54:16 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-03-01 20:23:01 +0000 |
commit | 61cbfbe0dd4b35461c2662812588573e40e7a3cd (patch) | |
tree | 478f7d90e49f084649687cab6891d949ff0e73eb /searchcore/src | |
parent | 2e743e722ba395dc22755ab22bf2ae2bac2d9522 (diff) |
Postpone buckets for reconsideration until it is safe.
Diffstat (limited to 'searchcore/src')
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp | 98 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h | 19 |
2 files changed, 84 insertions, 33 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp index 0128db681d5..9d036519a6f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp @@ -53,6 +53,8 @@ blockedDueToClusterState(const std::shared_ptr<IBucketStateCalculator> &calc) return !(clusterUp && nodeUp && !nodeInitializing); } +constexpr BucketId RECOMPUTE_BUCKETID; + } BucketMoveJobV2::BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> &calc, @@ -84,7 +86,9 @@ BucketMoveJobV2::BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> & _bucketSpace(bucketSpace), _iterateCount(0), _movers(), + _bucketsInFlight(), _buckets2Move(), + _postponedUntilSafe(), _stopped(false), _startedCount(0), _executedCount(0), @@ -200,6 +204,10 @@ BucketMoveJobV2::failOperation(BucketId bucketId) { if (_stopped.load(std::memory_order_relaxed)) return; _master.execute(makeLambdaTask([this, bucketId]() { if (_stopped.load(std::memory_order_relaxed)) return; + cancelBucket(bucketId); + if (_bucketsInFlight.contains(bucketId)) { + handleMoveResult(_bucketsInFlight[bucketId]); + } considerBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucketId); })); } @@ -234,34 +242,58 @@ BucketMoveJobV2::prepareMove(BucketMoverSP mover, std::vector<MoveKey> keys, IDe void BucketMoveJobV2::completeMove(BucketMoverSP mover, std::vector<GuardedMoveOp> ops, IDestructorCallbackSP onDone) { mover->moveDocuments(std::move(ops), std::move(onDone)); + handleMoveResult(std::move(mover)); +} + +void +BucketMoveJobV2::handleMoveResult(BucketMoverSP mover) { if (mover->bucketDone() && mover->inSync()) { - _modifiedHandler.notifyBucketModified(mover->getBucket()); + BucketId bucket = mover->getBucket(); + assert(_bucketsInFlight.contains(bucket)); + _modifiedHandler.notifyBucketModified(bucket); + _bucketsInFlight.erase(bucket); + updatePending(); + if (_postponedUntilSafe.contains(bucket)) { + reconsiderBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucket); + } + if (_bucketsInFlight.empty() && _postponedUntilSafe.contains(RECOMPUTE_BUCKETID)) { + recompute(); + } } } void -BucketMoveJobV2::cancelMovesForBucket(BucketId bucket) { - for (auto itr = _movers.begin(); itr != _movers.end(); itr++) { - if (bucket == (*itr)->getBucket()) { - _movers.erase(itr); - backFillMovers(); - return; - } +BucketMoveJobV2::cancelBucket(BucketId bucket) { + auto inFlight = _bucketsInFlight.find(bucket); + if (inFlight != _bucketsInFlight.end()) { + inFlight->second->cancel(); + _movers.erase(std::remove_if(_movers.begin(), _movers.end(), + [bucket](const BucketMoverSP &mover) { return mover->getBucket() == bucket; }), + _movers.end()); + handleMoveResult(std::move(inFlight->second)); } } void -BucketMoveJobV2::considerBucket(const bucketdb::Guard & guard, BucketId bucket) -{ +BucketMoveJobV2::considerBucket(const bucketdb::Guard & guard, BucketId bucket) { + cancelBucket(bucket); + if (_bucketsInFlight.contains(bucket)) { + _postponedUntilSafe.insert(bucket); + } else { + reconsiderBucket(guard, bucket); + } +} + +void +BucketMoveJobV2::reconsiderBucket(const bucketdb::Guard & guard, BucketId bucket) { + assert( ! _bucketsInFlight.contains(bucket)); ScanIterator itr(guard, bucket); auto [mustMove, wantReady] = needMove(itr); if (mustMove) { _buckets2Move[bucket] = wantReady; } else { _buckets2Move.erase(bucket); - cancelMovesForBucket(bucket); } - backFillMovers(); considerRun(); } @@ -271,10 +303,10 @@ BucketMoveJobV2::notifyCreateBucket(const bucketdb::Guard & guard, const BucketI considerBucket(guard, bucket); } -BucketMoveJobV2::BucketSet +BucketMoveJobV2::BucketMoveSet BucketMoveJobV2::computeBuckets2Move() { - BucketMoveJobV2::BucketSet toMove; + BucketMoveJobV2::BucketMoveSet toMove; for (ScanIterator itr(_ready.meta_store()->getBucketDB().takeGuard(), BucketId()); itr.valid(); ++itr) { auto [mustMove, wantReady] = needMove(itr); if (mustMove) { @@ -306,7 +338,8 @@ BucketMoveJobV2::greedyCreateMover() { void BucketMoveJobV2::moveDocs(size_t maxDocsToMove) { - if (done()) return; + backFillMovers(); + if (_movers.empty()) return; // Select mover size_t index = _iterateCount++ % _movers.size(); @@ -316,13 +349,7 @@ BucketMoveJobV2::moveDocs(size_t maxDocsToMove) { if (!mover->bucketDone()) { startMove(mover, maxDocsToMove); if (mover->bucketDone()) { - auto next = greedyCreateMover(); - if (next) { - _movers[index] = next; - } else { - _movers.erase(_movers.begin() + index); - } - _bucketsPending.store(_movers.size() + _buckets2Move.size(), std::memory_order_relaxed); + _movers.erase(_movers.begin() + index); } } } @@ -337,7 +364,7 @@ BucketMoveJobV2::scanAndMove(size_t maxBuckets2Move, size_t maxDocsToMovePerBuck bool BucketMoveJobV2::done() const { - return _buckets2Move.empty() && _movers.empty() && !isBlocked(); + return _buckets2Move.empty() && _movers.empty() && _postponedUntilSafe.empty() && !isBlocked(); } bool @@ -360,19 +387,22 @@ BucketMoveJobV2::run() void BucketMoveJobV2::recompute() { - _movers.clear(); _buckets2Move = computeBuckets2Move(); - backFillMovers(); + updatePending(); } void BucketMoveJobV2::backFillMovers() { // Ensure we have enough movers. while ( ! _buckets2Move.empty() && (_movers.size() < _movers.capacity())) { - _movers.push_back(greedyCreateMover()); + auto mover = greedyCreateMover(); + _movers.push_back(mover); + auto bucketId = mover->getBucket(); + _bucketsInFlight[bucketId] = std::move(mover); } - _bucketsPending.store(_movers.size() + _buckets2Move.size(), std::memory_order_relaxed); + updatePending(); } + void BucketMoveJobV2::notifyClusterStateChanged(const std::shared_ptr<IBucketStateCalculator> &newCalc) { @@ -382,7 +412,14 @@ BucketMoveJobV2::notifyClusterStateChanged(const std::shared_ptr<IBucketStateCal setBlocked(BlockedReason::CLUSTER_STATE); } else { unBlock(BlockedReason::CLUSTER_STATE); - recompute(); + _movers.clear(); + std::for_each(_bucketsInFlight.begin(), _bucketsInFlight.end(), [](auto & entry) { entry.second->cancel();}); + std::erase_if(_bucketsInFlight, [](auto & entry) { return entry.second->inSync(); }); + if (_bucketsInFlight.empty()) { + recompute(); + } else { + _postponedUntilSafe.insert(RECOMPUTE_BUCKETID); + } } } @@ -415,6 +452,11 @@ BucketMoveJobV2::onStop() { } void +BucketMoveJobV2::updatePending() { + _bucketsPending.store(_bucketsInFlight.size() + _buckets2Move.size(), std::memory_order_relaxed); +} + +void BucketMoveJobV2::updateMetrics(DocumentDBTaggedMetrics & metrics) { // This is an over estimate to ensure we do not count down to zero until everything has been and completed and acked. metrics.bucketMove.bucketsPending.set(_bucketsPending.load(std::memory_order_relaxed) + diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h index a0bb8fa4d06..0f1c567b1b3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h @@ -9,6 +9,7 @@ #include "iclusterstatechangedhandler.h" #include <vespa/searchcore/proton/bucketdb/bucketscaniterator.h> #include <vespa/searchcore/proton/bucketdb/i_bucket_create_listener.h> +#include <vespa/vespalib/stllike/hash_set.h> namespace storage::spi { struct BucketExecutor; } namespace searchcorespi::index { struct IThreadService; } @@ -46,14 +47,16 @@ private: using IThreadService = searchcorespi::index::IThreadService; using BucketId = document::BucketId; using ScanIterator = bucketdb::ScanIterator; - using BucketSet = std::map<BucketId, bool>; + using BucketMoveSet = std::map<BucketId, bool>; using NeedResult = std::pair<bool, bool>; using ActiveState = storage::spi::BucketInfo::ActiveState; using BucketMover = bucketdb::BucketMover; using BucketMoverSP = std::shared_ptr<BucketMover>; - using Movers = std::vector<std::shared_ptr<BucketMover>>; + using Bucket2Mover = std::map<BucketId, BucketMoverSP>; + using Movers = std::vector<BucketMoverSP>; using MoveKey = BucketMover::MoveKey; using GuardedMoveOp = BucketMover::GuardedMoveOp; + using BucketSet = vespalib::hash_set<BucketId, BucketId::hash>; std::shared_ptr<IBucketStateCalculator> _calc; IDocumentMoveHandler &_moveHandler; IBucketModifiedHandler &_modifiedHandler; @@ -64,7 +67,10 @@ private: const document::BucketSpace _bucketSpace; size_t _iterateCount; Movers _movers; - BucketSet _buckets2Move; + Bucket2Mover _bucketsInFlight; + BucketMoveSet _buckets2Move; + BucketSet _postponedUntilSafe; + std::atomic<bool> _stopped; std::atomic<size_t> _startedCount; std::atomic<size_t> _executedCount; @@ -78,13 +84,16 @@ private: void startMove(BucketMoverSP mover, size_t maxDocsToMove); void prepareMove(BucketMoverSP mover, std::vector<MoveKey> keysToMove, IDestructorCallbackSP context); void completeMove(BucketMoverSP mover, std::vector<GuardedMoveOp> keys, IDestructorCallbackSP context); + void handleMoveResult(BucketMoverSP mover); void considerBucket(const bucketdb::Guard & guard, BucketId bucket); + void reconsiderBucket(const bucketdb::Guard & guard, BucketId bucket); + void updatePending(); + void cancelBucket(BucketId bucket); // True if something to cancel NeedResult needMove(const ScanIterator &itr) const; - BucketSet computeBuckets2Move(); + BucketMoveSet computeBuckets2Move(); BucketMoverSP createMover(BucketId bucket, bool wantReady); BucketMoverSP greedyCreateMover(); void backFillMovers(); - void cancelMovesForBucket(BucketId bucket); void moveDocs(size_t maxDocsToMove); void failOperation(BucketId bucket); friend class StartMove; |