From dceb2dbc3eed426a4a041b0e6bdef41ed9389620 Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Thu, 7 Nov 2019 14:30:45 +0100 Subject: Add JobStatus aggregate of run list, and use it in DeploymentTrigger, JobCAHH --- .../controller/deployment/DeploymentTrigger.java | 201 +++++++++------------ .../controller/deployment/JobController.java | 33 ++-- .../hosted/controller/deployment/JobStatus.java | 107 +++++++++++ .../vespa/hosted/controller/deployment/Run.java | 1 - .../hosted/controller/deployment/RunStatus.java | 4 +- .../application/JobControllerApiHandlerHelper.java | 51 +++--- 6 files changed, 247 insertions(+), 150 deletions(-) create mode 100644 controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobStatus.java (limited to 'controller-server/src') diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTrigger.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTrigger.java index ca2db96c14d..45bf8a43f07 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTrigger.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTrigger.java @@ -13,14 +13,13 @@ import com.yahoo.vespa.hosted.controller.Controller; import com.yahoo.vespa.hosted.controller.Instance; import com.yahoo.vespa.hosted.controller.api.identifiers.DeploymentId; import com.yahoo.vespa.hosted.controller.api.integration.BuildService; -import com.yahoo.vespa.hosted.controller.api.integration.BuildService.JobState; import com.yahoo.vespa.hosted.controller.api.integration.deployment.ApplicationVersion; +import com.yahoo.vespa.hosted.controller.api.integration.deployment.JobId; import com.yahoo.vespa.hosted.controller.api.integration.deployment.JobType; import com.yahoo.vespa.hosted.controller.application.ApplicationList; import com.yahoo.vespa.hosted.controller.application.Change; import com.yahoo.vespa.hosted.controller.application.Deployment; import com.yahoo.vespa.hosted.controller.application.DeploymentJobs.JobReport; -import com.yahoo.vespa.hosted.controller.application.JobStatus; import com.yahoo.vespa.hosted.controller.application.JobStatus.JobRun; import com.yahoo.vespa.hosted.controller.application.TenantAndApplicationId; @@ -30,7 +29,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.EnumSet; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -44,9 +43,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import static com.yahoo.vespa.hosted.controller.api.integration.BuildService.BuildJob; -import static com.yahoo.vespa.hosted.controller.api.integration.BuildService.JobState.idle; -import static com.yahoo.vespa.hosted.controller.api.integration.BuildService.JobState.queued; -import static com.yahoo.vespa.hosted.controller.api.integration.BuildService.JobState.running; import static com.yahoo.vespa.hosted.controller.api.integration.deployment.JobType.stagingTest; import static com.yahoo.vespa.hosted.controller.api.integration.deployment.JobType.systemTest; import static java.util.Collections.emptyList; @@ -139,18 +135,17 @@ public class DeploymentTrigger { } applications().lockApplicationOrThrow(TenantAndApplicationId.from(report.applicationId()), application -> { - JobRun triggering; - Optional status = application.get().require(report.applicationId().instance()) - .deploymentJobs().statusOf(report.jobType()); - triggering = status.filter(job -> job.lastTriggered().isPresent() - && job.lastCompleted() - .map(completion -> ! completion.at().isAfter(job.lastTriggered().get().at())) - .orElse(true)) - .orElseThrow(() -> new IllegalStateException("Notified of completion of " + report.jobType().jobName() + " for " + - report.applicationId() + ", but that has not been triggered; last was " + - status.flatMap(job -> job.lastTriggered().map(run -> run.at().toString())) - .orElse("never"))) - .lastTriggered().get(); + var status = application.get().require(report.applicationId().instance()) + .deploymentJobs().statusOf(report.jobType()); + var triggering = status.filter(job -> job.lastTriggered().isPresent() + && job.lastCompleted() + .map(completion -> ! completion.at().isAfter(job.lastTriggered().get().at())) + .orElse(true)) + .orElseThrow(() -> new IllegalStateException("Notified of completion of " + report.jobType().jobName() + " for " + + report.applicationId() + ", but that has not been triggered; last was " + + status.flatMap(job -> job.lastTriggered().map(run -> run.at().toString())) + .orElse("never"))) + .lastTriggered().get(); application = application.with(report.applicationId().instance(), instance -> instance.withJobCompletion(report.jobType(), @@ -231,9 +226,10 @@ public class DeploymentTrigger { Versions versions = Versions.from(application.change(), application, deploymentFor(instance, jobType), controller.systemVersion()); String reason = "Job triggered manually by " + user; - return (jobType.isProduction() && ! isTested(instance, versions) - ? testJobs(application.deploymentSpec(), application.change(), instance, versions, reason, clock.instant(), __ -> true).stream() - : Stream.of(deploymentJob(instance, versions, application.change(), jobType, reason, clock.instant()))) + var jobStatus = jobs.jobStatus(applicationId, application.deploymentSpec()); + return (jobType.isProduction() && ! isTested(jobStatus, versions) + ? testJobs(application.deploymentSpec(), application.change(), instance, jobStatus, versions, reason, clock.instant(), __ -> true).stream() + : Stream.of(deploymentJob(instance, versions, application.change(), jobType, jobStatus.get(jobType), reason, clock.instant()))) .peek(this::trigger) .map(Job::jobType).collect(toList()); } @@ -289,9 +285,8 @@ public class DeploymentTrigger { return controller.applications(); } - private Optional successOn(Instance instance, JobType jobType, Versions versions) { - return instance.deploymentJobs().statusOf(jobType).flatMap(JobStatus::lastSuccess) - .filter(versions::targetsMatch); + private Optional successOn(JobStatus status, Versions versions) { + return status.lastSuccess().filter(run -> versions.targetsMatch(run.versions())); } private Optional deploymentFor(Instance instance, JobType jobType) { @@ -326,11 +321,12 @@ public class DeploymentTrigger { .flatMap(instance -> application.get(instance.name()).stream()) .collect(Collectors.toUnmodifiableList()); for (Instance instance : instances) { + var jobStatus = this.jobs.jobStatus(instance.id(), application.deploymentSpec()); Change change = application.change(); - Optional completedAt = max(instance.deploymentJobs().statusOf(systemTest) - .flatMap(job -> job.lastSuccess().map(JobRun::at)), - instance.deploymentJobs().statusOf(stagingTest) - .flatMap(job -> job.lastSuccess().map(JobRun::at))); + Optional completedAt = max(Optional.ofNullable(jobStatus.get(systemTest)) + .flatMap(job -> job.lastSuccess().map(run -> run.end().get())), + Optional.ofNullable(jobStatus.get(stagingTest)) + .flatMap(job -> job.lastSuccess().map(run -> run.end().get()))); String reason = "New change available"; List testJobs = null; // null means "uninitialised", while empty means "don't run any jobs". DeploymentSteps steps = steps(application.deploymentSpec().requireInstance(instance.name())); @@ -338,22 +334,22 @@ public class DeploymentTrigger { if (change.hasTargets()) { for (Step step : steps.production()) { List stepJobs = steps.toJobs(step); - List remainingJobs = stepJobs.stream().filter(job -> ! isComplete(change, change, instance, job)).collect(toList()); + List remainingJobs = stepJobs.stream().filter(job -> ! isComplete(change, change, instance, job, jobStatus.get(job))).collect(toList()); if ( ! remainingJobs.isEmpty()) { // Change is incomplete; trigger remaining jobs if ready, or their test jobs if untested. for (JobType job : remainingJobs) { Versions versions = Versions.from(change, application, deploymentFor(instance, job), controller.systemVersion()); - if (isTested(instance, versions)) { - if (completedAt.isPresent() && canTrigger(job, versions, instance, application.deploymentSpec(), stepJobs)) { - jobs.add(deploymentJob(instance, versions, change, job, reason, completedAt.get())); + if (isTested(jobStatus, versions)) { + if (completedAt.isPresent() && canTrigger(job, jobStatus, versions, instance, application.deploymentSpec(), stepJobs)) { + jobs.add(deploymentJob(instance, versions, change, job, jobStatus.get(job), reason, completedAt.get())); } - if ( ! alreadyTriggered(instance, versions) && testJobs == null) { + if ( ! alreadyTriggered(jobStatus, versions) && testJobs == null) { testJobs = emptyList(); } } else if (testJobs == null) { testJobs = testJobs(application.deploymentSpec(), - change, instance, versions, + change, instance, jobStatus, versions, String.format("Testing deployment for %s (%s)", job.jobName(), versions.toString()), completedAt.orElseGet(clock::instant)); @@ -367,14 +363,14 @@ public class DeploymentTrigger { reason += " after a delay of " + step.delay(); } else { - completedAt = stepJobs.stream().map(job -> instance.deploymentJobs().statusOf(job).get().lastCompleted().get().at()).max(naturalOrder()); + completedAt = stepJobs.stream().map(job -> jobStatus.get(job).lastCompleted().get().end().get()).max(naturalOrder()); reason = "Available change in " + stepJobs.stream().map(JobType::jobName).collect(joining(", ")); } } } } if (testJobs == null) { // If nothing to test, but outstanding commits, test those. - testJobs = testJobs(application.deploymentSpec(), change, instance, + testJobs = testJobs(application.deploymentSpec(), change, instance, jobStatus, Versions.from(application.outstandingChange().onTopOf(change), application, steps.sortedDeployments(instance.productionDeployments().values()).stream().findFirst(), @@ -388,21 +384,21 @@ public class DeploymentTrigger { } /** Returns whether given job should be triggered */ - private boolean canTrigger(JobType job, Versions versions, Instance instance, DeploymentSpec deploymentSpec, List parallelJobs) { - if (jobStateOf(instance, job) != idle) return false; + private boolean canTrigger(JobType job, Map status, Versions versions, Instance instance, DeploymentSpec deploymentSpec, List parallelJobs) { + if (status.get(job).isRunning()) return false; // Are we already running jobs which are not in the set which can run in parallel with this? - if (parallelJobs != null && ! parallelJobs.containsAll(runningProductionJobs(instance))) return false; + if (parallelJobs != null && ! parallelJobs.containsAll(runningProductionJobs(status))) return false; // Are there another suspended deployment such that we shouldn't simultaneously change this? if (job.isProduction() && isSuspendedInAnotherZone(instance, job.zone(controller.system()))) return false; - return triggerAt(clock.instant(), job, versions, instance, deploymentSpec); + return triggerAt(clock.instant(), job, status.get(job), versions, instance, deploymentSpec); } /** Returns whether given job should be triggered */ - private boolean canTrigger(JobType job, Versions versions, Instance instance, DeploymentSpec deploymentSpec) { - return canTrigger(job, versions, instance, deploymentSpec, null); + private boolean canTrigger(JobType job, Map status, Versions versions, Instance instance, DeploymentSpec deploymentSpec) { + return canTrigger(job, status, versions, instance, deploymentSpec, null); } private boolean isSuspendedInAnotherZone(Instance instance, ZoneId zone) { @@ -415,24 +411,23 @@ public class DeploymentTrigger { } /** Returns whether the given job can trigger at the given instant */ - public boolean triggerAt(Instant instant, JobType job, Versions versions, Instance instance, DeploymentSpec deploymentSpec) { - Optional jobStatus = instance.deploymentJobs().statusOf(job); - if (jobStatus.isEmpty()) return true; - if (jobStatus.get().pausedUntil().isPresent() && jobStatus.get().pausedUntil().getAsLong() > clock.instant().toEpochMilli()) return false; - if (jobStatus.get().isSuccess()) return true; // Success - if (jobStatus.get().lastCompleted().isEmpty()) return true; // Never completed - if (jobStatus.get().firstFailing().isEmpty()) return true; // Should not happen as firstFailing should be set for an unsuccessful job - if ( ! versions.targetsMatch(jobStatus.get().lastCompleted().get())) return true; // Always trigger as targets have changed + public boolean triggerAt(Instant instant, JobType job, JobStatus jobStatus, Versions versions, Instance instance, DeploymentSpec deploymentSpec) { + if (instance.deploymentJobs().statusOf(job).map(status -> status.pausedUntil().orElse(0)).orElse(0L) > clock.millis()) return false; + if (jobStatus.lastTriggered().isEmpty()) return true; + if (jobStatus.isSuccess()) return true; // Success + if (jobStatus.lastCompleted().isEmpty()) return true; // Never completed + if (jobStatus.firstFailing().isEmpty()) return true; // Should not happen as firstFailing should be set for an unsuccessful job + if ( ! versions.targetsMatch(jobStatus.lastCompleted().get().versions())) return true; // Always trigger as targets have changed if (deploymentSpec.requireInstance(instance.name()).upgradePolicy() == DeploymentSpec.UpgradePolicy.canary) return true; // Don't throttle canaries - Instant firstFailing = jobStatus.get().firstFailing().get().at(); - Instant lastCompleted = jobStatus.get().lastCompleted().get().at(); + Instant firstFailing = jobStatus.firstFailing().get().end().get(); + Instant lastCompleted = jobStatus.lastCompleted().get().end().get(); // Retry all errors immediately for 1 minute if (firstFailing.isAfter(instant.minus(Duration.ofMinutes(1)))) return true; // Retry out of capacity errors in test environments every minute - if (job.isTest() && jobStatus.get().isOutOfCapacity()) { + if (job.isTest() && jobStatus.isOutOfCapacity()) { return lastCompleted.isBefore(instant.minus(Duration.ofMinutes(1))); } @@ -445,27 +440,12 @@ public class DeploymentTrigger { // ---------- Job state helpers ---------- - private List runningProductionJobs(Instance instance) { - return instance.deploymentJobs().jobStatus().keySet().parallelStream() - .filter(JobType::isProduction) - .filter(job -> isRunning(instance, job)) - .collect(toList()); - } - - /** Returns whether the given job is currently running; false if completed since last triggered, asking the build service otherwise. */ - private boolean isRunning(Instance instance, JobType jobType) { - return ! instance.deploymentJobs().statusOf(jobType) - .flatMap(job -> job.lastCompleted().map(run -> run.at().isAfter(job.lastTriggered().get().at()))) - .orElse(false) - && EnumSet.of(running, queued).contains(jobStateOf(instance, jobType)); - } - - private JobState jobStateOf(Instance instance, JobType jobType) { - if (controller.applications().requireApplication(TenantAndApplicationId.from(instance.id())).internal()) { - Optional run = controller.jobController().last(instance.id(), jobType); - return run.isPresent() && ! run.get().hasEnded() ? JobState.running : JobState.idle; - } - return buildService.stateOf(BuildJob.of(instance.id(), 0, jobType.jobName())); + private List runningProductionJobs(Map status) { + return status.values().parallelStream() + .filter(job -> job.isRunning()) + .map(job -> job.job().type()) + .filter(JobType::isProduction) + .collect(toList()); } // ---------- Completion logic ---------- @@ -482,17 +462,18 @@ public class DeploymentTrigger { * Additionally, if the application is pinned to a Vespa version, and the given change has a (this) platform, * the deployment for the job must be on the pinned version. */ - public boolean isComplete(Change change, Change fullChange, Instance instance, JobType jobType) { + public boolean isComplete(Change change, Change fullChange, Instance instance, JobType jobType, + JobStatus status) { Optional existingDeployment = deploymentFor(instance, jobType); if ( change.isPinned() && change.platform().isPresent() && ! existingDeployment.map(Deployment::version).equals(change.platform())) return false; - return instance.deploymentJobs().statusOf(jobType).flatMap(JobStatus::lastSuccess) - .map(job -> change.platform().map(job.platform()::equals).orElse(true) - && change.application().map(job.application()::equals).orElse(true)) - .orElse(false) + return status.lastSuccess() + .map(run -> change.platform().map(run.versions().targetPlatform()::equals).orElse(true) + && change.application().map(run.versions().targetApplication()::equals).orElse(true)) + .orElse(false) || jobType.isProduction() && existingDeployment.map(deployment -> ! isUpgrade(change, deployment) && isDowngrade(fullChange, deployment)) .orElse(false); @@ -506,27 +487,28 @@ public class DeploymentTrigger { return change.downgrades(deployment.version()) || change.downgrades(deployment.applicationVersion()); } - private boolean isTested(Instance instance, Versions versions) { - return testedIn(instance, systemTest, versions) - && testedIn(instance, stagingTest, versions) - || alreadyTriggered(instance, versions); + private boolean isTested(Map status, Versions versions) { + return testedIn(systemTest, status.get(systemTest), versions) + && testedIn(stagingTest, status.get(stagingTest), versions) + || alreadyTriggered(status, versions); } - public boolean testedIn(Instance instance, JobType testType, Versions versions) { + public boolean testedIn(JobType testType, JobStatus status, Versions versions) { if (testType == systemTest) - return successOn(instance, systemTest, versions).isPresent(); + return successOn(status, versions).isPresent(); if (testType == stagingTest) - return successOn(instance, stagingTest, versions).filter(versions::sourcesMatchIfPresent).isPresent(); + return successOn(status, versions).map(Run::versions).filter(versions::sourcesMatchIfPresent).isPresent(); throw new IllegalArgumentException(testType + " is not a test job!"); } - public boolean alreadyTriggered(Instance instance, Versions versions) { - return instance.deploymentJobs().jobStatus().values().stream() - .filter(job -> job.type().isProduction()) + public boolean alreadyTriggered(Map status, Versions versions) { + return status.values().stream() + .filter(job -> job.job().type().isProduction()) .anyMatch(job -> job.lastTriggered() - .filter(versions::targetsMatch) - .filter(versions::sourcesMatchIfPresent) - .isPresent()); + .map(Run::versions) + .filter(versions::targetsMatch) + .filter(versions::sourcesMatchIfPresent) + .isPresent()); } // ---------- Change management o_O ---------- @@ -542,12 +524,11 @@ public class DeploymentTrigger { private Change remainingChange(Application application) { Change change = application.change(); - if (application.deploymentSpec().instances().stream() .allMatch(spec -> { DeploymentSteps steps = new DeploymentSteps(spec, controller::system); return (steps.productionJobs().isEmpty() ? steps.testJobs() : steps.productionJobs()) - .stream().allMatch(job -> isComplete(application.change().withoutApplication(), application.change(), application.require(spec.name()), job)); + .stream().allMatch(job -> isComplete(application.change().withoutApplication(), application.change(), application.require(spec.name()), job, jobs.jobStatus(new JobId(application.id().instance(spec.name()), job)))); })) change = change.withoutPlatform(); @@ -555,7 +536,7 @@ public class DeploymentTrigger { .allMatch(spec -> { DeploymentSteps steps = new DeploymentSteps(spec, controller::system); return (steps.productionJobs().isEmpty() ? steps.testJobs() : steps.productionJobs()) - .stream().allMatch(job -> isComplete(application.change().withoutPlatform(), application.change(), application.require(spec.name()), job)); + .stream().allMatch(job -> isComplete(application.change().withoutPlatform(), application.change(), application.require(spec.name()), job, jobs.jobStatus(new JobId(application.id().instance(spec.name()), job)))); })) change = change.withoutApplication(); @@ -567,42 +548,40 @@ public class DeploymentTrigger { /** * Returns the list of test jobs that should run now, and that need to succeed on the given versions for it to be considered tested. */ - private List testJobs(DeploymentSpec deploymentSpec, Change change, Instance instance, Versions versions, + private List testJobs(DeploymentSpec deploymentSpec, Change change, Instance instance, Map status, Versions versions, String reason, Instant availableSince) { - return testJobs(deploymentSpec, change, instance, versions, reason, availableSince, - jobType -> canTrigger(jobType, versions, instance, deploymentSpec)); + return testJobs(deploymentSpec, change, instance, status, versions, reason, availableSince, + jobType -> canTrigger(jobType, status, versions, instance, deploymentSpec)); } /** * Returns the list of test jobs that need to succeed on the given versions for it to be considered tested, filtered by the given condition. */ - private List testJobs(DeploymentSpec deploymentSpec, Change change, Instance instance, Versions versions, + private List testJobs(DeploymentSpec deploymentSpec, Change change, Instance instance, Map status, Versions versions, String reason, Instant availableSince, Predicate condition) { List jobs = new ArrayList<>(); for (JobType jobType : new DeploymentSteps(deploymentSpec.requireInstance(instance.name()), controller::system).testJobs()) { // TODO jonmv: Allow cross-instance validation - Optional completion = successOn(instance, jobType, versions) - .filter(run -> versions.sourcesMatchIfPresent(run) || jobType == systemTest); + Optional completion = successOn(status.get(jobType), versions) + .filter(run -> versions.sourcesMatchIfPresent(run.versions()) || jobType == systemTest); if (completion.isEmpty() && condition.test(jobType)) - jobs.add(deploymentJob(instance, versions, change, jobType, reason, availableSince)); + jobs.add(deploymentJob(instance, versions, change, jobType, status.get(jobType), reason, availableSince)); } return jobs; } - private Job deploymentJob(Instance instance, Versions versions, Change change, JobType jobType, String reason, Instant availableSince) { - boolean isRetry = instance.deploymentJobs().statusOf(jobType) - .map(JobStatus::isOutOfCapacity) - .orElse(false); - if (isRetry) reason += "; retrying on out of capacity"; + private Job deploymentJob(Instance instance, Versions versions, Change change, JobType jobType, JobStatus jobStatus, String reason, Instant availableSince) { + if (jobStatus.isOutOfCapacity()) reason += "; retrying on out of capacity"; - JobRun triggering = JobRun.triggering(versions.targetPlatform(), versions.targetApplication(), - versions.sourcePlatform(), versions.sourceApplication(), - reason, clock.instant()); - return new Job(instance, triggering, jobType, availableSince, isRetry, change.application().isPresent()); + var triggering = JobRun.triggering(versions.targetPlatform(), versions.targetApplication(), + versions.sourcePlatform(), versions.sourceApplication(), + reason, clock.instant()); + return new Job(instance, triggering, jobType, availableSince, jobStatus.isOutOfCapacity(), change.application().isPresent()); } // ---------- Data containers ---------- + // TODO jonmv: Replace with a JobSpec class not based on BuildJob. private static class Job extends BuildJob { private final JobType jobType; diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java index 41266e761a9..84f14109c4b 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobController.java @@ -3,6 +3,7 @@ package com.yahoo.vespa.hosted.controller.deployment; import com.google.common.collect.ImmutableMap; import com.yahoo.component.Version; +import com.yahoo.config.application.api.DeploymentSpec; import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.InstanceName; import com.yahoo.config.provision.zone.ZoneId; @@ -33,8 +34,10 @@ import java.security.cert.X509Certificate; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -247,27 +250,17 @@ public class JobController { /** Returns the last completed of the given job. */ public Optional lastCompleted(JobId id) { - return Optional.ofNullable(curator.readHistoricRuns(id.application(), id.type()) - .lastEntry().getValue()); + return JobStatus.lastCompleted(runs(id)); } /** Returns the first failing of the given job. */ public Optional firstFailing(JobId id) { - Run failed = null; - loop: for (Run run : runs(id).descendingMap().values()) - switch (run.status()) { - case running: continue loop; - case success: break loop; - default: failed = run; - } - return Optional.ofNullable(failed); + return JobStatus.firstFailing(runs(id)); } /** Returns the last success of the given job. */ public Optional lastSuccess(JobId id) { - return runs(id).descendingMap().values().stream() - .filter(run -> run.status() == RunStatus.success) - .findFirst(); + return JobStatus.lastSuccess(runs(id)); } /** Returns the run with the given id, provided it is still active. */ @@ -294,6 +287,20 @@ public class JobController { .iterator()); } + /** Returns the job status of the given job, possibly empty. */ + public JobStatus jobStatus(JobId id) { + return new JobStatus(id, runs(id)); + } + + /** Returns the job status of all declared jobs for the given instance id, indexed by job type. */ + public Map jobStatus(ApplicationId id, DeploymentSpec spec) { + return new DeploymentSteps(spec.requireInstance(id.instance()), controller::system) + .jobs().stream() + .map(type -> jobStatus(new JobId(id, type))) + .collect(Collectors.toUnmodifiableMap(status -> status.job().type(), + status -> status)); + } + /** Changes the status of the given step, for the given run, provided it is still active. */ public void update(RunId id, RunStatus status, LockedStep step) { locked(id, run -> run.with(status, step)); diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobStatus.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobStatus.java new file mode 100644 index 00000000000..a338a766727 --- /dev/null +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/JobStatus.java @@ -0,0 +1,107 @@ +package com.yahoo.vespa.hosted.controller.deployment; + +import com.yahoo.vespa.hosted.controller.api.integration.deployment.JobId; +import com.yahoo.vespa.hosted.controller.api.integration.deployment.RunId; + +import java.util.NavigableMap; +import java.util.Objects; +import java.util.Optional; + +/** + * Aggregates information about all known runs of a given job to provide the high level status. + * + * @author jonmv + */ +public class JobStatus { + + private final JobId id; + private final NavigableMap runs; + private final Optional lastTriggered; + private final Optional lastCompleted; + private final Optional lastSuccess; + private final Optional firstFailing; + + public JobStatus(JobId id, NavigableMap runs) { + this.id = Objects.requireNonNull(id); + this.runs = Objects.requireNonNull(runs); + this.lastTriggered = runs.descendingMap().values().stream().findFirst(); + this.lastCompleted = lastCompleted(runs); + this.lastSuccess = lastSuccess(runs); + this.firstFailing = firstFailing(runs); + } + + public JobId job() { + return id; + } + + public NavigableMap runs() { + return runs; + } + + public Optional lastTriggered() { + return lastTriggered; + } + + public Optional lastCompleted() { + return lastCompleted; + } + + public Optional lastSuccess() { + return lastSuccess; + } + + public Optional firstFailing() { + return firstFailing; + } + + public Optional lastStatus() { + return lastCompleted().map(Run::status); + } + + public boolean isSuccess() { + return lastStatus().isPresent() && lastStatus().get() == RunStatus.success; + } + + public boolean isRunning() { + return lastTriggered.isPresent() && ! lastTriggered.get().hasEnded(); + } + + public boolean isOutOfCapacity() { + return lastStatus().isPresent() && lastStatus().get() == RunStatus.outOfCapacity; + } + + @Override + public String toString() { + return "JobStatus{" + + "id=" + id + + ", lastTriggered=" + lastTriggered + + ", lastCompleted=" + lastCompleted + + ", lastSuccess=" + lastSuccess + + ", firstFailing=" + firstFailing + + '}'; + } + + static Optional lastCompleted(NavigableMap runs) { + return runs.descendingMap().values().stream() + .filter(run -> run.hasEnded()) + .findFirst(); + } + + static Optional lastSuccess(NavigableMap runs) { + return runs.descendingMap().values().stream() + .filter(run -> run.status() == RunStatus.success) + .findFirst(); + } + + static Optional firstFailing(NavigableMap runs) { + Run failed = null; + loop: for (Run run : runs.descendingMap().values()) + switch (run.status()) { + case running: continue loop; + case success: break loop; + default: failed = run; + } + return Optional.ofNullable(failed); + } + +} diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/Run.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/Run.java index d7d6134ccb9..2f9c5ea9e08 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/Run.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/Run.java @@ -179,7 +179,6 @@ public class Run { ", start=" + start + ", end=" + end + ", status=" + status + - ", steps=" + steps + '}'; } diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/RunStatus.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/RunStatus.java index ea6cc983b71..4d0b7ef3b90 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/RunStatus.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/RunStatus.java @@ -11,7 +11,7 @@ public enum RunStatus { /** Run is still proceeding normally, i.e., without failures. */ running, - /** Deployment was rejected due to missing capacity. */ + /** Deployment was rejected due to lack of capacity. */ outOfCapacity, /** Deployment of the real application was rejected. */ @@ -29,7 +29,7 @@ public enum RunStatus { /** Everything completed with great success! */ success, - /** Run has been abandoned, due to user intervention or timeout. */ + /** Run was abandoned, due to user intervention or job timeout. */ aborted } diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/JobControllerApiHandlerHelper.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/JobControllerApiHandlerHelper.java index 9fb20ef4f81..cb4c1e71c43 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/JobControllerApiHandlerHelper.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/JobControllerApiHandlerHelper.java @@ -16,16 +16,17 @@ import com.yahoo.vespa.hosted.controller.Controller; import com.yahoo.vespa.hosted.controller.NotExistsException; import com.yahoo.vespa.hosted.controller.api.integration.LogEntry; import com.yahoo.vespa.hosted.controller.api.integration.deployment.ApplicationVersion; +import com.yahoo.vespa.hosted.controller.api.integration.deployment.JobId; import com.yahoo.vespa.hosted.controller.api.integration.deployment.JobType; import com.yahoo.vespa.hosted.controller.api.integration.deployment.RunId; import com.yahoo.vespa.hosted.controller.api.integration.deployment.SourceRevision; import com.yahoo.vespa.hosted.controller.application.ApplicationPackage; import com.yahoo.vespa.hosted.controller.application.Change; import com.yahoo.vespa.hosted.controller.application.Deployment; -import com.yahoo.vespa.hosted.controller.application.JobStatus; import com.yahoo.vespa.hosted.controller.application.TenantAndApplicationId; import com.yahoo.vespa.hosted.controller.deployment.DeploymentSteps; import com.yahoo.vespa.hosted.controller.deployment.JobController; +import com.yahoo.vespa.hosted.controller.deployment.JobStatus; import com.yahoo.vespa.hosted.controller.deployment.Run; import com.yahoo.vespa.hosted.controller.deployment.RunLog; import com.yahoo.vespa.hosted.controller.deployment.RunStatus; @@ -39,6 +40,7 @@ import java.net.URI; import java.util.ArrayDeque; import java.util.Comparator; import java.util.Deque; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -77,11 +79,12 @@ class JobControllerApiHandlerHelper { Instance instance = application.require(id.instance()); Change change = application.change(); DeploymentSteps steps = new DeploymentSteps(application.deploymentSpec().requireInstance(id.instance()), controller::system); + Map status = controller.jobController().jobStatus(id, application.deploymentSpec()); // The logic for pending runs imitates DeploymentTrigger logic; not good, but the trigger wiring must be re-written to reuse :S Map pendingProduction = steps.productionJobs().stream() - .filter(type -> ! controller.applications().deploymentTrigger().isComplete(change, change, instance, type)) + .filter(type -> ! controller.applications().deploymentTrigger().isComplete(change, change, instance, type, status.get(type))) .collect(Collectors.toMap(type -> type, type -> Versions.from(change, application, @@ -102,8 +105,8 @@ class JobControllerApiHandlerHelper { Cursor lastVersionsObject = responseObject.setObject("lastVersions"); if (application.latestVersion().isPresent()) { - lastPlatformToSlime(lastVersionsObject.setObject("platform"), controller, application, instance, change, steps); - lastApplicationToSlime(lastVersionsObject.setObject("application"), application, instance, change, steps, controller); + lastPlatformToSlime(lastVersionsObject.setObject("platform"), controller, application, instance, status, change, steps); + lastApplicationToSlime(lastVersionsObject.setObject("application"), application, instance, status, change, steps, controller); } Cursor deployingObject = responseObject.setObject("deploying"); @@ -125,6 +128,7 @@ class JobControllerApiHandlerHelper { pendingProduction, running, type, + Optional.ofNullable(status.get(type)), deployment); }); }); @@ -135,6 +139,7 @@ class JobControllerApiHandlerHelper { controller, application, instance, + status, type, steps, pendingProduction, @@ -158,7 +163,7 @@ class JobControllerApiHandlerHelper { return new SlimeJsonResponse(slime); } - private static void lastPlatformToSlime(Cursor lastPlatformObject, Controller controller, Application application, Instance instance, Change change, DeploymentSteps steps) { + private static void lastPlatformToSlime(Cursor lastPlatformObject, Controller controller, Application application, Instance instance, Map status, Change change, DeploymentSteps steps) { VespaVersion lastVespa = controller.versionStatus().version(controller.systemVersion()); VespaVersion.Confidence targetConfidence = Map.of(defaultPolicy, normal, conservative, high) @@ -171,7 +176,7 @@ class JobControllerApiHandlerHelper { Version lastPlatform = lastVespa.versionNumber(); lastPlatformObject.setString("platform", lastPlatform.toString()); lastPlatformObject.setLong("at", lastVespa.committedAt().toEpochMilli()); - long completed = steps.productionJobs().stream().filter(type -> controller.applications().deploymentTrigger().isComplete(Change.of(lastPlatform), change, instance, type)).count(); + long completed = steps.productionJobs().stream().filter(type -> controller.applications().deploymentTrigger().isComplete(Change.of(lastPlatform), change, instance, type, status.get(type))).count(); if (Optional.of(lastPlatform).equals(change.platform())) lastPlatformObject.setString("deploying", completed + " of " + steps.productionJobs().size() + " complete"); else if (completed == steps.productionJobs().size()) @@ -191,12 +196,12 @@ class JobControllerApiHandlerHelper { : "Waiting for " + application.change() + " to complete"); } - private static void lastApplicationToSlime(Cursor lastApplicationObject, Application application, Instance instance, Change change, DeploymentSteps steps, Controller controller) { + private static void lastApplicationToSlime(Cursor lastApplicationObject, Application application, Instance instance, Map status, Change change, DeploymentSteps steps, Controller controller) { long completed; ApplicationVersion lastApplication = application.latestVersion().get(); applicationVersionToSlime(lastApplicationObject.setObject("application"), lastApplication); lastApplicationObject.setLong("at", lastApplication.buildTime().get().toEpochMilli()); - completed = steps.productionJobs().stream().filter(type -> controller.applications().deploymentTrigger().isComplete(Change.of(lastApplication), change, instance, type)).count(); + completed = steps.productionJobs().stream().filter(type -> controller.applications().deploymentTrigger().isComplete(Change.of(lastApplication), change, instance, type, status.get(type))).count(); if (Optional.of(lastApplication).equals(change.application())) lastApplicationObject.setString("deploying", completed + " of " + steps.productionJobs().size() + " complete"); else if (completed == steps.productionJobs().size()) @@ -215,14 +220,14 @@ class JobControllerApiHandlerHelper { private static void deploymentToSlime(Cursor deploymentObject, Instance instance, Change change, Map pendingProduction, Map running, - JobType type, Deployment deployment) { + JobType type, Optional jobStatus, Deployment deployment) { deploymentObject.setLong("at", deployment.at().toEpochMilli()); deploymentObject.setString("platform", deployment.version().toString()); applicationVersionToSlime(deploymentObject.setObject("application"), deployment.applicationVersion()); - deploymentObject.setBool("verified", instance.deploymentJobs().statusOf(type) - .flatMap(JobStatus::lastSuccess) - .filter(run -> run.platform().equals(deployment.version()) - && run.application().equals(deployment.applicationVersion())) + deploymentObject.setBool("verified", jobStatus.flatMap(job -> job.lastSuccess()) + .map(Run::versions) + .filter(run -> run.targetPlatform().equals(deployment.version()) + && run.targetApplication().equals(deployment.applicationVersion())) .isPresent()); if (running.containsKey(type)) deploymentObject.setString("status", running.get(type).steps().get(deployReal) == unfinished ? "deploying" : "verifying"); @@ -230,17 +235,17 @@ class JobControllerApiHandlerHelper { deploymentObject.setString("status", pendingProduction.containsKey(type) ? "pending" : "completed"); } - private static void jobTypeToSlime(Cursor jobObject, Controller controller, Application application, Instance instance, JobType type, DeploymentSteps steps, + private static void jobTypeToSlime(Cursor jobObject, Controller controller, Application application, Instance instance, Map status, JobType type, DeploymentSteps steps, Map pendingProduction, Map running, URI baseUriForJob) { - instance.deploymentJobs().statusOf(type).ifPresent(status -> status.pausedUntil().ifPresent(until -> + instance.deploymentJobs().statusOf(type).ifPresent(jobStatus -> jobStatus.pausedUntil().ifPresent(until -> jobObject.setLong("pausedUntil", until))); int runs = 0; Cursor runArray = jobObject.setArray("runs"); if (type.isTest()) { Deque> pending = new ArrayDeque<>(); pendingProduction.entrySet().stream() - .filter(typeVersions -> ! controller.applications().deploymentTrigger().testedIn(instance, type, typeVersions.getValue())) - .filter(typeVersions -> ! controller.applications().deploymentTrigger().alreadyTriggered(instance, typeVersions.getValue())) + .filter(typeVersions -> ! controller.applications().deploymentTrigger().testedIn(type, status.get(type), typeVersions.getValue())) + .filter(typeVersions -> ! controller.applications().deploymentTrigger().alreadyTriggered(status, typeVersions.getValue())) .collect(groupingBy(Map.Entry::getValue, LinkedHashMap::new, Collectors.mapping(Map.Entry::getKey, toList()))) @@ -254,7 +259,7 @@ class JobControllerApiHandlerHelper { Cursor runObject = runArray.addObject(); runObject.setString("status", "pending"); versionsToSlime(runObject, versions); - if ( ! controller.applications().deploymentTrigger().triggerAt(controller.clock().instant(), type, versions, instance, application.deploymentSpec())) + if ( ! controller.applications().deploymentTrigger().triggerAt(controller.clock().instant(), type, status.get(type), versions, instance, application.deploymentSpec())) runObject.setObject("tasks").setString("cooldown", "failed"); else runObject.setObject("tasks").setString("capacity", "running"); @@ -270,18 +275,18 @@ class JobControllerApiHandlerHelper { runObject.setString("status", "pending"); versionsToSlime(runObject, pendingProduction.get(type)); Cursor pendingObject = runObject.setObject("tasks"); - if (instance.deploymentJobs().statusOf(type).map(status -> status.pausedUntil().isPresent()).orElse(false)) + if (instance.deploymentJobs().statusOf(type).map(jobStatus -> jobStatus.pausedUntil().isPresent()).orElse(false)) pendingObject.setString("paused", "pending"); - else if ( ! controller.applications().deploymentTrigger().triggerAt(controller.clock().instant(), type, versions, instance, application.deploymentSpec())) + else if ( ! controller.applications().deploymentTrigger().triggerAt(controller.clock().instant(), type, status.get(type), versions, instance, application.deploymentSpec())) pendingObject.setString("cooldown", "failed"); else { int pending = 0; - if ( ! controller.applications().deploymentTrigger().alreadyTriggered(instance, versions)) { - if ( ! controller.applications().deploymentTrigger().testedIn(instance, systemTest, versions)) { + if ( ! controller.applications().deploymentTrigger().alreadyTriggered(status, versions)) { + if ( ! controller.applications().deploymentTrigger().testedIn(systemTest, status.get(systemTest), versions)) { pending++; pendingObject.setString(shortNameOf(systemTest, controller.system()), statusOf(controller, instance.id(), systemTest, versions)); } - if ( ! controller.applications().deploymentTrigger().testedIn(instance, stagingTest, versions)) { + if ( ! controller.applications().deploymentTrigger().testedIn(stagingTest, status.get(stagingTest), versions)) { pending++; pendingObject.setString(shortNameOf(stagingTest, controller.system()), statusOf(controller, instance.id(), stagingTest, versions)); } -- cgit v1.2.3