diff options
Diffstat (limited to 'searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp')
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp | 142 |
1 files changed, 71 insertions, 71 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp index a43fd55ba02..6aa78276598 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp @@ -53,27 +53,27 @@ blockedDueToClusterState(const std::shared_ptr<IBucketStateCalculator> &calc) } -BucketMoveJobV2::BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> &calc, - RetainGuard dbRetainer, - IDocumentMoveHandler &moveHandler, - IBucketModifiedHandler &modifiedHandler, - IThreadService & master, - BucketExecutor & bucketExecutor, - const MaintenanceDocumentSubDB &ready, - const MaintenanceDocumentSubDB ¬Ready, - bucketdb::IBucketCreateNotifier &bucketCreateNotifier, - IClusterStateChangedNotifier &clusterStateChangedNotifier, - IBucketStateChangedNotifier &bucketStateChangedNotifier, - IDiskMemUsageNotifier &diskMemUsageNotifier, - const BlockableMaintenanceJobConfig &blockableConfig, - const vespalib::string &docTypeName, - document::BucketSpace bucketSpace) +BucketMoveJob::BucketMoveJob(const std::shared_ptr<IBucketStateCalculator> &calc, + RetainGuard dbRetainer, + IDocumentMoveHandler &moveHandler, + IBucketModifiedHandler &modifiedHandler, + IThreadService & master, + BucketExecutor & bucketExecutor, + const MaintenanceDocumentSubDB &ready, + const MaintenanceDocumentSubDB ¬Ready, + bucketdb::IBucketCreateNotifier &bucketCreateNotifier, + IClusterStateChangedNotifier &clusterStateChangedNotifier, + IBucketStateChangedNotifier &bucketStateChangedNotifier, + IDiskMemUsageNotifier &diskMemUsageNotifier, + const BlockableMaintenanceJobConfig &blockableConfig, + const vespalib::string &docTypeName, + document::BucketSpace bucketSpace) : BlockableMaintenanceJob("move_buckets." + docTypeName, vespalib::duration::zero(), vespalib::duration::zero(), blockableConfig), IClusterStateChangedHandler(), bucketdb::IBucketCreateListener(), IBucketStateChangedHandler(), IDiskMemUsageListener(), - std::enable_shared_from_this<BucketMoveJobV2>(), + std::enable_shared_from_this<BucketMoveJob>(), _calc(calc), _dbRetainer(std::move(dbRetainer)), _moveHandler(moveHandler), @@ -105,7 +105,7 @@ BucketMoveJobV2::BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> & recompute(_ready.meta_store()->getBucketDB().takeGuard()); } -BucketMoveJobV2::~BucketMoveJobV2() +BucketMoveJob::~BucketMoveJob() { _bucketCreateNotifier.removeListener(this); _clusterStateChangedNotifier.removeClusterStateChangedHandler(this); @@ -113,35 +113,35 @@ BucketMoveJobV2::~BucketMoveJobV2() _diskMemUsageNotifier.removeDiskMemUsageListener(this); } -std::shared_ptr<BucketMoveJobV2> -BucketMoveJobV2::create(const std::shared_ptr<IBucketStateCalculator> &calc, - RetainGuard dbRetainer, - IDocumentMoveHandler &moveHandler, - IBucketModifiedHandler &modifiedHandler, - IThreadService & master, - BucketExecutor & bucketExecutor, - const MaintenanceDocumentSubDB &ready, - const MaintenanceDocumentSubDB ¬Ready, - bucketdb::IBucketCreateNotifier &bucketCreateNotifier, - IClusterStateChangedNotifier &clusterStateChangedNotifier, - IBucketStateChangedNotifier &bucketStateChangedNotifier, - IDiskMemUsageNotifier &diskMemUsageNotifier, - const BlockableMaintenanceJobConfig &blockableConfig, - const vespalib::string &docTypeName, - document::BucketSpace bucketSpace) +std::shared_ptr<BucketMoveJob> +BucketMoveJob::create(const std::shared_ptr<IBucketStateCalculator> &calc, + RetainGuard dbRetainer, + IDocumentMoveHandler &moveHandler, + IBucketModifiedHandler &modifiedHandler, + IThreadService & master, + BucketExecutor & bucketExecutor, + const MaintenanceDocumentSubDB &ready, + const MaintenanceDocumentSubDB ¬Ready, + bucketdb::IBucketCreateNotifier &bucketCreateNotifier, + IClusterStateChangedNotifier &clusterStateChangedNotifier, + IBucketStateChangedNotifier &bucketStateChangedNotifier, + IDiskMemUsageNotifier &diskMemUsageNotifier, + const BlockableMaintenanceJobConfig &blockableConfig, + const vespalib::string &docTypeName, + document::BucketSpace bucketSpace) { - return std::shared_ptr<BucketMoveJobV2>( - new BucketMoveJobV2(calc, std::move(dbRetainer), moveHandler, modifiedHandler, master, bucketExecutor, ready, notReady, - bucketCreateNotifier, clusterStateChangedNotifier, bucketStateChangedNotifier, - diskMemUsageNotifier, blockableConfig, docTypeName, bucketSpace), + return std::shared_ptr<BucketMoveJob>( + new BucketMoveJob(calc, std::move(dbRetainer), moveHandler, modifiedHandler, master, bucketExecutor, ready, notReady, + bucketCreateNotifier, clusterStateChangedNotifier, bucketStateChangedNotifier, + diskMemUsageNotifier, blockableConfig, docTypeName, bucketSpace), [&master](auto job) { auto failed = master.execute(makeLambdaTask([job]() { delete job; })); assert(!failed); }); } -BucketMoveJobV2::NeedResult -BucketMoveJobV2::needMove(const ScanIterator &itr) const { +BucketMoveJob::NeedResult +BucketMoveJob::needMove(const ScanIterator &itr) const { NeedResult noMove(false, false); const bool hasReadyDocs = itr.hasReadyBucketDocs(); const bool hasNotReadyDocs = itr.hasNotReadyBucketDocs(); @@ -176,10 +176,10 @@ BucketMoveJobV2::needMove(const ScanIterator &itr) const { return {true, wantReady}; } -class BucketMoveJobV2::StartMove : public storage::spi::BucketTask { +class BucketMoveJob::StartMove : public storage::spi::BucketTask { public: using IDestructorCallbackSP = std::shared_ptr<vespalib::IDestructorCallback>; - StartMove(std::shared_ptr<BucketMoveJobV2> job, BucketMover::MoveKeys keys, IDestructorCallbackSP opsTracker) + StartMove(std::shared_ptr<BucketMoveJob> job, BucketMover::MoveKeys keys, IDestructorCallbackSP opsTracker) : _job(job), _keys(std::move(keys)), _opsTracker(std::move(opsTracker)) @@ -188,22 +188,22 @@ public: void run(const Bucket &bucket, IDestructorCallbackSP onDone) override { assert(_keys.mover().getBucket() == bucket.getBucketId()); using DoneContext = vespalib::KeepAlive<std::pair<IDestructorCallbackSP, IDestructorCallbackSP>>; - BucketMoveJobV2::prepareMove(std::move(_job), std::move(_keys), - std::make_shared<DoneContext>(std::make_pair(std::move(_opsTracker), std::move(onDone)))); + BucketMoveJob::prepareMove(std::move(_job), std::move(_keys), + std::make_shared<DoneContext>(std::make_pair(std::move(_opsTracker), std::move(onDone)))); } void fail(const Bucket &bucket) override { - BucketMoveJobV2::failOperation(std::move(_job), bucket.getBucketId()); + BucketMoveJob::failOperation(std::move(_job), bucket.getBucketId()); } private: - std::shared_ptr<BucketMoveJobV2> _job; + std::shared_ptr<BucketMoveJob> _job; BucketMover::MoveKeys _keys; IDestructorCallbackSP _opsTracker; }; void -BucketMoveJobV2::failOperation(std::shared_ptr<BucketMoveJobV2> job, BucketId bucketId) { +BucketMoveJob::failOperation(std::shared_ptr<BucketMoveJob> job, BucketId bucketId) { auto & master = job->_master; if (job->stopped()) return; master.execute(makeLambdaTask([job=std::move(job), bucketId]() { @@ -213,7 +213,7 @@ BucketMoveJobV2::failOperation(std::shared_ptr<BucketMoveJobV2> job, BucketId bu } void -BucketMoveJobV2::startMove(BucketMover & mover, size_t maxDocsToMove) { +BucketMoveJob::startMove(BucketMover & mover, size_t maxDocsToMove) { auto [keys, done] = mover.getKeysToMove(maxDocsToMove); if (done) { mover.setAllScheduled(); @@ -226,7 +226,7 @@ BucketMoveJobV2::startMove(BucketMover & mover, size_t maxDocsToMove) { } void -BucketMoveJobV2::prepareMove(std::shared_ptr<BucketMoveJobV2> job, BucketMover::MoveKeys keys, IDestructorCallbackSP onDone) +BucketMoveJob::prepareMove(std::shared_ptr<BucketMoveJob> job, BucketMover::MoveKeys keys, IDestructorCallbackSP onDone) { if (job->stopped()) return; //TODO Remove once lidtracker is no longer in use. auto moveOps = keys.createMoveOperations(); @@ -239,7 +239,7 @@ BucketMoveJobV2::prepareMove(std::shared_ptr<BucketMoveJobV2> job, BucketMover:: } void -BucketMoveJobV2::completeMove(GuardedMoveOps ops, IDestructorCallbackSP onDone) { +BucketMoveJob::completeMove(GuardedMoveOps ops, IDestructorCallbackSP onDone) { BucketMover & mover = ops.mover(); mover.moveDocuments(std::move(ops.success()), std::move(onDone)); ops.failed().clear(); @@ -249,7 +249,7 @@ BucketMoveJobV2::completeMove(GuardedMoveOps ops, IDestructorCallbackSP onDone) } bool -BucketMoveJobV2::checkIfMoverComplete(const BucketMover & mover) { +BucketMoveJob::checkIfMoverComplete(const BucketMover & mover) { bool bucketMoveComplete = mover.allScheduled() && mover.inSync(); bool needReschedule = mover.needReschedule(); if (bucketMoveComplete || needReschedule) { @@ -277,7 +277,7 @@ BucketMoveJobV2::checkIfMoverComplete(const BucketMover & mover) { } void -BucketMoveJobV2::cancelBucket(BucketId bucket) { +BucketMoveJob::cancelBucket(BucketId bucket) { auto inFlight = _bucketsInFlight.find(bucket); if (inFlight != _bucketsInFlight.end()) { inFlight->second->cancel(); @@ -286,14 +286,14 @@ BucketMoveJobV2::cancelBucket(BucketId bucket) { } void -BucketMoveJobV2::considerBucket(const bucketdb::Guard & guard, BucketId bucket) { +BucketMoveJob::considerBucket(const bucketdb::Guard & guard, BucketId bucket) { cancelBucket(bucket); assert( !_bucketsInFlight.contains(bucket)); reconsiderBucket(guard, bucket); } void -BucketMoveJobV2::reconsiderBucket(const bucketdb::Guard & guard, BucketId bucket) { +BucketMoveJob::reconsiderBucket(const bucketdb::Guard & guard, BucketId bucket) { assert( ! _bucketsInFlight.contains(bucket)); ScanIterator itr(guard, bucket); auto [mustMove, wantReady] = needMove(itr); @@ -307,15 +307,15 @@ BucketMoveJobV2::reconsiderBucket(const bucketdb::Guard & guard, BucketId bucket } void -BucketMoveJobV2::notifyCreateBucket(const bucketdb::Guard & guard, const BucketId &bucket) +BucketMoveJob::notifyCreateBucket(const bucketdb::Guard & guard, const BucketId &bucket) { considerBucket(guard, bucket); } -BucketMoveJobV2::BucketMoveSet -BucketMoveJobV2::computeBuckets2Move(const bucketdb::Guard & guard) +BucketMoveJob::BucketMoveSet +BucketMoveJob::computeBuckets2Move(const bucketdb::Guard & guard) { - BucketMoveJobV2::BucketMoveSet toMove; + BucketMoveJob::BucketMoveSet toMove; for (ScanIterator itr(guard, BucketId()); itr.valid(); ++itr) { auto [mustMove, wantReady] = needMove(itr); if (mustMove) { @@ -326,7 +326,7 @@ BucketMoveJobV2::computeBuckets2Move(const bucketdb::Guard & guard) } std::shared_ptr<BucketMover> -BucketMoveJobV2::createMover(BucketId bucket, bool wantReady) { +BucketMoveJob::createMover(BucketId bucket, bool wantReady) { const MaintenanceDocumentSubDB &source(wantReady ? _notReady : _ready); const MaintenanceDocumentSubDB &target(wantReady ? _ready : _notReady); LOG(debug, "checkBucket(): mover.setupForBucket(%s, source:%u, target:%u)", @@ -335,7 +335,7 @@ BucketMoveJobV2::createMover(BucketId bucket, bool wantReady) { } std::shared_ptr<BucketMover> -BucketMoveJobV2::greedyCreateMover() { +BucketMoveJob::greedyCreateMover() { if ( ! _buckets2Move.empty()) { auto next = _buckets2Move.begin(); auto mover = createMover(next->first, next->second); @@ -346,7 +346,7 @@ BucketMoveJobV2::greedyCreateMover() { } void -BucketMoveJobV2::moveDocs(size_t maxDocsToMove) { +BucketMoveJob::moveDocs(size_t maxDocsToMove) { backFillMovers(); if (_movers.empty()) return; @@ -364,7 +364,7 @@ BucketMoveJobV2::moveDocs(size_t maxDocsToMove) { } bool -BucketMoveJobV2::scanAndMove(size_t maxBuckets2Move, size_t maxDocsToMovePerBucket) { +BucketMoveJob::scanAndMove(size_t maxBuckets2Move, size_t maxDocsToMovePerBucket) { for (size_t i(0); i < maxBuckets2Move; i++) { moveDocs(maxDocsToMovePerBucket); } @@ -372,12 +372,12 @@ BucketMoveJobV2::scanAndMove(size_t maxBuckets2Move, size_t maxDocsToMovePerBuck } bool -BucketMoveJobV2::done() const { +BucketMoveJob::done() const { return _buckets2Move.empty() && _movers.empty() && !isBlocked(); } bool -BucketMoveJobV2::run() +BucketMoveJob::run() { if (isBlocked()) { return true; // indicate work is done, since node state is bad @@ -395,17 +395,17 @@ BucketMoveJobV2::run() } void -BucketMoveJobV2::recompute() { +BucketMoveJob::recompute() { recompute(_ready.meta_store()->getBucketDB().takeGuard()); } void -BucketMoveJobV2::recompute(const bucketdb::Guard & guard) { +BucketMoveJob::recompute(const bucketdb::Guard & guard) { _buckets2Move = computeBuckets2Move(guard); updatePending(); } void -BucketMoveJobV2::backFillMovers() { +BucketMoveJob::backFillMovers() { // Ensure we have enough movers. while ( ! _buckets2Move.empty() && (_movers.size() < _movers.capacity())) { auto mover = greedyCreateMover(); @@ -418,7 +418,7 @@ BucketMoveJobV2::backFillMovers() { } void -BucketMoveJobV2::notifyClusterStateChanged(const std::shared_ptr<IBucketStateCalculator> &newCalc) +BucketMoveJob::notifyClusterStateChanged(const std::shared_ptr<IBucketStateCalculator> &newCalc) { // Called by master write thread _calc = newCalc; @@ -434,26 +434,26 @@ BucketMoveJobV2::notifyClusterStateChanged(const std::shared_ptr<IBucketStateCal } void -BucketMoveJobV2::notifyBucketStateChanged(const BucketId &bucketId, BucketInfo::ActiveState) +BucketMoveJob::notifyBucketStateChanged(const BucketId &bucketId, BucketInfo::ActiveState) { // Called by master write thread considerBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucketId); } void -BucketMoveJobV2::notifyDiskMemUsage(DiskMemUsageState state) +BucketMoveJob::notifyDiskMemUsage(DiskMemUsageState state) { // Called by master write thread internalNotifyDiskMemUsage(state); } void -BucketMoveJobV2::updatePending() { +BucketMoveJob::updatePending() { _bucketsPending.store(_bucketsInFlight.size() + _buckets2Move.size(), std::memory_order_relaxed); } void -BucketMoveJobV2::updateMetrics(DocumentDBTaggedMetrics & metrics) const { +BucketMoveJob::updateMetrics(DocumentDBTaggedMetrics & metrics) const { // This is an over estimate to ensure we do not count down to zero until everything has been and completed and acked. metrics.bucketMove.bucketsPending.set(_bucketsPending.load(std::memory_order_relaxed) + getLimiter().numPending()); |