aboutsummaryrefslogtreecommitdiffstats
path: root/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java')
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java115
1 files changed, 106 insertions, 9 deletions
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java
index 3ee6c7aadc3..d96de8df6fd 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java
@@ -1,8 +1,10 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.hosted.controller.maintenance;
+import ai.vespa.metrics.ControllerMetrics;
import com.yahoo.component.Version;
import com.yahoo.config.provision.ApplicationId;
+import com.yahoo.jdisc.test.MockMetric;
import com.yahoo.vespa.hosted.controller.api.integration.deployment.JobId;
import com.yahoo.vespa.hosted.controller.api.integration.deployment.JobType;
import com.yahoo.vespa.hosted.controller.api.integration.deployment.RevisionId;
@@ -22,6 +24,7 @@ import com.yahoo.vespa.hosted.controller.deployment.StepRunner;
import com.yahoo.vespa.hosted.controller.deployment.Submission;
import com.yahoo.vespa.hosted.controller.deployment.Versions;
import com.yahoo.vespa.hosted.controller.integration.MetricsMock;
+import com.yahoo.vespa.hosted.controller.maintenance.JobRunner.Metrics;
import org.junit.jupiter.api.Test;
import java.time.Duration;
@@ -31,16 +34,24 @@ import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Queue;
import java.util.Set;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -65,10 +76,12 @@ import static com.yahoo.vespa.hosted.controller.deployment.Step.installReal;
import static com.yahoo.vespa.hosted.controller.deployment.Step.installTester;
import static com.yahoo.vespa.hosted.controller.deployment.Step.report;
import static com.yahoo.vespa.hosted.controller.deployment.Step.startTests;
+import static java.util.Objects.requireNonNull;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -121,11 +134,56 @@ public class JobRunnerTest {
}
@Test
+ void metrics() {
+ Phaser phaser = new Phaser(4);
+ StepRunner runner = (step, id) -> {
+ phaser.arriveAndAwaitAdvance();
+ phaser.arriveAndAwaitAdvance();
+ return Optional.of(running);
+ };
+ ExecutorService executor = new ThreadPoolExecutor(3, 3, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), (task, pool) -> task.run());
+ DeploymentTester tester = new DeploymentTester();
+ MockMetric metric = new MockMetric();
+ Metrics metrics = new Metrics(metric, Duration.ofDays(1));
+ JobRunner jobs = new JobRunner(tester.controller(), Duration.ofDays(1), executor, runner, metrics);
+ tester.newDeploymentContext().submit();
+
+ assertEquals(Map.of(), metric.metrics());
+ metrics.report();
+ assertEquals(Map.of(ControllerMetrics.DEPLOYMENT_JOBS_QUEUED.baseName(),
+ Map.of(Map.of(), 0.0),
+ ControllerMetrics.DEPLOYMENT_JOBS_ACTIVE.baseName(),
+ Map.of(Map.of(), 0.0)),
+ metric.metrics());
+ tester.triggerJobs();
+
+ assertEquals(2, tester.jobs().active().size());
+ jobs.maintain();
+ phaser.arriveAndAwaitAdvance();
+ metrics.report();
+ assertEquals(Map.of(ControllerMetrics.DEPLOYMENT_JOBS_QUEUED.baseName(),
+ Map.of(Map.of(), 1.0),
+ ControllerMetrics.DEPLOYMENT_JOBS_ACTIVE.baseName(),
+ Map.of(Map.of(), 3.0)),
+ metric.metrics());
+
+ jobs.shutdown();
+ phaser.forceTermination();
+ jobs.awaitShutdown();
+ metrics.report();
+ assertEquals(Map.of(ControllerMetrics.DEPLOYMENT_JOBS_QUEUED.baseName(),
+ Map.of(Map.of(), 0.0),
+ ControllerMetrics.DEPLOYMENT_JOBS_ACTIVE.baseName(),
+ Map.of(Map.of(), 0.0)),
+ metric.metrics());
+ }
+
+ @Test
void stepLogic() {
DeploymentTester tester = new DeploymentTester();
JobController jobs = tester.controller().jobController();
Map<Step, RunStatus> outcomes = new EnumMap<>(Step.class);
- JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadExecutor(), mappedRunner(outcomes));
+ JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadInOrderExecutor(), mappedRunner(outcomes));
TenantAndApplicationId appId = tester.createApplication("tenant", "real", "default").id();
ApplicationId id = appId.defaultInstance();
@@ -272,7 +330,7 @@ public class JobRunnerTest {
void historyPruning() {
DeploymentTester tester = new DeploymentTester();
JobController jobs = tester.controller().jobController();
- JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadExecutor(), (id, step) -> Optional.of(running));
+ JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadInOrderExecutor(), (id, step) -> Optional.of(running));
TenantAndApplicationId appId = tester.createApplication("tenant", "real", "default").id();
ApplicationId instanceId = appId.defaultInstance();
@@ -297,7 +355,7 @@ public class JobRunnerTest {
assertFalse(jobs.details(new RunId(instanceId, systemTest, 1)).isPresent());
assertTrue(jobs.details(new RunId(instanceId, systemTest, 65)).isPresent());
- JobRunner failureRunner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadExecutor(), (id, step) -> Optional.of(error));
+ JobRunner failureRunner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadInOrderExecutor(), (id, step) -> Optional.of(error));
// Make all but the oldest of the 54 jobs a failure.
for (int i = 0; i < jobs.historyLength() - 1; i++) {
@@ -369,7 +427,7 @@ public class JobRunnerTest {
DeploymentTester tester = new DeploymentTester();
JobController jobs = tester.controller().jobController();
Map<Step, RunStatus> outcomes = new EnumMap<>(Step.class);
- JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadExecutor(), mappedRunner(outcomes));
+ JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadInOrderExecutor(), mappedRunner(outcomes));
TenantAndApplicationId appId = tester.createApplication("tenant", "real", "default").id();
ApplicationId id = appId.defaultInstance();
@@ -387,7 +445,7 @@ public class JobRunnerTest {
DeploymentTester tester = new DeploymentTester();
JobController jobs = tester.controller().jobController();
Map<Step, RunStatus> outcomes = new EnumMap<>(Step.class);
- JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadExecutor(), mappedRunner(outcomes));
+ JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadInOrderExecutor(), mappedRunner(outcomes));
TenantAndApplicationId appId = tester.createApplication("tenant", "real", "default").id();
ApplicationId id = appId.defaultInstance();
@@ -423,19 +481,58 @@ public class JobRunnerTest {
assertEquals(1, metric.getMetric(context::equals, JobMetrics.noTests).get().intValue());
}
+ @Test
+ void testInThreadExecutor() throws InterruptedException {
+ ExecutorService executor = inThreadInOrderExecutor();
+ AtomicInteger c = new AtomicInteger(0), d = new AtomicInteger(0);
+ Consumer<AtomicInteger> task = i -> executor.execute(() -> {
+ executor.execute(() -> {
+ i.set(2);
+ executor.execute(() -> i.set(4));
+ });
+ executor.execute(() -> i.set(3));
+ i.set(1);
+ });
+ Thread s = new Thread(() -> task.accept(d));
+ s.start();
+ task.accept(c);
+ s.join();
+ assertEquals(4, c.get());
+ assertEquals(4, d.get());
+ assertEquals("executor is shut down",
+ assertThrows(RejectedExecutionException.class,
+ () -> executor.execute(() -> {
+ executor.execute(() -> executor.execute(() -> { c.set(6); }));
+ executor.shutdown();
+ c.set(5);
+ })).getMessage());
+ assertEquals(5, c.get());
+ }
+
private void start(JobController jobs, ApplicationId id, JobType type) {
jobs.start(id, type, versions, false, Reason.empty());
}
- public static ExecutorService inThreadExecutor() {
+ /** Dummy test executor for unit tests. Runs tasks BFS rather than DFS, like a simple {@code Runnable::run} would do. No real shutdown logic. */
+ public static ExecutorService inThreadInOrderExecutor() {
return new AbstractExecutorService() {
- final AtomicBoolean shutDown = new AtomicBoolean(false);
+ private final ThreadLocal<Boolean> inExecute = ThreadLocal.withInitial(() -> false);
+ private final ThreadLocal<Queue<Runnable>> tasks = ThreadLocal.withInitial(ConcurrentLinkedQueue::new);
+ private final AtomicBoolean shutDown = new AtomicBoolean(false);
+ @Override
+ public void execute(Runnable command) {
+ if (isShutdown()) throw new RejectedExecutionException("executor is shut down");
+ tasks.get().add(requireNonNull(command));
+ if (inExecute.get()) return;
+ inExecute.set(true);
+ try { Runnable task; while (null != (task = tasks.get().poll())) task.run(); }
+ finally { inExecute.set(false); }
+ }
@Override public void shutdown() { shutDown.set(true); }
@Override public List<Runnable> shutdownNow() { shutDown.set(true); return Collections.emptyList(); }
@Override public boolean isShutdown() { return shutDown.get(); }
@Override public boolean isTerminated() { return shutDown.get(); }
@Override public boolean awaitTermination(long timeout, TimeUnit unit) { return true; }
- @Override public void execute(Runnable command) { command.run(); }
};
}