diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-04-15 10:08:40 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-04-15 10:08:40 +0200 |
commit | 81fad70d16a8494ce0464af6ee4ba9c0e12f6a6e (patch) | |
tree | 3a7f33a5351dc8ef2c06bb1a655c8d02bfa98ee4 /searchcore | |
parent | 9a2f07f51c00c011b66d753f8b545f3f77f5279b (diff) | |
parent | 3052b47468ad235413d7520302902808e5040c9f (diff) |
Merge pull request #17437 from vespa-engine/balder/ensure-all-jobs-are-drained-from-non-master-executor
If the job is posted on non-master-executor, we must also wait for th…
Diffstat (limited to 'searchcore')
5 files changed, 12 insertions, 6 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/blockable_maintenance_job.h b/searchcore/src/vespa/searchcore/proton/server/blockable_maintenance_job.h index f20511bc417..fee0f91e717 100644 --- a/searchcore/src/vespa/searchcore/proton/server/blockable_maintenance_job.h +++ b/searchcore/src/vespa/searchcore/proton/server/blockable_maintenance_job.h @@ -54,7 +54,7 @@ public: bool isBlocked() const override; void registerRunner(IMaintenanceJobRunner *runner) override { _runner = runner; } IMoveOperationLimiter & getLimiter() { return *_moveOpsLimiter; } - + const IMoveOperationLimiter & getLimiter() const { return *_moveOpsLimiter; } }; } diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp index db251a02796..ef536639635 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.cpp @@ -434,7 +434,7 @@ BucketMoveJobV2::updatePending() { } void -BucketMoveJobV2::updateMetrics(DocumentDBTaggedMetrics & metrics) { +BucketMoveJobV2::updateMetrics(DocumentDBTaggedMetrics & metrics) const { // This is an over estimate to ensure we do not count down to zero until everything has been and completed and acked. metrics.bucketMove.bucketsPending.set(_bucketsPending.load(std::memory_order_relaxed) + getLimiter().numPending()); diff --git a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h index 6a9f28f0a01..40cbc98adf7 100644 --- a/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h +++ b/searchcore/src/vespa/searchcore/proton/server/bucketmovejobv2.h @@ -146,7 +146,7 @@ public: void notifyDiskMemUsage(DiskMemUsageState state) override; void notifyCreateBucket(const bucketdb::Guard & guard, const BucketId &bucket) override; void onStop() override; - void updateMetrics(DocumentDBTaggedMetrics & metrics) override; + void updateMetrics(DocumentDBTaggedMetrics & metrics) const override; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h b/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h index 3972072b41d..6bea9855c82 100644 --- a/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h +++ b/searchcore/src/vespa/searchcore/proton/server/i_maintenance_job.h @@ -42,7 +42,7 @@ public: virtual bool isBlocked() const { return false; } virtual IBlockableMaintenanceJob *asBlockable() { return nullptr; } virtual void onStop() = 0; - virtual void updateMetrics(DocumentDBTaggedMetrics &) {} + virtual void updateMetrics(DocumentDBTaggedMetrics &) const {} /** * Register maintenance job runner, in case event passed to the diff --git a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp index c6eec3b7791..f981ebb1116 100644 --- a/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/maintenancecontroller.cpp @@ -29,6 +29,13 @@ public: void run() override { _job->run(); } }; +bool +isRunningOrRunnable(const MaintenanceJobRunner & job, const Executor * master) { + return (&job.getExecutor() == master) + ? job.isRunning() + : job.isRunnable(); +} + } MaintenanceController::MaintenanceController(IThreadService &masterThread, @@ -79,7 +86,6 @@ MaintenanceController::registerJob(Executor & executor, IMaintenanceJob::UP job) _jobs.push_back(std::make_shared<MaintenanceJobRunner>(executor, std::move(job))); } - void MaintenanceController::killJobs() { @@ -95,7 +101,7 @@ MaintenanceController::killJobs() job->stop(); // Make sure no more tasks are added to the executor } for (auto &job : _jobs) { - while (job->isRunning()) { + while (isRunningOrRunnable(*job, &_masterThread)) { std::this_thread::sleep_for(1ms); } } |