diff options
author | jonmv <venstad@gmail.com> | 2022-07-04 10:54:16 +0200 |
---|---|---|
committer | jonmv <venstad@gmail.com> | 2022-07-05 14:55:28 +0200 |
commit | 0d665219a01bbfaa1c5ffc7166912612b168d992 (patch) | |
tree | d6c6398bb138f36994991e3fb233c9d40cc5ee8c /controller-server/src/main/java/com | |
parent | b30efedee51a3b19230879d33787ede7fe438187 (diff) |
Hold run locks for more operations
Diffstat (limited to 'controller-server/src/main/java/com')
2 files changed, 32 insertions, 28 deletions
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java index 5113d386b23..19b2afb3af9 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java @@ -408,11 +408,6 @@ public class JobController { locked(id, run -> run.with(status, step)); } - /** Invoked when starting the step */ - public void setStartTimestamp(RunId id, Instant timestamp, LockedStep step) { - locked(id, run -> run.with(timestamp, step)); - } - /** * Changes the status of the given run to inactive, and stores it as a historic run. * Throws TimeoutException if some step in this job is still being run. @@ -774,7 +769,8 @@ public class JobController { public void locked(RunId id, UnaryOperator<Run> modifications) { try (Mutex __ = curator.lock(id.application(), id.type())) { active(id).ifPresent(run -> { - curator.writeLastRun(modifications.apply(run)); + Run modified = modifications.apply(run); + if (modified != null) curator.writeLastRun(modified); }); } } diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunner.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunner.java index 94ec4129744..c204f42e261 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunner.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunner.java @@ -8,7 +8,6 @@ import com.yahoo.vespa.hosted.controller.deployment.InternalStepRunner; import com.yahoo.vespa.hosted.controller.deployment.JobController; import com.yahoo.vespa.hosted.controller.deployment.Run; import com.yahoo.vespa.hosted.controller.deployment.Step; -import com.yahoo.vespa.hosted.controller.deployment.StepInfo; import com.yahoo.vespa.hosted.controller.deployment.StepRunner; import java.time.Duration; @@ -75,18 +74,26 @@ public class JobRunner extends ControllerMaintainer { } } - /** Advances each of the ready steps for the given run, or marks it as finished, and stashes it. Public for testing. */ public void advance(Run run) { - if ( ! run.hasFailed() - && controller().clock().instant().isAfter(run.sleepUntil().orElse(run.start()).plus(jobTimeout))) - executors.execute(() -> { - jobs.abort(run.id(), "job timeout of " + jobTimeout + " reached"); - advance(jobs.run(run.id())); - }); - else if (run.readySteps().isEmpty()) - executors.execute(() -> finish(run.id())); - else if (run.hasFailed() || run.sleepUntil().map(sleepUntil -> ! sleepUntil.isAfter(controller().clock().instant())).orElse(true)) - run.readySteps().forEach(step -> executors.execute(() -> advance(run.id(), step))); + advance(run.id()); + } + + /** Advances each of the ready steps for the given run, or marks it as finished, and stashes it. Public for testing. */ + public void advance(RunId id) { + jobs.locked(id, run -> { + if ( ! run.hasFailed() + && controller().clock().instant().isAfter(run.sleepUntil().orElse(run.start()).plus(jobTimeout))) + executors.execute(() -> { + jobs.abort(run.id(), "job timeout of " + jobTimeout + " reached"); + advance(run.id()); + }); + else if (run.readySteps().isEmpty()) + executors.execute(() -> finish(run.id())); + else if (run.hasFailed() || run.sleepUntil().map(sleepUntil -> ! sleepUntil.isAfter(controller().clock().instant())).orElse(true)) + run.readySteps().forEach(step -> executors.execute(() -> advance(run.id(), step))); + + return run; + }); } private void finish(RunId id) { @@ -108,23 +115,24 @@ public class JobRunner extends ControllerMaintainer { try { AtomicBoolean changed = new AtomicBoolean(false); jobs.locked(id.application(), id.type(), step, lockedStep -> { - jobs.locked(id, run -> run); // Memory visibility. - jobs.active(id).ifPresent(run -> { // The run may have become inactive, so we bail out. + jobs.locked(id, run -> { if ( ! run.readySteps().contains(step)) { changed.set(true); - return; // Someone may have updated the run status, making this step obsolete, so we bail out. + return run; // Someone may have updated the run status, making this step obsolete, so we bail out. } - StepInfo stepInfo = run.stepInfo(lockedStep.get()).orElseThrow(); - if (stepInfo.startTime().isEmpty()) { - jobs.setStartTimestamp(run.id(), controller().clock().instant(), lockedStep); - } + if (run.stepInfo(lockedStep.get()).orElseThrow().startTime().isEmpty()) + run = run.with(controller().clock().instant(), lockedStep); - runner.run(lockedStep, run.id()).ifPresent(status -> { - jobs.update(run.id(), status, lockedStep); + return run; + }); + + if ( ! changed.get()) { + runner.run(lockedStep, id).ifPresent(status -> { + jobs.update(id, status, lockedStep); changed.set(true); }); - }); + } }); if (changed.get()) jobs.active(id).ifPresent(this::advance); |