diff options
author | jonmv <venstad@gmail.com> | 2023-10-19 09:53:54 +0200 |
---|---|---|
committer | jonmv <venstad@gmail.com> | 2023-10-19 09:53:54 +0200 |
commit | 67710e7e80999eb97fdfe2df3873f05824ab503f (patch) | |
tree | 6a6b627df7256f88e306c50ce3016532f3674584 | |
parent | 827aa7d4811fedc15abdb044e2bb5cdccba59e0b (diff) |
Use BFS rather than DFS task order for in-thread test executor
2 files changed, 56 insertions, 9 deletions
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTester.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTester.java index c16234b3948..bc03d46a30a 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTester.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTester.java @@ -75,7 +75,7 @@ public class DeploymentTester { tester = controllerTester; jobs = tester.controller().jobController(); cloud = (MockTesterCloud) tester.controller().jobController().cloud(); - runner = new JobRunner(tester.controller(), maintenanceInterval, JobRunnerTest.inThreadExecutor(), new InternalStepRunner(tester.controller())); + runner = new JobRunner(tester.controller(), maintenanceInterval, JobRunnerTest.inThreadInOrderExecutor(), new InternalStepRunner(tester.controller())); upgrader = new Upgrader(tester.controller(), maintenanceInterval); upgrader.setUpgradesPerMinute(1); // Anything that makes it at least one for any maintenance period is fine. readyJobsTrigger = new ReadyJobsTrigger(tester.controller(), maintenanceInterval); diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java index 20717be598f..d96de8df6fd 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java @@ -34,18 +34,24 @@ import java.util.EnumMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Queue; import java.util.Set; import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Phaser; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -70,10 +76,12 @@ import static com.yahoo.vespa.hosted.controller.deployment.Step.installReal; import static com.yahoo.vespa.hosted.controller.deployment.Step.installTester; import static com.yahoo.vespa.hosted.controller.deployment.Step.report; import static com.yahoo.vespa.hosted.controller.deployment.Step.startTests; +import static java.util.Objects.requireNonNull; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -175,7 +183,7 @@ public class JobRunnerTest { DeploymentTester tester = new DeploymentTester(); JobController jobs = tester.controller().jobController(); Map<Step, RunStatus> outcomes = new EnumMap<>(Step.class); - JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadExecutor(), mappedRunner(outcomes)); + JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadInOrderExecutor(), mappedRunner(outcomes)); TenantAndApplicationId appId = tester.createApplication("tenant", "real", "default").id(); ApplicationId id = appId.defaultInstance(); @@ -322,7 +330,7 @@ public class JobRunnerTest { void historyPruning() { DeploymentTester tester = new DeploymentTester(); JobController jobs = tester.controller().jobController(); - JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadExecutor(), (id, step) -> Optional.of(running)); + JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadInOrderExecutor(), (id, step) -> Optional.of(running)); TenantAndApplicationId appId = tester.createApplication("tenant", "real", "default").id(); ApplicationId instanceId = appId.defaultInstance(); @@ -347,7 +355,7 @@ public class JobRunnerTest { assertFalse(jobs.details(new RunId(instanceId, systemTest, 1)).isPresent()); assertTrue(jobs.details(new RunId(instanceId, systemTest, 65)).isPresent()); - JobRunner failureRunner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadExecutor(), (id, step) -> Optional.of(error)); + JobRunner failureRunner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadInOrderExecutor(), (id, step) -> Optional.of(error)); // Make all but the oldest of the 54 jobs a failure. for (int i = 0; i < jobs.historyLength() - 1; i++) { @@ -419,7 +427,7 @@ public class JobRunnerTest { DeploymentTester tester = new DeploymentTester(); JobController jobs = tester.controller().jobController(); Map<Step, RunStatus> outcomes = new EnumMap<>(Step.class); - JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadExecutor(), mappedRunner(outcomes)); + JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadInOrderExecutor(), mappedRunner(outcomes)); TenantAndApplicationId appId = tester.createApplication("tenant", "real", "default").id(); ApplicationId id = appId.defaultInstance(); @@ -437,7 +445,7 @@ public class JobRunnerTest { DeploymentTester tester = new DeploymentTester(); JobController jobs = tester.controller().jobController(); Map<Step, RunStatus> outcomes = new EnumMap<>(Step.class); - JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadExecutor(), mappedRunner(outcomes)); + JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadInOrderExecutor(), mappedRunner(outcomes)); TenantAndApplicationId appId = tester.createApplication("tenant", "real", "default").id(); ApplicationId id = appId.defaultInstance(); @@ -473,19 +481,58 @@ public class JobRunnerTest { assertEquals(1, metric.getMetric(context::equals, JobMetrics.noTests).get().intValue()); } + @Test + void testInThreadExecutor() throws InterruptedException { + ExecutorService executor = inThreadInOrderExecutor(); + AtomicInteger c = new AtomicInteger(0), d = new AtomicInteger(0); + Consumer<AtomicInteger> task = i -> executor.execute(() -> { + executor.execute(() -> { + i.set(2); + executor.execute(() -> i.set(4)); + }); + executor.execute(() -> i.set(3)); + i.set(1); + }); + Thread s = new Thread(() -> task.accept(d)); + s.start(); + task.accept(c); + s.join(); + assertEquals(4, c.get()); + assertEquals(4, d.get()); + assertEquals("executor is shut down", + assertThrows(RejectedExecutionException.class, + () -> executor.execute(() -> { + executor.execute(() -> executor.execute(() -> { c.set(6); })); + executor.shutdown(); + c.set(5); + })).getMessage()); + assertEquals(5, c.get()); + } + private void start(JobController jobs, ApplicationId id, JobType type) { jobs.start(id, type, versions, false, Reason.empty()); } - public static ExecutorService inThreadExecutor() { + /** Dummy test executor for unit tests. Runs tasks BFS rather than DFS, like a simple {@code Runnable::run} would do. No real shutdown logic. */ + public static ExecutorService inThreadInOrderExecutor() { return new AbstractExecutorService() { - final AtomicBoolean shutDown = new AtomicBoolean(false); + private final ThreadLocal<Boolean> inExecute = ThreadLocal.withInitial(() -> false); + private final ThreadLocal<Queue<Runnable>> tasks = ThreadLocal.withInitial(ConcurrentLinkedQueue::new); + private final AtomicBoolean shutDown = new AtomicBoolean(false); + @Override + public void execute(Runnable command) { + if (isShutdown()) throw new RejectedExecutionException("executor is shut down"); + tasks.get().add(requireNonNull(command)); + if (inExecute.get()) return; + inExecute.set(true); + try { Runnable task; while (null != (task = tasks.get().poll())) task.run(); } + finally { inExecute.set(false); } + } @Override public void shutdown() { shutDown.set(true); } @Override public List<Runnable> shutdownNow() { shutDown.set(true); return Collections.emptyList(); } @Override public boolean isShutdown() { return shutDown.get(); } @Override public boolean isTerminated() { return shutDown.get(); } @Override public boolean awaitTermination(long timeout, TimeUnit unit) { return true; } - @Override public void execute(Runnable command) { command.run(); } }; } |