diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-10-28 14:02:02 +0100 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-10-28 14:02:02 +0100 |
commit | dd9dba2eee8f26b8d8823a3c88697113bc125e83 (patch) | |
tree | 92ad0633783f077554767b27c268d642e9912f3d /controller-server | |
parent | 91c600222b9ccde4e7113da2171b92067eeae091 (diff) |
Hold all step locks while finishing run
Diffstat (limited to 'controller-server')
-rw-r--r-- | controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java | 77 |
1 files changed, 44 insertions, 33 deletions
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java index e288127ca6d..d88844f1cc6 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java @@ -41,8 +41,10 @@ import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.Optional; +import java.util.Queue; import java.util.Set; import java.util.SortedMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -353,39 +355,50 @@ public class JobController { locked(id, run -> run.with(timestamp, step)); } - /** Changes the status of the given run to inactive, and stores it as a historic run. */ + /** + * Changes the status of the given run to inactive, and stores it as a historic run. + * Throws TimeoutException if some step in this job is still being run. + */ public void finish(RunId id) throws TimeoutException { - // Ensure no step is still running before we finish the run — report depends transitively on all the other steps. - locked(id.application(), id.type(), report, __ -> { }); - - locked(id, run -> { // Store the modified run after it has been written to history, in case the latter fails. - Run finishedRun = run.finished(controller.clock().instant()); - locked(id.application(), id.type(), runs -> { - runs.put(run.id(), finishedRun); - long last = id.number(); - long successes = runs.values().stream().filter(old -> old.status() == RunStatus.success).count(); - var oldEntries = runs.entrySet().iterator(); - for (var old = oldEntries.next(); - old.getKey().number() <= last - historyLength - || old.getValue().start().isBefore(controller.clock().instant().minus(maxHistoryAge)); - old = oldEntries.next()) { - - // Make sure we keep the last success and the first failing - if ( successes == 1 - && old.getValue().status() == RunStatus.success - && ! old.getValue().start().isBefore(controller.clock().instant().minus(maxHistoryAge))) { - oldEntries.next(); - continue; + List<Lock> locks = new ArrayList<>(); + try { + // Ensure no step is still running before we finish the run — report depends transitively on all the other steps. + for (Step step : report.allPrerequisites()) + locks.add(curator.lock(id.application(), id.type(), step)); + + locked(id, run -> { // Store the modified run after it has been written to history, in case the latter fails. + Run finishedRun = run.finished(controller.clock().instant()); + locked(id.application(), id.type(), runs -> { + runs.put(run.id(), finishedRun); + long last = id.number(); + long successes = runs.values().stream().filter(old -> old.status() == RunStatus.success).count(); + var oldEntries = runs.entrySet().iterator(); + for (var old = oldEntries.next(); + old.getKey().number() <= last - historyLength + || old.getValue().start().isBefore(controller.clock().instant().minus(maxHistoryAge)); + old = oldEntries.next()) { + + // Make sure we keep the last success and the first failing + if (successes == 1 + && old.getValue().status() == RunStatus.success + && !old.getValue().start().isBefore(controller.clock().instant().minus(maxHistoryAge))) { + oldEntries.next(); + continue; + } + + logs.delete(old.getKey()); + oldEntries.remove(); } - - logs.delete(old.getKey()); - oldEntries.remove(); - } + }); + logs.flush(id); + metric.jobFinished(run.id().job(), finishedRun.status()); + return finishedRun; }); - logs.flush(id); - metric.jobFinished(run.id().job(), finishedRun.status()); - return finishedRun; - }); + } + finally { + for (Lock lock : locks) + lock.close(); + } } /** Marks the given run as aborted; no further normal steps will run, but run-always steps will try to succeed. */ @@ -393,9 +406,7 @@ public class JobController { locked(id, run -> run.aborted()); } - /** - * Accepts and stores a new application package and test jar pair under a generated application version key. - */ + /** Accepts and stores a new application package and test jar pair under a generated application version key. */ public ApplicationVersion submit(TenantAndApplicationId id, Optional<SourceRevision> revision, Optional<String> authorEmail, Optional<String> sourceUrl, long projectId, ApplicationPackage applicationPackage, byte[] testPackageBytes) { |