diff options
Diffstat (limited to 'searchcore')
11 files changed, 125 insertions, 149 deletions
diff --git a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp index 609f2413bdc..64398503dfa 100644 --- a/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentbucketmover/documentbucketmover_test.cpp @@ -43,7 +43,7 @@ struct ControllerFixtureBase : public ::testing::Test DummyBucketExecutor _bucketExecutor; MyMoveHandler _moveHandler; DocumentDBTaggedMetrics _metrics; - std::shared_ptr<BucketMoveJobV2> _bmj; + std::shared_ptr<BucketMoveJob> _bmj; MyCountJobRunner _runner; ControllerFixtureBase(const BlockableMaintenanceJobConfig &blockableConfig, bool storeMoveDoneContexts); ~ControllerFixtureBase(); @@ -125,9 +125,9 @@ ControllerFixtureBase::ControllerFixtureBase(const BlockableMaintenanceJobConfig _bucketExecutor(4), _moveHandler(*_bucketDB, storeMoveDoneContexts), _metrics("test", 1), - _bmj(BucketMoveJobV2::create(_calc, RetainGuard(_refCount), _moveHandler, _modifiedHandler, _master, _bucketExecutor, _ready._subDb, - _notReady._subDb, _bucketCreateNotifier,_clusterStateHandler, _bucketHandler, - _diskMemUsageNotifier, blockableConfig, "test", makeBucketSpace())), + _bmj(BucketMoveJob::create(_calc, RetainGuard(_refCount), _moveHandler, _modifiedHandler, _master, _bucketExecutor, _ready._subDb, + _notReady._subDb, _bucketCreateNotifier, _clusterStateHandler, _bucketHandler, + _diskMemUsageNotifier, blockableConfig, "test", makeBucketSpace())), _runner(*_bmj) { } diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp index a43fd55ba02..6aa78276598 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.cpp @@ -53,27 +53,27 @@ blockedDueToClusterState(const std::shared_ptr<IBucketStateCalculator> &calc) } -BucketMoveJobV2::BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> &calc, - RetainGuard dbRetainer, - IDocumentMoveHandler &moveHandler, - IBucketModifiedHandler &modifiedHandler, - IThreadService & master, - BucketExecutor & bucketExecutor, - const MaintenanceDocumentSubDB &ready, - const MaintenanceDocumentSubDB ¬Ready, - bucketdb::IBucketCreateNotifier &bucketCreateNotifier, - IClusterStateChangedNotifier &clusterStateChangedNotifier, - IBucketStateChangedNotifier &bucketStateChangedNotifier, - IDiskMemUsageNotifier &diskMemUsageNotifier, - const BlockableMaintenanceJobConfig &blockableConfig, - const vespalib::string &docTypeName, - document::BucketSpace bucketSpace) +BucketMoveJob::BucketMoveJob(const std::shared_ptr<IBucketStateCalculator> &calc, + RetainGuard dbRetainer, + IDocumentMoveHandler &moveHandler, + IBucketModifiedHandler &modifiedHandler, + IThreadService & master, + BucketExecutor & bucketExecutor, + const MaintenanceDocumentSubDB &ready, + const MaintenanceDocumentSubDB ¬Ready, + bucketdb::IBucketCreateNotifier &bucketCreateNotifier, + IClusterStateChangedNotifier &clusterStateChangedNotifier, + IBucketStateChangedNotifier &bucketStateChangedNotifier, + IDiskMemUsageNotifier &diskMemUsageNotifier, + const BlockableMaintenanceJobConfig &blockableConfig, + const vespalib::string &docTypeName, + document::BucketSpace bucketSpace) : BlockableMaintenanceJob("move_buckets." + docTypeName, vespalib::duration::zero(), vespalib::duration::zero(), blockableConfig), IClusterStateChangedHandler(), bucketdb::IBucketCreateListener(), IBucketStateChangedHandler(), IDiskMemUsageListener(), - std::enable_shared_from_this<BucketMoveJobV2>(), + std::enable_shared_from_this<BucketMoveJob>(), _calc(calc), _dbRetainer(std::move(dbRetainer)), _moveHandler(moveHandler), @@ -105,7 +105,7 @@ BucketMoveJobV2::BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> & recompute(_ready.meta_store()->getBucketDB().takeGuard()); } -BucketMoveJobV2::~BucketMoveJobV2() +BucketMoveJob::~BucketMoveJob() { _bucketCreateNotifier.removeListener(this); _clusterStateChangedNotifier.removeClusterStateChangedHandler(this); @@ -113,35 +113,35 @@ BucketMoveJobV2::~BucketMoveJobV2() _diskMemUsageNotifier.removeDiskMemUsageListener(this); } -std::shared_ptr<BucketMoveJobV2> -BucketMoveJobV2::create(const std::shared_ptr<IBucketStateCalculator> &calc, - RetainGuard dbRetainer, - IDocumentMoveHandler &moveHandler, - IBucketModifiedHandler &modifiedHandler, - IThreadService & master, - BucketExecutor & bucketExecutor, - const MaintenanceDocumentSubDB &ready, - const MaintenanceDocumentSubDB ¬Ready, - bucketdb::IBucketCreateNotifier &bucketCreateNotifier, - IClusterStateChangedNotifier &clusterStateChangedNotifier, - IBucketStateChangedNotifier &bucketStateChangedNotifier, - IDiskMemUsageNotifier &diskMemUsageNotifier, - const BlockableMaintenanceJobConfig &blockableConfig, - const vespalib::string &docTypeName, - document::BucketSpace bucketSpace) +std::shared_ptr<BucketMoveJob> +BucketMoveJob::create(const std::shared_ptr<IBucketStateCalculator> &calc, + RetainGuard dbRetainer, + IDocumentMoveHandler &moveHandler, + IBucketModifiedHandler &modifiedHandler, + IThreadService & master, + BucketExecutor & bucketExecutor, + const MaintenanceDocumentSubDB &ready, + const MaintenanceDocumentSubDB ¬Ready, + bucketdb::IBucketCreateNotifier &bucketCreateNotifier, + IClusterStateChangedNotifier &clusterStateChangedNotifier, + IBucketStateChangedNotifier &bucketStateChangedNotifier, + IDiskMemUsageNotifier &diskMemUsageNotifier, + const BlockableMaintenanceJobConfig &blockableConfig, + const vespalib::string &docTypeName, + document::BucketSpace bucketSpace) { - return std::shared_ptr<BucketMoveJobV2>( - new BucketMoveJobV2(calc, std::move(dbRetainer), moveHandler, modifiedHandler, master, bucketExecutor, ready, notReady, - bucketCreateNotifier, clusterStateChangedNotifier, bucketStateChangedNotifier, - diskMemUsageNotifier, blockableConfig, docTypeName, bucketSpace), + return std::shared_ptr<BucketMoveJob>( + new BucketMoveJob(calc, std::move(dbRetainer), moveHandler, modifiedHandler, master, bucketExecutor, ready, notReady, + bucketCreateNotifier, clusterStateChangedNotifier, bucketStateChangedNotifier, + diskMemUsageNotifier, blockableConfig, docTypeName, bucketSpace), [&master](auto job) { auto failed = master.execute(makeLambdaTask([job]() { delete job; })); assert(!failed); }); } -BucketMoveJobV2::NeedResult -BucketMoveJobV2::needMove(const ScanIterator &itr) const { +BucketMoveJob::NeedResult +BucketMoveJob::needMove(const ScanIterator &itr) const { NeedResult noMove(false, false); const bool hasReadyDocs = itr.hasReadyBucketDocs(); const bool hasNotReadyDocs = itr.hasNotReadyBucketDocs(); @@ -176,10 +176,10 @@ BucketMoveJobV2::needMove(const ScanIterator &itr) const { return {true, wantReady}; } -class BucketMoveJobV2::StartMove : public storage::spi::BucketTask { +class BucketMoveJob::StartMove : public storage::spi::BucketTask { public: using IDestructorCallbackSP = std::shared_ptr<vespalib::IDestructorCallback>; - StartMove(std::shared_ptr<BucketMoveJobV2> job, BucketMover::MoveKeys keys, IDestructorCallbackSP opsTracker) + StartMove(std::shared_ptr<BucketMoveJob> job, BucketMover::MoveKeys keys, IDestructorCallbackSP opsTracker) : _job(job), _keys(std::move(keys)), _opsTracker(std::move(opsTracker)) @@ -188,22 +188,22 @@ public: void run(const Bucket &bucket, IDestructorCallbackSP onDone) override { assert(_keys.mover().getBucket() == bucket.getBucketId()); using DoneContext = vespalib::KeepAlive<std::pair<IDestructorCallbackSP, IDestructorCallbackSP>>; - BucketMoveJobV2::prepareMove(std::move(_job), std::move(_keys), - std::make_shared<DoneContext>(std::make_pair(std::move(_opsTracker), std::move(onDone)))); + BucketMoveJob::prepareMove(std::move(_job), std::move(_keys), + std::make_shared<DoneContext>(std::make_pair(std::move(_opsTracker), std::move(onDone)))); } void fail(const Bucket &bucket) override { - BucketMoveJobV2::failOperation(std::move(_job), bucket.getBucketId()); + BucketMoveJob::failOperation(std::move(_job), bucket.getBucketId()); } private: - std::shared_ptr<BucketMoveJobV2> _job; + std::shared_ptr<BucketMoveJob> _job; BucketMover::MoveKeys _keys; IDestructorCallbackSP _opsTracker; }; void -BucketMoveJobV2::failOperation(std::shared_ptr<BucketMoveJobV2> job, BucketId bucketId) { +BucketMoveJob::failOperation(std::shared_ptr<BucketMoveJob> job, BucketId bucketId) { auto & master = job->_master; if (job->stopped()) return; master.execute(makeLambdaTask([job=std::move(job), bucketId]() { @@ -213,7 +213,7 @@ BucketMoveJobV2::failOperation(std::shared_ptr<BucketMoveJobV2> job, BucketId bu } void -BucketMoveJobV2::startMove(BucketMover & mover, size_t maxDocsToMove) { +BucketMoveJob::startMove(BucketMover & mover, size_t maxDocsToMove) { auto [keys, done] = mover.getKeysToMove(maxDocsToMove); if (done) { mover.setAllScheduled(); @@ -226,7 +226,7 @@ BucketMoveJobV2::startMove(BucketMover & mover, size_t maxDocsToMove) { } void -BucketMoveJobV2::prepareMove(std::shared_ptr<BucketMoveJobV2> job, BucketMover::MoveKeys keys, IDestructorCallbackSP onDone) +BucketMoveJob::prepareMove(std::shared_ptr<BucketMoveJob> job, BucketMover::MoveKeys keys, IDestructorCallbackSP onDone) { if (job->stopped()) return; //TODO Remove once lidtracker is no longer in use. auto moveOps = keys.createMoveOperations(); @@ -239,7 +239,7 @@ BucketMoveJobV2::prepareMove(std::shared_ptr<BucketMoveJobV2> job, BucketMover:: } void -BucketMoveJobV2::completeMove(GuardedMoveOps ops, IDestructorCallbackSP onDone) { +BucketMoveJob::completeMove(GuardedMoveOps ops, IDestructorCallbackSP onDone) { BucketMover & mover = ops.mover(); mover.moveDocuments(std::move(ops.success()), std::move(onDone)); ops.failed().clear(); @@ -249,7 +249,7 @@ BucketMoveJobV2::completeMove(GuardedMoveOps ops, IDestructorCallbackSP onDone) } bool -BucketMoveJobV2::checkIfMoverComplete(const BucketMover & mover) { +BucketMoveJob::checkIfMoverComplete(const BucketMover & mover) { bool bucketMoveComplete = mover.allScheduled() && mover.inSync(); bool needReschedule = mover.needReschedule(); if (bucketMoveComplete || needReschedule) { @@ -277,7 +277,7 @@ BucketMoveJobV2::checkIfMoverComplete(const BucketMover & mover) { } void -BucketMoveJobV2::cancelBucket(BucketId bucket) { +BucketMoveJob::cancelBucket(BucketId bucket) { auto inFlight = _bucketsInFlight.find(bucket); if (inFlight != _bucketsInFlight.end()) { inFlight->second->cancel(); @@ -286,14 +286,14 @@ BucketMoveJobV2::cancelBucket(BucketId bucket) { } void -BucketMoveJobV2::considerBucket(const bucketdb::Guard & guard, BucketId bucket) { +BucketMoveJob::considerBucket(const bucketdb::Guard & guard, BucketId bucket) { cancelBucket(bucket); assert( !_bucketsInFlight.contains(bucket)); reconsiderBucket(guard, bucket); } void -BucketMoveJobV2::reconsiderBucket(const bucketdb::Guard & guard, BucketId bucket) { +BucketMoveJob::reconsiderBucket(const bucketdb::Guard & guard, BucketId bucket) { assert( ! _bucketsInFlight.contains(bucket)); ScanIterator itr(guard, bucket); auto [mustMove, wantReady] = needMove(itr); @@ -307,15 +307,15 @@ BucketMoveJobV2::reconsiderBucket(const bucketdb::Guard & guard, BucketId bucket } void -BucketMoveJobV2::notifyCreateBucket(const bucketdb::Guard & guard, const BucketId &bucket) +BucketMoveJob::notifyCreateBucket(const bucketdb::Guard & guard, const BucketId &bucket) { considerBucket(guard, bucket); } -BucketMoveJobV2::BucketMoveSet -BucketMoveJobV2::computeBuckets2Move(const bucketdb::Guard & guard) +BucketMoveJob::BucketMoveSet +BucketMoveJob::computeBuckets2Move(const bucketdb::Guard & guard) { - BucketMoveJobV2::BucketMoveSet toMove; + BucketMoveJob::BucketMoveSet toMove; for (ScanIterator itr(guard, BucketId()); itr.valid(); ++itr) { auto [mustMove, wantReady] = needMove(itr); if (mustMove) { @@ -326,7 +326,7 @@ BucketMoveJobV2::computeBuckets2Move(const bucketdb::Guard & guard) } std::shared_ptr<BucketMover> -BucketMoveJobV2::createMover(BucketId bucket, bool wantReady) { +BucketMoveJob::createMover(BucketId bucket, bool wantReady) { const MaintenanceDocumentSubDB &source(wantReady ? _notReady : _ready); const MaintenanceDocumentSubDB &target(wantReady ? _ready : _notReady); LOG(debug, "checkBucket(): mover.setupForBucket(%s, source:%u, target:%u)", @@ -335,7 +335,7 @@ BucketMoveJobV2::createMover(BucketId bucket, bool wantReady) { } std::shared_ptr<BucketMover> -BucketMoveJobV2::greedyCreateMover() { +BucketMoveJob::greedyCreateMover() { if ( ! _buckets2Move.empty()) { auto next = _buckets2Move.begin(); auto mover = createMover(next->first, next->second); @@ -346,7 +346,7 @@ BucketMoveJobV2::greedyCreateMover() { } void -BucketMoveJobV2::moveDocs(size_t maxDocsToMove) { +BucketMoveJob::moveDocs(size_t maxDocsToMove) { backFillMovers(); if (_movers.empty()) return; @@ -364,7 +364,7 @@ BucketMoveJobV2::moveDocs(size_t maxDocsToMove) { } bool -BucketMoveJobV2::scanAndMove(size_t maxBuckets2Move, size_t maxDocsToMovePerBucket) { +BucketMoveJob::scanAndMove(size_t maxBuckets2Move, size_t maxDocsToMovePerBucket) { for (size_t i(0); i < maxBuckets2Move; i++) { moveDocs(maxDocsToMovePerBucket); } @@ -372,12 +372,12 @@ BucketMoveJobV2::scanAndMove(size_t maxBuckets2Move, size_t maxDocsToMovePerBuck } bool -BucketMoveJobV2::done() const { +BucketMoveJob::done() const { return _buckets2Move.empty() && _movers.empty() && !isBlocked(); } bool -BucketMoveJobV2::run() +BucketMoveJob::run() { if (isBlocked()) { return true; // indicate work is done, since node state is bad @@ -395,17 +395,17 @@ BucketMoveJobV2::run() } void -BucketMoveJobV2::recompute() { +BucketMoveJob::recompute() { recompute(_ready.meta_store()->getBucketDB().takeGuard()); } void -BucketMoveJobV2::recompute(const bucketdb::Guard & guard) { +BucketMoveJob::recompute(const bucketdb::Guard & guard) { _buckets2Move = computeBuckets2Move(guard); updatePending(); } void -BucketMoveJobV2::backFillMovers() { +BucketMoveJob::backFillMovers() { // Ensure we have enough movers. while ( ! _buckets2Move.empty() && (_movers.size() < _movers.capacity())) { auto mover = greedyCreateMover(); @@ -418,7 +418,7 @@ BucketMoveJobV2::backFillMovers() { } void -BucketMoveJobV2::notifyClusterStateChanged(const std::shared_ptr<IBucketStateCalculator> &newCalc) +BucketMoveJob::notifyClusterStateChanged(const std::shared_ptr<IBucketStateCalculator> &newCalc) { // Called by master write thread _calc = newCalc; @@ -434,26 +434,26 @@ BucketMoveJobV2::notifyClusterStateChanged(const std::shared_ptr<IBucketStateCal } void -BucketMoveJobV2::notifyBucketStateChanged(const BucketId &bucketId, BucketInfo::ActiveState) +BucketMoveJob::notifyBucketStateChanged(const BucketId &bucketId, BucketInfo::ActiveState) { // Called by master write thread considerBucket(_ready.meta_store()->getBucketDB().takeGuard(), bucketId); } void -BucketMoveJobV2::notifyDiskMemUsage(DiskMemUsageState state) +BucketMoveJob::notifyDiskMemUsage(DiskMemUsageState state) { // Called by master write thread internalNotifyDiskMemUsage(state); } void -BucketMoveJobV2::updatePending() { +BucketMoveJob::updatePending() { _bucketsPending.store(_bucketsInFlight.size() + _buckets2Move.size(), std::memory_order_relaxed); } void -BucketMoveJobV2::updateMetrics(DocumentDBTaggedMetrics & metrics) const { +BucketMoveJob::updateMetrics(DocumentDBTaggedMetrics & metrics) const { // This is an over estimate to ensure we do not count down to zero until everything has been and completed and acked. metrics.bucketMove.bucketsPending.set(_bucketsPending.load(std::memory_order_relaxed) + getLimiter().numPending()); diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h index 7d5dafb33b3..573bdef13ef 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejob.h @@ -36,12 +36,12 @@ namespace bucketdb { class IBucketCreateNotifier; } * 3 - Actual movement is then done in master thread while still holding bucket lock. Once bucket has fully moved * bucket modified notification is sent. */ -class BucketMoveJobV2 : public BlockableMaintenanceJob, +class BucketMoveJob : public BlockableMaintenanceJob, public IClusterStateChangedHandler, public bucketdb::IBucketCreateListener, public IBucketStateChangedHandler, public IDiskMemUsageListener, - public std::enable_shared_from_this<BucketMoveJobV2> + public std::enable_shared_from_this<BucketMoveJob> { private: using BucketExecutor = storage::spi::BucketExecutor; @@ -79,24 +79,24 @@ private: IBucketStateChangedNotifier &_bucketStateChangedNotifier; IDiskMemUsageNotifier &_diskMemUsageNotifier; - BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> &calc, - RetainGuard dbRetainer, - IDocumentMoveHandler &moveHandler, - IBucketModifiedHandler &modifiedHandler, - IThreadService & master, - BucketExecutor & bucketExecutor, - const MaintenanceDocumentSubDB &ready, - const MaintenanceDocumentSubDB ¬Ready, - bucketdb::IBucketCreateNotifier &bucketCreateNotifier, - IClusterStateChangedNotifier &clusterStateChangedNotifier, - IBucketStateChangedNotifier &bucketStateChangedNotifier, - IDiskMemUsageNotifier &diskMemUsageNotifier, - const BlockableMaintenanceJobConfig &blockableConfig, - const vespalib::string &docTypeName, - document::BucketSpace bucketSpace); + BucketMoveJob(const std::shared_ptr<IBucketStateCalculator> &calc, + RetainGuard dbRetainer, + IDocumentMoveHandler &moveHandler, + IBucketModifiedHandler &modifiedHandler, + IThreadService & master, + BucketExecutor & bucketExecutor, + const MaintenanceDocumentSubDB &ready, + const MaintenanceDocumentSubDB ¬Ready, + bucketdb::IBucketCreateNotifier &bucketCreateNotifier, + IClusterStateChangedNotifier &clusterStateChangedNotifier, + IBucketStateChangedNotifier &bucketStateChangedNotifier, + IDiskMemUsageNotifier &diskMemUsageNotifier, + const BlockableMaintenanceJobConfig &blockableConfig, + const vespalib::string &docTypeName, + document::BucketSpace bucketSpace); void startMove(BucketMover & mover, size_t maxDocsToMove); - static void prepareMove(std::shared_ptr<BucketMoveJobV2> job, BucketMover::MoveKeys keys, IDestructorCallbackSP context); + static void prepareMove(std::shared_ptr<BucketMoveJob> job, BucketMover::MoveKeys keys, IDestructorCallbackSP context); void completeMove(GuardedMoveOps moveOps, IDestructorCallbackSP context); bool checkIfMoverComplete(const BucketMover & mover); void considerBucket(const bucketdb::Guard & guard, BucketId bucket); @@ -109,11 +109,11 @@ private: BucketMoverSP greedyCreateMover(); void backFillMovers(); void moveDocs(size_t maxDocsToMove); - static void failOperation(std::shared_ptr<BucketMoveJobV2> job, BucketId bucket); + static void failOperation(std::shared_ptr<BucketMoveJob> job, BucketId bucket); void recompute(const bucketdb::Guard & guard); class StartMove; public: - static std::shared_ptr<BucketMoveJobV2> + static std::shared_ptr<BucketMoveJob> create(const std::shared_ptr<IBucketStateCalculator> &calc, RetainGuard dbRetainer, IDocumentMoveHandler &moveHandler, @@ -130,7 +130,7 @@ public: const vespalib::string &docTypeName, document::BucketSpace bucketSpace); - ~BucketMoveJobV2() override; + ~BucketMoveJob() override; bool scanAndMove(size_t maxBuckets2Move, size_t maxDocsToMovePerBucket); bool done() const; diff --git a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp index d0581a5b13f..a0ce668294f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentbucketmover.cpp @@ -134,7 +134,7 @@ BucketMover::createMoveOperations(MoveKeys toMove) { void BucketMover::moveDocuments(std::vector<GuardedMoveOp> moveOps, IDestructorCallbackSP onDone) { for (auto & moveOp : moveOps) { - moveDocument(std::move(moveOp.first), std::move(onDone)); + moveDocument(std::move(moveOp.first), onDone); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp index 215d4721b87..cfe8da18270 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenance_jobs_injector.cpp @@ -59,10 +59,10 @@ injectBucketMoveJob(MaintenanceController &controller, DocumentDBJobTrackers &jobTrackers, IDiskMemUsageNotifier &diskMemUsageNotifier) { - auto bmj = BucketMoveJobV2::create(calc, controller.retainDB(), moveHandler, bucketModifiedHandler, controller.masterThread(), - bucketExecutor, controller.getReadySubDB(), controller.getNotReadySubDB(), - bucketCreateNotifier, clusterStateChangedNotifier, bucketStateChangedNotifier, - diskMemUsageNotifier, config.getBlockableJobConfig(), docTypeName, bucketSpace); + auto bmj = BucketMoveJob::create(calc, controller.retainDB(), moveHandler, bucketModifiedHandler, controller.masterThread(), + bucketExecutor, controller.getReadySubDB(), controller.getNotReadySubDB(), + bucketCreateNotifier, clusterStateChangedNotifier, bucketStateChangedNotifier, + diskMemUsageNotifier, config.getBlockableJobConfig(), docTypeName, bucketSpace); controller.registerJobInMasterThread(trackJob(jobTrackers.getBucketMove(), std::move(bmj))); } diff --git a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h index e5666daffb6..16b6a320b66 100644 --- a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h @@ -17,10 +17,7 @@ namespace proton { class OperationDoneContext : public vespalib::IDestructorCallback { FeedToken _token; -protected: void ack(); - FeedToken steal() { return std::move(_token); } - public: OperationDoneContext(FeedToken token); @@ -28,5 +25,4 @@ public: bool hasToken() const { return static_cast<bool>(_token); } }; - } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp index 859d8693f6d..b73b3c02120 100644 --- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp @@ -22,7 +22,6 @@ RemoveDoneContext::RemoveDoneContext(FeedToken token, IPendingLidTracker::Token RemoveDoneContext::~RemoveDoneContext() { - ack(); if (_task) { vespalib::Executor::Task::UP res = _executor.execute(std::move(_task)); assert(!res); diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h index 485b82dd141..5eea6f7faaf 100644 --- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h +++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h @@ -14,7 +14,7 @@ struct IDocumentMetaStore; /** - * Context class for document removes that acks remove andschedules a + * Context class for document removes that acks remove and schedules a * task when instance is destroyed. Typically a shared pointer to an * instance is passed around to multiple worker threads that performs * portions of a larger task before dropping the shared pointer, diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonetask.cpp b/searchcore/src/vespa/searchcore/proton/server/removedonetask.cpp index c66a5d949e6..49a7490b263 100644 --- a/searchcore/src/vespa/searchcore/proton/server/removedonetask.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/removedonetask.cpp @@ -5,17 +5,14 @@ namespace proton { -RemoveDoneTask::RemoveDoneTask(IDocumentMetaStore &documentMetaStore, - uint32_t lid) +RemoveDoneTask::RemoveDoneTask(IDocumentMetaStore &documentMetaStore, uint32_t lid) : vespalib::Executor::Task(), _documentMetaStore(documentMetaStore), _lid(lid) { } -RemoveDoneTask::~RemoveDoneTask() -{ -} +RemoveDoneTask::~RemoveDoneTask() = default; void RemoveDoneTask::run() diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonetask.h b/searchcore/src/vespa/searchcore/proton/server/removedonetask.h index d9059e6ad6a..1aa59c6d37f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/removedonetask.h +++ b/searchcore/src/vespa/searchcore/proton/server/removedonetask.h @@ -29,12 +29,10 @@ class RemoveDoneTask : public vespalib::Executor::Task public: - RemoveDoneTask(IDocumentMetaStore &documentMetaStore, - uint32_t lid); + RemoveDoneTask(IDocumentMetaStore &documentMetaStore, uint32_t lid); + ~RemoveDoneTask() override; - virtual ~RemoveDoneTask(); - - virtual void run() override; + void run() override; }; diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp index 6066fef68d8..df9ff54e29e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp @@ -49,9 +49,7 @@ private: public: PutDoneContextForMove(FeedToken token, IPendingLidTracker::Token uncommitted, - std::shared_ptr<const Document> doc, - uint32_t lid, - IDestructorCallback::SP moveDoneCtx) + std::shared_ptr<const Document> doc, uint32_t lid, IDestructorCallback::SP moveDoneCtx) : PutDoneContext(std::move(token), std::move(uncommitted),std::move(doc), lid), _moveDoneCtx(std::move(moveDoneCtx)) {} @@ -60,25 +58,21 @@ public: std::shared_ptr<PutDoneContext> createPutDoneContext(FeedToken token, IPendingLidTracker::Token uncommitted, - std::shared_ptr<const Document> doc, - uint32_t lid, - IDestructorCallback::SP moveDoneCtx) + std::shared_ptr<const Document> doc, uint32_t lid, IDestructorCallback::SP moveDoneCtx) { std::shared_ptr<PutDoneContext> result; if (moveDoneCtx) { result = std::make_shared<PutDoneContextForMove>(std::move(token), std::move(uncommitted), std::move(doc), lid, std::move(moveDoneCtx)); } else { - result = std::make_shared<PutDoneContext>(std::move(token), std::move(uncommitted), - std::move(doc), lid); + result = std::make_shared<PutDoneContext>(std::move(token), std::move(uncommitted), std::move(doc), lid); } return result; } std::shared_ptr<PutDoneContext> createPutDoneContext(FeedToken token, IPendingLidTracker::Token uncommitted, - std::shared_ptr<const Document> doc, - uint32_t lid) + std::shared_ptr<const Document> doc, uint32_t lid) { return createPutDoneContext(std::move(token), std::move(uncommitted), std::move(doc), lid, IDestructorCallback::SP()); @@ -106,10 +100,8 @@ private: public: RemoveDoneContextForMove(FeedToken token, IPendingLidTracker::Token uncommitted, vespalib::Executor &executor, - IDocumentMetaStore &documentMetaStore, - uint32_t lid, IDestructorCallback::SP moveDoneCtx) - : RemoveDoneContext(std::move(token), std::move(uncommitted), executor, - documentMetaStore, lid), + IDocumentMetaStore &documentMetaStore, uint32_t lid, IDestructorCallback::SP moveDoneCtx) + : RemoveDoneContext(std::move(token), std::move(uncommitted), executor, documentMetaStore, lid), _moveDoneCtx(std::move(moveDoneCtx)) {} ~RemoveDoneContextForMove() override = default; @@ -117,13 +109,11 @@ public: std::shared_ptr<RemoveDoneContext> createRemoveDoneContext(FeedToken token, IPendingLidTracker::Token uncommitted, vespalib::Executor &executor, - IDocumentMetaStore &documentMetaStore, - uint32_t lid, IDestructorCallback::SP moveDoneCtx) + IDocumentMetaStore &documentMetaStore, uint32_t lid, IDestructorCallback::SP moveDoneCtx) { if (moveDoneCtx) { return std::make_shared<RemoveDoneContextForMove> - (std::move(token), std::move(uncommitted), executor, documentMetaStore, - lid, std::move(moveDoneCtx)); + (std::move(token), std::move(uncommitted), executor, documentMetaStore, lid, std::move(moveDoneCtx)); } else { return std::make_shared<RemoveDoneContext> (std::move(token), std::move(uncommitted), executor, documentMetaStore, lid); @@ -163,8 +153,8 @@ void putMetaData(documentmetastore::IStore &meta_store, const DocumentId & doc_i const DocumentOperation &op, bool is_removed_doc) { documentmetastore::IStore::Result putRes( - meta_store.put(doc_id.getGlobalId(), - op.getBucketId(), op.getTimestamp(), op.getSerializedDocSize(), op.getLid(), op.get_prepare_serial_num())); + meta_store.put(doc_id.getGlobalId(), op.getBucketId(), op.getTimestamp(), + op.getSerializedDocSize(), op.getLid(), op.get_prepare_serial_num())); if (!putRes.ok()) { throw IllegalStateException( make_string("Could not put <lid, gid> pair for %sdocument with id '%s' and gid '%s'", @@ -312,14 +302,13 @@ StoreOnlyFeedView::internalPut(FeedToken token, const PutOperation &putOp) if (putOp.changedDbdId() && useDocumentMetaStore(serialNum)) { _gidToLidChangeHandler.notifyPut(token, docId.getGlobalId(), putOp.getLid(), serialNum); } - std::shared_ptr<PutDoneContext> onWriteDone = - createPutDoneContext(std::move(token), std::move(uncommitted), - doc, putOp.getLid()); + auto onWriteDone = createPutDoneContext(std::move(token), std::move(uncommitted), doc, putOp.getLid()); putSummary(serialNum, putOp.getLid(), doc, onWriteDone); putAttributes(serialNum, putOp.getLid(), *doc, onWriteDone); putIndexedFields(serialNum, putOp.getLid(), doc, onWriteDone); } if (docAlreadyExists && putOp.changedDbdId()) { + //TODO, better to have an else than an assert ? assert(!putOp.getValidDbdId(_params._subDbId)); internalRemove(std::move(token), _pendingLidsForCommit->produce(putOp.getPrevLid()), serialNum, putOp.getPrevLid(), IDestructorCallback::SP()); @@ -602,6 +591,7 @@ StoreOnlyFeedView::internalRemove(FeedToken token, const RemoveOperationWithDocI } if (rmOp.getValidPrevDbdId(_params._subDbId)) { if (rmOp.changedDbdId()) { + //TODO Prefer else over assert ? assert(!rmOp.getValidDbdId(_params._subDbId)); internalRemove(std::move(token), _pendingLidsForCommit->produce(rmOp.getPrevLid()), serialNum, rmOp.getPrevLid(), IDestructorCallback::SP()); @@ -630,14 +620,11 @@ StoreOnlyFeedView::internalRemove(FeedToken token, const RemoveOperationWithGid void StoreOnlyFeedView::internalRemove(FeedToken token, IPendingLidTracker::Token uncommitted, SerialNum serialNum, - Lid lid, - IDestructorCallback::SP moveDoneCtx) + Lid lid, IDestructorCallback::SP moveDoneCtx) { bool explicitReuseLid = _lidReuseDelayer.delayReuse(lid); - std::shared_ptr<RemoveDoneContext> onWriteDone; - onWriteDone = createRemoveDoneContext(std::move(token), std::move(uncommitted),_writeService.master(), _metaStore, - (explicitReuseLid ? lid : 0u), - std::move(moveDoneCtx)); + auto onWriteDone = createRemoveDoneContext(std::move(token), std::move(uncommitted), _writeService.master(), _metaStore, + (explicitReuseLid ? lid : 0u), std::move(moveDoneCtx)); removeSummary(serialNum, lid, onWriteDone); removeAttributes(serialNum, lid, onWriteDone); removeIndexedFields(serialNum, lid, onWriteDone); @@ -786,9 +773,8 @@ StoreOnlyFeedView::handleMove(const MoveOperation &moveOp, IDestructorCallback:: if (moveOp.changedDbdId() && useDocumentMetaStore(serialNum)) { _gidToLidChangeHandler.notifyPut(FeedToken(), docId.getGlobalId(), moveOp.getLid(), serialNum); } - std::shared_ptr<PutDoneContext> onWriteDone = - createPutDoneContext(FeedToken(), _pendingLidsForCommit->produce(moveOp.getLid()), - doc, moveOp.getLid(), doneCtx); + auto onWriteDone = createPutDoneContext(FeedToken(), _pendingLidsForCommit->produce(moveOp.getLid()), + doc, moveOp.getLid(), doneCtx); putSummary(serialNum, moveOp.getLid(), doc, onWriteDone); putAttributes(serialNum, moveOp.getLid(), *doc, onWriteDone); putIndexedFields(serialNum, moveOp.getLid(), doc, onWriteDone); |