diff options
author | jonmv <venstad@gmail.com> | 2023-10-11 21:45:53 +0200 |
---|---|---|
committer | jonmv <venstad@gmail.com> | 2023-10-11 21:45:53 +0200 |
commit | 894738eb2e0a88a8868c6abaa692da176ef83969 (patch) | |
tree | b9edf37163471cb5bc55317c1b625493ba39af1a /controller-server | |
parent | f47b44fb69228c7a7ec316d5213508bd563bf907 (diff) |
Revert "Merge pull request #28879 from vespa-engine/revert-28869-jonmv/job-runner-thread-metrics"
This reverts commit 67351aa3e2adbbb4872097ed799f1ca837f35e6d, reversing
changes made to aed7902ee0371efb89747d467c4a2f8124ddc08d.
Diffstat (limited to 'controller-server')
4 files changed, 115 insertions, 7 deletions
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/InternalStepRunner.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/InternalStepRunner.java index 662b4018a34..1080b379c4d 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/InternalStepRunner.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/InternalStepRunner.java @@ -727,6 +727,8 @@ public class InternalStepRunner implements StepRunner { DeploymentSpec spec = controller.applications().requireApplication(TenantAndApplicationId.from(id.application())).deploymentSpec(); boolean requireTests = spec.steps().stream().anyMatch(step -> step.concerns(id.type().environment())); + logger.log(WARNING, "No tests were actually run, but this test suite is explicitly declared in 'deployment.xml'. " + + "Either add tests, ensure they're correctly configured, or remove the test declaration."); return Optional.of(requireTests ? testFailure : noTests); } case SUCCESS: diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunner.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunner.java index 6f00ff39637..0f482b1a015 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunner.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunner.java @@ -1,7 +1,9 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hosted.controller.maintenance; +import ai.vespa.metrics.ControllerMetrics; import com.yahoo.concurrent.DaemonThreadFactory; +import com.yahoo.jdisc.Metric; import com.yahoo.vespa.hosted.controller.Controller; import com.yahoo.vespa.hosted.controller.api.integration.deployment.RunId; import com.yahoo.vespa.hosted.controller.deployment.InternalStepRunner; @@ -11,11 +13,14 @@ import com.yahoo.vespa.hosted.controller.deployment.Step; import com.yahoo.vespa.hosted.controller.deployment.StepRunner; import java.time.Duration; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; @@ -32,22 +37,29 @@ public class JobRunner extends ControllerMaintainer { private final JobController jobs; private final ExecutorService executors; private final StepRunner runner; + private final Metrics metrics; public JobRunner(Controller controller, Duration duration) { - this(controller, duration, Executors.newFixedThreadPool(32, new DaemonThreadFactory("job-runner-")), new InternalStepRunner(controller)); + this(controller, duration, Executors.newFixedThreadPool(32, new DaemonThreadFactory("job-runner-")), + new InternalStepRunner(controller)); } public JobRunner(Controller controller, Duration duration, ExecutorService executors, StepRunner runner) { + this(controller, duration, executors, runner, new Metrics(controller.metric(), Duration.ofMillis(100))); + } + + JobRunner(Controller controller, Duration duration, ExecutorService executors, StepRunner runner, Metrics metrics) { super(controller, duration); this.jobs = controller.jobController(); this.jobs.setRunner(this::advance); this.executors = executors; this.runner = runner; + this.metrics = metrics; } @Override protected double maintain() { - executors.execute(() -> jobs.active().forEach(this::advance)); + execute(() -> jobs.active().forEach(this::advance)); jobs.collectGarbage(); return 1.0; } @@ -55,6 +67,7 @@ public class JobRunner extends ControllerMaintainer { @Override public void shutdown() { super.shutdown(); + metrics.shutdown(); executors.shutdown(); } @@ -83,14 +96,14 @@ public class JobRunner extends ControllerMaintainer { jobs.locked(id, run -> { if ( ! run.hasFailed() && controller().clock().instant().isAfter(run.sleepUntil().orElse(run.start()).plus(jobTimeout))) - executors.execute(() -> { + execute(() -> { jobs.abort(run.id(), "job timeout of " + jobTimeout + " reached", false); advance(run.id()); }); else if (run.readySteps().isEmpty()) - executors.execute(() -> finish(run.id())); + execute(() -> finish(run.id())); else if (run.hasFailed() || run.sleepUntil().map(sleepUntil -> ! sleepUntil.isAfter(controller().clock().instant())).orElse(true)) - run.readySteps().forEach(step -> executors.execute(() -> advance(run.id(), step))); + run.readySteps().forEach(step -> execute(() -> advance(run.id(), step))); return null; }); @@ -145,4 +158,39 @@ public class JobRunner extends ControllerMaintainer { } } + private void execute(Runnable task) { + metrics.queued.incrementAndGet(); + executors.execute(() -> { + metrics.queued.decrementAndGet(); + metrics.active.incrementAndGet(); + try { task.run(); } + finally { metrics.active.decrementAndGet(); } + }); + } + + static class Metrics { + + private final AtomicInteger queued = new AtomicInteger(); + private final AtomicInteger active = new AtomicInteger(); + private final ScheduledExecutorService reporter = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("job-runner-metrics-")); + private final Metric metric; + private final Metric.Context context; + + Metrics(Metric metric, Duration interval) { + this.metric = metric; + this.context = metric.createContext(Map.of()); + reporter.scheduleAtFixedRate(this::report, interval.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS); + } + + void report() { + metric.set(ControllerMetrics.DEPLOYMENT_JOBS_QUEUED.baseName(), queued.get(), context); + metric.set(ControllerMetrics.DEPLOYMENT_JOBS_ACTIVE.baseName(), active.get(), context); + } + + void shutdown() { + reporter.shutdown(); + } + + } + } diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTester.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTester.java index 1cc549ec6ca..c16234b3948 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTester.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTester.java @@ -75,8 +75,7 @@ public class DeploymentTester { tester = controllerTester; jobs = tester.controller().jobController(); cloud = (MockTesterCloud) tester.controller().jobController().cloud(); - runner = new JobRunner(tester.controller(), maintenanceInterval, JobRunnerTest.inThreadExecutor(), - new InternalStepRunner(tester.controller())); + runner = new JobRunner(tester.controller(), maintenanceInterval, JobRunnerTest.inThreadExecutor(), new InternalStepRunner(tester.controller())); upgrader = new Upgrader(tester.controller(), maintenanceInterval); upgrader.setUpgradesPerMinute(1); // Anything that makes it at least one for any maintenance period is fine. readyJobsTrigger = new ReadyJobsTrigger(tester.controller(), maintenanceInterval); diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java index e87d4f1f3f0..53be33fdfd7 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java @@ -1,8 +1,10 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hosted.controller.maintenance; +import ai.vespa.metrics.ControllerMetrics; import com.yahoo.component.Version; import com.yahoo.config.provision.ApplicationId; +import com.yahoo.jdisc.test.MockMetric; import com.yahoo.vespa.hosted.controller.api.integration.deployment.JobId; import com.yahoo.vespa.hosted.controller.api.integration.deployment.JobType; import com.yahoo.vespa.hosted.controller.api.integration.deployment.RevisionId; @@ -22,6 +24,7 @@ import com.yahoo.vespa.hosted.controller.deployment.StepRunner; import com.yahoo.vespa.hosted.controller.deployment.Submission; import com.yahoo.vespa.hosted.controller.deployment.Versions; import com.yahoo.vespa.hosted.controller.integration.MetricsMock; +import com.yahoo.vespa.hosted.controller.maintenance.JobRunner.Metrics; import org.junit.jupiter.api.Test; import java.time.Duration; @@ -37,7 +40,9 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Phaser; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -121,6 +126,60 @@ public class JobRunnerTest { } @Test + void metrics() { + Phaser phaser = new Phaser(4); + StepRunner runner = (step, id) -> { + phaser.arrive(); + phaser.arriveAndAwaitAdvance(); + return Optional.of(running); + }; + ExecutorService executor = new ThreadPoolExecutor(3, 3, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), (task, pool) -> task.run()); + DeploymentTester tester = new DeploymentTester(); + MockMetric metric = new MockMetric(); + Metrics metrics = new Metrics(metric, Duration.ofDays(1)); + JobRunner jobs = new JobRunner(tester.controller(), Duration.ofDays(1), executor, runner, metrics); + tester.newDeploymentContext().submit(); + + assertEquals(Map.of(), metric.metrics()); + metrics.report(); + assertEquals(Map.of(ControllerMetrics.DEPLOYMENT_JOBS_QUEUED.baseName(), + Map.of(Map.of(), 0.0), + ControllerMetrics.DEPLOYMENT_JOBS_ACTIVE.baseName(), + Map.of(Map.of(), 0.0)), + metric.metrics()); + tester.triggerJobs(); + + assertEquals(2, tester.jobs().active().size()); + jobs.maintain(); + phaser.arriveAndAwaitAdvance(); + metrics.report(); + assertEquals(Map.of(ControllerMetrics.DEPLOYMENT_JOBS_QUEUED.baseName(), + Map.of(Map.of(), 1.0), + ControllerMetrics.DEPLOYMENT_JOBS_ACTIVE.baseName(), + Map.of(Map.of(), 3.0)), + metric.metrics()); + + phaser.arrive(); + phaser.arriveAndAwaitAdvance(); + metrics.report(); + assertEquals(Map.of(ControllerMetrics.DEPLOYMENT_JOBS_QUEUED.baseName(), + Map.of(Map.of(), 1.0), + ControllerMetrics.DEPLOYMENT_JOBS_ACTIVE.baseName(), + Map.of(Map.of(), 3.0)), + metric.metrics()); + + jobs.shutdown(); + phaser.forceTermination(); + jobs.awaitShutdown(); + metrics.report(); + assertEquals(Map.of(ControllerMetrics.DEPLOYMENT_JOBS_QUEUED.baseName(), + Map.of(Map.of(), 0.0), + ControllerMetrics.DEPLOYMENT_JOBS_ACTIVE.baseName(), + Map.of(Map.of(), 0.0)), + metric.metrics()); + } + + @Test void stepLogic() { DeploymentTester tester = new DeploymentTester(); JobController jobs = tester.controller().jobController(); |