diff options
Diffstat (limited to 'controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java')
-rw-r--r-- | controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java | 115 |
1 files changed, 106 insertions, 9 deletions
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java index 3ee6c7aadc3..d96de8df6fd 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java @@ -1,8 +1,10 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.hosted.controller.maintenance; +import ai.vespa.metrics.ControllerMetrics; import com.yahoo.component.Version; import com.yahoo.config.provision.ApplicationId; +import com.yahoo.jdisc.test.MockMetric; import com.yahoo.vespa.hosted.controller.api.integration.deployment.JobId; import com.yahoo.vespa.hosted.controller.api.integration.deployment.JobType; import com.yahoo.vespa.hosted.controller.api.integration.deployment.RevisionId; @@ -22,6 +24,7 @@ import com.yahoo.vespa.hosted.controller.deployment.StepRunner; import com.yahoo.vespa.hosted.controller.deployment.Submission; import com.yahoo.vespa.hosted.controller.deployment.Versions; import com.yahoo.vespa.hosted.controller.integration.MetricsMock; +import com.yahoo.vespa.hosted.controller.maintenance.JobRunner.Metrics; import org.junit.jupiter.api.Test; import java.time.Duration; @@ -31,16 +34,24 @@ import java.util.EnumMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Queue; import java.util.Set; import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Phaser; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -65,10 +76,12 @@ import static com.yahoo.vespa.hosted.controller.deployment.Step.installReal; import static com.yahoo.vespa.hosted.controller.deployment.Step.installTester; import static com.yahoo.vespa.hosted.controller.deployment.Step.report; import static com.yahoo.vespa.hosted.controller.deployment.Step.startTests; +import static java.util.Objects.requireNonNull; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -121,11 +134,56 @@ public class JobRunnerTest { } @Test + void metrics() { + Phaser phaser = new Phaser(4); + StepRunner runner = (step, id) -> { + phaser.arriveAndAwaitAdvance(); + phaser.arriveAndAwaitAdvance(); + return Optional.of(running); + }; + ExecutorService executor = new ThreadPoolExecutor(3, 3, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), (task, pool) -> task.run()); + DeploymentTester tester = new DeploymentTester(); + MockMetric metric = new MockMetric(); + Metrics metrics = new Metrics(metric, Duration.ofDays(1)); + JobRunner jobs = new JobRunner(tester.controller(), Duration.ofDays(1), executor, runner, metrics); + tester.newDeploymentContext().submit(); + + assertEquals(Map.of(), metric.metrics()); + metrics.report(); + assertEquals(Map.of(ControllerMetrics.DEPLOYMENT_JOBS_QUEUED.baseName(), + Map.of(Map.of(), 0.0), + ControllerMetrics.DEPLOYMENT_JOBS_ACTIVE.baseName(), + Map.of(Map.of(), 0.0)), + metric.metrics()); + tester.triggerJobs(); + + assertEquals(2, tester.jobs().active().size()); + jobs.maintain(); + phaser.arriveAndAwaitAdvance(); + metrics.report(); + assertEquals(Map.of(ControllerMetrics.DEPLOYMENT_JOBS_QUEUED.baseName(), + Map.of(Map.of(), 1.0), + ControllerMetrics.DEPLOYMENT_JOBS_ACTIVE.baseName(), + Map.of(Map.of(), 3.0)), + metric.metrics()); + + jobs.shutdown(); + phaser.forceTermination(); + jobs.awaitShutdown(); + metrics.report(); + assertEquals(Map.of(ControllerMetrics.DEPLOYMENT_JOBS_QUEUED.baseName(), + Map.of(Map.of(), 0.0), + ControllerMetrics.DEPLOYMENT_JOBS_ACTIVE.baseName(), + Map.of(Map.of(), 0.0)), + metric.metrics()); + } + + @Test void stepLogic() { DeploymentTester tester = new DeploymentTester(); JobController jobs = tester.controller().jobController(); Map<Step, RunStatus> outcomes = new EnumMap<>(Step.class); - JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadExecutor(), mappedRunner(outcomes)); + JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadInOrderExecutor(), mappedRunner(outcomes)); TenantAndApplicationId appId = tester.createApplication("tenant", "real", "default").id(); ApplicationId id = appId.defaultInstance(); @@ -272,7 +330,7 @@ public class JobRunnerTest { void historyPruning() { DeploymentTester tester = new DeploymentTester(); JobController jobs = tester.controller().jobController(); - JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadExecutor(), (id, step) -> Optional.of(running)); + JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadInOrderExecutor(), (id, step) -> Optional.of(running)); TenantAndApplicationId appId = tester.createApplication("tenant", "real", "default").id(); ApplicationId instanceId = appId.defaultInstance(); @@ -297,7 +355,7 @@ public class JobRunnerTest { assertFalse(jobs.details(new RunId(instanceId, systemTest, 1)).isPresent()); assertTrue(jobs.details(new RunId(instanceId, systemTest, 65)).isPresent()); - JobRunner failureRunner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadExecutor(), (id, step) -> Optional.of(error)); + JobRunner failureRunner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadInOrderExecutor(), (id, step) -> Optional.of(error)); // Make all but the oldest of the 54 jobs a failure. for (int i = 0; i < jobs.historyLength() - 1; i++) { @@ -369,7 +427,7 @@ public class JobRunnerTest { DeploymentTester tester = new DeploymentTester(); JobController jobs = tester.controller().jobController(); Map<Step, RunStatus> outcomes = new EnumMap<>(Step.class); - JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadExecutor(), mappedRunner(outcomes)); + JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadInOrderExecutor(), mappedRunner(outcomes)); TenantAndApplicationId appId = tester.createApplication("tenant", "real", "default").id(); ApplicationId id = appId.defaultInstance(); @@ -387,7 +445,7 @@ public class JobRunnerTest { DeploymentTester tester = new DeploymentTester(); JobController jobs = tester.controller().jobController(); Map<Step, RunStatus> outcomes = new EnumMap<>(Step.class); - JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadExecutor(), mappedRunner(outcomes)); + JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadInOrderExecutor(), mappedRunner(outcomes)); TenantAndApplicationId appId = tester.createApplication("tenant", "real", "default").id(); ApplicationId id = appId.defaultInstance(); @@ -423,19 +481,58 @@ public class JobRunnerTest { assertEquals(1, metric.getMetric(context::equals, JobMetrics.noTests).get().intValue()); } + @Test + void testInThreadExecutor() throws InterruptedException { + ExecutorService executor = inThreadInOrderExecutor(); + AtomicInteger c = new AtomicInteger(0), d = new AtomicInteger(0); + Consumer<AtomicInteger> task = i -> executor.execute(() -> { + executor.execute(() -> { + i.set(2); + executor.execute(() -> i.set(4)); + }); + executor.execute(() -> i.set(3)); + i.set(1); + }); + Thread s = new Thread(() -> task.accept(d)); + s.start(); + task.accept(c); + s.join(); + assertEquals(4, c.get()); + assertEquals(4, d.get()); + assertEquals("executor is shut down", + assertThrows(RejectedExecutionException.class, + () -> executor.execute(() -> { + executor.execute(() -> executor.execute(() -> { c.set(6); })); + executor.shutdown(); + c.set(5); + })).getMessage()); + assertEquals(5, c.get()); + } + private void start(JobController jobs, ApplicationId id, JobType type) { jobs.start(id, type, versions, false, Reason.empty()); } - public static ExecutorService inThreadExecutor() { + /** Dummy test executor for unit tests. Runs tasks BFS rather than DFS, like a simple {@code Runnable::run} would do. No real shutdown logic. */ + public static ExecutorService inThreadInOrderExecutor() { return new AbstractExecutorService() { - final AtomicBoolean shutDown = new AtomicBoolean(false); + private final ThreadLocal<Boolean> inExecute = ThreadLocal.withInitial(() -> false); + private final ThreadLocal<Queue<Runnable>> tasks = ThreadLocal.withInitial(ConcurrentLinkedQueue::new); + private final AtomicBoolean shutDown = new AtomicBoolean(false); + @Override + public void execute(Runnable command) { + if (isShutdown()) throw new RejectedExecutionException("executor is shut down"); + tasks.get().add(requireNonNull(command)); + if (inExecute.get()) return; + inExecute.set(true); + try { Runnable task; while (null != (task = tasks.get().poll())) task.run(); } + finally { inExecute.set(false); } + } @Override public void shutdown() { shutDown.set(true); } @Override public List<Runnable> shutdownNow() { shutDown.set(true); return Collections.emptyList(); } @Override public boolean isShutdown() { return shutDown.get(); } @Override public boolean isTerminated() { return shutDown.get(); } @Override public boolean awaitTermination(long timeout, TimeUnit unit) { return true; } - @Override public void execute(Runnable command) { command.run(); } }; } |