diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-05-05 10:35:12 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-05-05 10:35:12 +0200 |
commit | 89fd8d582f82283c4cdbd71b6f07851df816a698 (patch) | |
tree | ba70546130c79ec6c80206dfb72935f9bdf49dfc /searchcore | |
parent | 02a7400a60b05d4d8db5c77def32d681199e0fb8 (diff) | |
parent | e54133b4b3deda0afa56f0dead5f23c8f314f93c (diff) |
Merge pull request #17739 from vespa-engine/balder/unify-stopping-of-jobs
Handle stop in a uniform way by putting the basics in the base class.
Diffstat (limited to 'searchcore')
12 files changed, 38 insertions, 70 deletions
diff --git a/searchcore/src/tests/proton/documentdb/job_tracked_maintenance_job/job_tracked_maintenance_job_test.cpp b/searchcore/src/tests/proton/documentdb/job_tracked_maintenance_job/job_tracked_maintenance_job_test.cpp index 4287a6b262a..8862a6d3bb2 100644 --- a/searchcore/src/tests/proton/documentdb/job_tracked_maintenance_job/job_tracked_maintenance_job_test.cpp +++ b/searchcore/src/tests/proton/documentdb/job_tracked_maintenance_job/job_tracked_maintenance_job_test.cpp @@ -34,13 +34,11 @@ struct MyMaintenanceJob : public IBlockableMaintenanceJob GateVector _runGates; size_t _runIdx; bool _blocked; - bool _stopped; MyMaintenanceJob(size_t numRuns) : IBlockableMaintenanceJob("myjob", 10s, 20s), _runGates(getGateVector(numRuns)), _runIdx(0), - _blocked(false), - _stopped(false) + _blocked(false) {} void block() { setBlocked(BlockedReason::RESOURCE_LIMITS); } void unBlock() { unBlock(BlockedReason::RESOURCE_LIMITS); } @@ -51,7 +49,7 @@ struct MyMaintenanceJob : public IBlockableMaintenanceJob _runGates[_runIdx++]->await(5s); return _runIdx == _runGates.size(); } - void onStop() override { _stopped = true; } + void onStop() override { } }; struct Fixture @@ -65,10 +63,10 @@ struct Fixture size_t _runIdx; ThreadStackExecutor _exec; Fixture(size_t numRuns = 1) - : _tracker(new SimpleJobTracker(1)), - _job(new MyMaintenanceJob(numRuns)), + : _tracker(std::make_shared<SimpleJobTracker>(1)), + _job(std::make_unique<MyMaintenanceJob>(numRuns)), _myJob(static_cast<MyMaintenanceJob *>(_job.get())), - _trackedJob(new JobTrackedMaintenanceJob(_tracker, std::move(_job))), + _trackedJob(std::make_unique<JobTrackedMaintenanceJob>(_tracker, std::move(_job))), _runRetval(false), _runGates(getGateVector(numRuns)), _runIdx(0), @@ -144,9 +142,9 @@ TEST_F("require that block calls are sent to underlying jobs", Fixture) TEST_F("require that stop calls are sent to underlying jobs", Fixture) { - EXPECT_FALSE(f._myJob->_stopped); - f._trackedJob->onStop(); - 
EXPECT_TRUE(f._myJob->_stopped); + EXPECT_FALSE(f._myJob->stopped()); + f._trackedJob->stop(); + EXPECT_TRUE(f._myJob->stopped()); } TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp index 49236a829f5..e73fa739fbb 100644 --- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp +++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp @@ -290,17 +290,14 @@ struct MySimpleJob : public BlockableMaintenanceJob { vespalib::CountDownLatch _latch; size_t _runCnt; - bool _stopped; MySimpleJob(vespalib::duration delay, vespalib::duration interval, uint32_t finishCount) : BlockableMaintenanceJob("my_job", delay, interval), _latch(finishCount), - _runCnt(0), - _stopped(false) - { - } + _runCnt(0) + { } void block() { setBlocked(BlockedReason::FROZEN_BUCKET); } bool run() override { LOG(info, "MySimpleJob::run()"); @@ -308,10 +305,6 @@ struct MySimpleJob : public BlockableMaintenanceJob ++_runCnt; return true; } - void onStop() override { - BlockableMaintenanceJob::onStop(); - _stopped = true; - } }; struct MySplitJob : public MySimpleJob @@ -333,13 +326,11 @@ struct MySplitJob : public MySimpleJob struct MyLongRunningJob : public BlockableMaintenanceJob { vespalib::Gate _firstRun; - bool _stopped; MyLongRunningJob(vespalib::duration delay, vespalib::duration interval) : BlockableMaintenanceJob("long_running_job", delay, interval), - _firstRun(), - _stopped(false) + _firstRun() { } void block() { setBlocked(BlockedReason::FROZEN_BUCKET); } @@ -348,10 +339,6 @@ struct MyLongRunningJob : public BlockableMaintenanceJob usleep(10000); return false; } - void onStop() override { - BlockableMaintenanceJob::onStop(); - _stopped = true; - } }; using MyAttributeManager = test::MockAttributeManager; diff --git 
a/searchcore/src/tests/proton/documentdb/move_operation_limiter/move_operation_limiter_test.cpp b/searchcore/src/tests/proton/documentdb/move_operation_limiter/move_operation_limiter_test.cpp index 1f152f4a257..8466a016fc3 100644 --- a/searchcore/src/tests/proton/documentdb/move_operation_limiter/move_operation_limiter_test.cpp +++ b/searchcore/src/tests/proton/documentdb/move_operation_limiter/move_operation_limiter_test.cpp @@ -12,11 +12,9 @@ using namespace proton; struct MyBlockableMaintenanceJob : public IBlockableMaintenanceJob { bool blocked; - bool stopped; MyBlockableMaintenanceJob() : IBlockableMaintenanceJob("my_job", 1s, 1s), - blocked(false), - stopped(false) + blocked(false) {} void setBlocked(BlockedReason reason) override { ASSERT_TRUE(reason == BlockedReason::OUTSTANDING_OPS); @@ -29,7 +27,7 @@ struct MyBlockableMaintenanceJob : public IBlockableMaintenanceJob { blocked = false; } bool run() override { return true; } - void onStop() override { stopped = true; } + void onStop() override { } }; struct Fixture { diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp index 9e34462ae21..f2c6dc3a8be 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp @@ -87,7 +87,6 @@ BucketMoveJobV2::BucketMoveJobV2(const std::shared_ptr<IBucketStateCalculator> & _movers(), _bucketsInFlight(), _buckets2Move(), - _stopped(false), _bucketsPending(0), _bucketCreateNotifier(bucketCreateNotifier), _clusterStateChangedNotifier(clusterStateChangedNotifier), @@ -206,9 +205,9 @@ private: void BucketMoveJobV2::failOperation(std::shared_ptr<BucketMoveJobV2> job, BucketId bucketId) { auto & master = job->_master; - if (job->_stopped) return; + if (job->stopped()) return; master.execute(makeLambdaTask([job=std::move(job), bucketId]() { - if (job->_stopped.load(std::memory_order_relaxed)) return; 
+ if (job->stopped()) return; job->considerBucket(job->_ready.meta_store()->getBucketDB().takeGuard(), bucketId); })); } @@ -229,12 +228,12 @@ BucketMoveJobV2::startMove(BucketMover & mover, size_t maxDocsToMove) { void BucketMoveJobV2::prepareMove(std::shared_ptr<BucketMoveJobV2> job, BucketMover::MoveKeys keys, IDestructorCallbackSP onDone) { - if (job->_stopped) return; //TODO Remove once lidtracker is no longer in use. + if (job->stopped()) return; //TODO Remove once lidtracker is no longer in use. auto moveOps = keys.createMoveOperations(); auto & master = job->_master; - if (job->_stopped) return; + if (job->stopped()) return; master.execute(makeLambdaTask([job=std::move(job), moveOps=std::move(moveOps), onDone=std::move(onDone)]() mutable { - if (job->_stopped.load(std::memory_order_relaxed)) return; + if (job->stopped()) return; job->completeMove(std::move(moveOps), std::move(onDone)); })); } @@ -449,13 +448,6 @@ BucketMoveJobV2::notifyDiskMemUsage(DiskMemUsageState state) } void -BucketMoveJobV2::onStop() { - // Called by master write thread - BlockableMaintenanceJob::onStop(); - _stopped = true; -} - -void BucketMoveJobV2::updatePending() { _bucketsPending.store(_bucketsInFlight.size() + _buckets2Move.size(), std::memory_order_relaxed); } diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h index df75c8c9766..7d5dafb33b3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h @@ -72,7 +72,6 @@ private: Bucket2Mover _bucketsInFlight; BucketMoveSet _buckets2Move; - std::atomic<bool> _stopped; std::atomic<size_t> _bucketsPending; bucketdb::IBucketCreateNotifier &_bucketCreateNotifier; @@ -142,7 +141,6 @@ public: void notifyBucketStateChanged(const BucketId &bucketId, ActiveState newState) override; void notifyDiskMemUsage(DiskMemUsageState state) override; void 
notifyCreateBucket(const bucketdb::Guard & guard, const BucketId &bucket) override; - void onStop() override; void updateMetrics(DocumentDBTaggedMetrics & metrics) const override; }; diff --git a/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h b/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h index 6bea9855c82..7148576b76f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h +++ b/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h @@ -4,6 +4,7 @@ #include <vespa/vespalib/stllike/string.h> #include <vespa/vespalib/util/time.h> #include <memory> +#include <atomic> namespace proton { @@ -21,7 +22,9 @@ private: const vespalib::string _name; const vespalib::duration _delay; const vespalib::duration _interval; - + std::atomic<bool> _stopped; +protected: + virtual void onStop() = 0; public: using UP = std::unique_ptr<IMaintenanceJob>; using SP = std::shared_ptr<IMaintenanceJob>; @@ -31,7 +34,8 @@ public: vespalib::duration interval) : _name(name), _delay(delay), - _interval(interval) + _interval(interval), + _stopped(false) {} virtual ~IMaintenanceJob() = default; @@ -41,9 +45,12 @@ public: virtual vespalib::duration getInterval() const { return _interval; } virtual bool isBlocked() const { return false; } virtual IBlockableMaintenanceJob *asBlockable() { return nullptr; } - virtual void onStop() = 0; virtual void updateMetrics(DocumentDBTaggedMetrics &) const {} - + void stop() { + _stopped = true; + onStop(); + } + bool stopped() const { return _stopped.load(std::memory_order_relaxed); } /** * Register maintenance job runner, in case event passed to the * job causes it to want to be run again. 
diff --git a/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h b/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h index 0e1b2b00ce5..ecc592a00a4 100644 --- a/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h +++ b/searchcore/src/vespa/searchcore/proton/server/job_tracked_maintenance_job.h @@ -9,7 +9,7 @@ namespace proton { /** * Class for tracking the start and end of a maintenance job. */ -class JobTrackedMaintenanceJob : public IMaintenanceJob +class JobTrackedMaintenanceJob final : public IMaintenanceJob { private: IJobTracker::SP _tracker; @@ -26,7 +26,7 @@ public: _job->registerRunner(runner); } bool run() override; - void onStop() override { _job->onStop(); } + void onStop() override { _job->stop(); } }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp index f2333f2f2b7..89a2e226046 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.cpp @@ -59,7 +59,7 @@ public: void fail(const Bucket & bucket) override { assert(bucket.getBucketId() == _meta.bucketId); auto & master = _job->_master; - if (_job->_stopped) return; + if (_job->stopped()) return; master.execute(makeLambdaTask([job=std::move(_job)] { job->_scanItr.reset(); })); } private: @@ -88,7 +88,7 @@ void CompactionJob::moveDocument(std::shared_ptr<CompactionJob> job, const search::DocumentMetaData & metaThen, std::shared_ptr<IDestructorCallback> context) { - if (job->_stopped) return; //TODO Remove once lidtracker is no longer in use. + if (job->stopped()) return; //TODO Remove once lidtracker is no longer in use. // The real lid must be sampled in the master thread. 
//TODO remove target lid from createMoveOperation interface auto op = job->_handler->createMoveOperation(metaThen, 0); @@ -97,9 +97,9 @@ CompactionJob::moveDocument(std::shared_ptr<CompactionJob> job, const search::Do if (metaThen.gid != op->getDocument()->getId().getGlobalId()) return; auto & master = job->_master; - if (job->_stopped) return; + if (job->stopped()) return; master.execute(makeLambdaTask([self=std::move(job), meta=metaThen, moveOp=std::move(op), onDone=std::move(context)]() mutable { - if (self->_stopped.load(std::memory_order_relaxed)) return; + if (self->stopped()) return; self->completeMove(meta, std::move(moveOp), std::move(onDone)); })); } @@ -151,8 +151,7 @@ CompactionJob::CompactionJob(const DocumentDBLidSpaceCompactionConfig &config, _master(master), _bucketExecutor(bucketExecutor), _dbRetainer(std::move(dbRetainer)), - _bucketSpace(bucketSpace), - _stopped(false) + _bucketSpace(bucketSpace) { _diskMemUsageNotifier.addDiskMemUsageListener(this); _clusterStateChangedNotifier.addClusterStateChangedHandler(this); @@ -189,12 +188,6 @@ CompactionJob::create(const DocumentDBLidSpaceCompactionConfig &config, }); } -void -CompactionJob::onStop() { - BlockableMaintenanceJob::onStop(); - _stopped = true; -} - DocumentMetaData CompactionJob::getNextDocument(const LidUsageStats &stats, bool retryLastDocument) { diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h index 141f29f4510..725c7387bdc 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job_take2.h @@ -52,7 +52,6 @@ private: BucketExecutor &_bucketExecutor; RetainGuard _dbRetainer; document::BucketSpace _bucketSpace; - std::atomic<bool> _stopped; bool hasTooMuchLidBloat(const search::LidUsageStats &stats) const; bool shouldRestartScanDocuments(const 
search::LidUsageStats &stats) const; @@ -66,7 +65,6 @@ private: std::shared_ptr<IDestructorCallback> onDone); void completeMove(const search::DocumentMetaData & metaThen, std::unique_ptr<MoveOperation> moveOp, std::shared_ptr<IDestructorCallback> onDone); - void onStop() override; class MoveTask; CompactionJob(const DocumentDBLidSpaceCompactionConfig &config, diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancejobrunner.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenancejobrunner.cpp index 658fa9f7482..d508c3193fc 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancejobrunner.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancejobrunner.cpp @@ -25,7 +25,7 @@ MaintenanceJobRunner::stop() { Guard guard(_lock); _stopped = true; } - _job->onStop(); + _job->stop(); } void diff --git a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob_v2.cpp b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob_v2.cpp index f2cb6f3c270..0c5b165611e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob_v2.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob_v2.cpp @@ -36,7 +36,6 @@ PruneRemovedDocumentsJobV2(const DocumentDBPruneConfig &config, RetainGuard dbRe _cfgAgeLimit(config.getAge()), _subDbId(subDbId), _bucketSpace(bucketSpace), - _stopped(false), _nextLid(1u) { } @@ -75,7 +74,7 @@ PruneRemovedDocumentsJobV2::PruneTask::run(const Bucket & bucket, IDestructorCal void PruneRemovedDocumentsJobV2::remove(uint32_t lid, const RawDocumentMetaData & oldMeta) { - if (_stopped.load(std::memory_order_relaxed)) return; + if (stopped()) return; if ( ! 
_metaStore.validLid(lid)) return; const RawDocumentMetaData &meta = _metaStore.getRawMetaData(lid); if (meta.getBucketId() != oldMeta.getBucketId()) return; diff --git a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob_v2.h b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob_v2.h index 98330c75d5c..b0eefedf8e2 100644 --- a/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob_v2.h +++ b/searchcore/src/vespa/searchcore/proton/server/pruneremoveddocumentsjob_v2.h @@ -40,7 +40,6 @@ private: const vespalib::duration _cfgAgeLimit; const uint32_t _subDbId; const document::BucketSpace _bucketSpace; - std::atomic<bool> _stopped; DocId _nextLid; @@ -51,7 +50,6 @@ private: IPruneRemovedDocumentsHandler &handler, IThreadService & master, BucketExecutor & bucketExecutor); bool run() override; - void onStop() override { _stopped = true; } public: static std::shared_ptr<PruneRemovedDocumentsJobV2> create(const Config &config, RetainGuard dbRetainer, const IDocumentMetaStore &metaStore, uint32_t subDbId, |