aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorjonmv <venstad@gmail.com>2023-10-19 09:53:54 +0200
committerjonmv <venstad@gmail.com>2023-10-19 09:53:54 +0200
commit67710e7e80999eb97fdfe2df3873f05824ab503f (patch)
tree6a6b627df7256f88e306c50ce3016532f3674584
parent827aa7d4811fedc15abdb044e2bb5cdccba59e0b (diff)
Use BFS rather than DFS task order for in-thread test executor
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTester.java2
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java63
2 files changed, 56 insertions, 9 deletions
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTester.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTester.java
index c16234b3948..bc03d46a30a 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTester.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTester.java
@@ -75,7 +75,7 @@ public class DeploymentTester {
tester = controllerTester;
jobs = tester.controller().jobController();
cloud = (MockTesterCloud) tester.controller().jobController().cloud();
- runner = new JobRunner(tester.controller(), maintenanceInterval, JobRunnerTest.inThreadExecutor(), new InternalStepRunner(tester.controller()));
+ runner = new JobRunner(tester.controller(), maintenanceInterval, JobRunnerTest.inThreadInOrderExecutor(), new InternalStepRunner(tester.controller()));
upgrader = new Upgrader(tester.controller(), maintenanceInterval);
upgrader.setUpgradesPerMinute(1); // Anything that makes it at least one for any maintenance period is fine.
readyJobsTrigger = new ReadyJobsTrigger(tester.controller(), maintenanceInterval);
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java
index 20717be598f..d96de8df6fd 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java
@@ -34,18 +34,24 @@ import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Queue;
import java.util.Set;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -70,10 +76,12 @@ import static com.yahoo.vespa.hosted.controller.deployment.Step.installReal;
import static com.yahoo.vespa.hosted.controller.deployment.Step.installTester;
import static com.yahoo.vespa.hosted.controller.deployment.Step.report;
import static com.yahoo.vespa.hosted.controller.deployment.Step.startTests;
+import static java.util.Objects.requireNonNull;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -175,7 +183,7 @@ public class JobRunnerTest {
DeploymentTester tester = new DeploymentTester();
JobController jobs = tester.controller().jobController();
Map<Step, RunStatus> outcomes = new EnumMap<>(Step.class);
- JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadExecutor(), mappedRunner(outcomes));
+ JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadInOrderExecutor(), mappedRunner(outcomes));
TenantAndApplicationId appId = tester.createApplication("tenant", "real", "default").id();
ApplicationId id = appId.defaultInstance();
@@ -322,7 +330,7 @@ public class JobRunnerTest {
void historyPruning() {
DeploymentTester tester = new DeploymentTester();
JobController jobs = tester.controller().jobController();
- JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadExecutor(), (id, step) -> Optional.of(running));
+ JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadInOrderExecutor(), (id, step) -> Optional.of(running));
TenantAndApplicationId appId = tester.createApplication("tenant", "real", "default").id();
ApplicationId instanceId = appId.defaultInstance();
@@ -347,7 +355,7 @@ public class JobRunnerTest {
assertFalse(jobs.details(new RunId(instanceId, systemTest, 1)).isPresent());
assertTrue(jobs.details(new RunId(instanceId, systemTest, 65)).isPresent());
- JobRunner failureRunner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadExecutor(), (id, step) -> Optional.of(error));
+ JobRunner failureRunner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadInOrderExecutor(), (id, step) -> Optional.of(error));
// Make all but the oldest of the 54 jobs a failure.
for (int i = 0; i < jobs.historyLength() - 1; i++) {
@@ -419,7 +427,7 @@ public class JobRunnerTest {
DeploymentTester tester = new DeploymentTester();
JobController jobs = tester.controller().jobController();
Map<Step, RunStatus> outcomes = new EnumMap<>(Step.class);
- JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadExecutor(), mappedRunner(outcomes));
+ JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadInOrderExecutor(), mappedRunner(outcomes));
TenantAndApplicationId appId = tester.createApplication("tenant", "real", "default").id();
ApplicationId id = appId.defaultInstance();
@@ -437,7 +445,7 @@ public class JobRunnerTest {
DeploymentTester tester = new DeploymentTester();
JobController jobs = tester.controller().jobController();
Map<Step, RunStatus> outcomes = new EnumMap<>(Step.class);
- JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadExecutor(), mappedRunner(outcomes));
+ JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), inThreadInOrderExecutor(), mappedRunner(outcomes));
TenantAndApplicationId appId = tester.createApplication("tenant", "real", "default").id();
ApplicationId id = appId.defaultInstance();
@@ -473,19 +481,58 @@ public class JobRunnerTest {
assertEquals(1, metric.getMetric(context::equals, JobMetrics.noTests).get().intValue());
}
+ @Test
+ void testInThreadExecutor() throws InterruptedException {
+ ExecutorService executor = inThreadInOrderExecutor();
+ AtomicInteger c = new AtomicInteger(0), d = new AtomicInteger(0);
+ Consumer<AtomicInteger> task = i -> executor.execute(() -> {
+ executor.execute(() -> {
+ i.set(2);
+ executor.execute(() -> i.set(4));
+ });
+ executor.execute(() -> i.set(3));
+ i.set(1);
+ });
+ Thread s = new Thread(() -> task.accept(d));
+ s.start();
+ task.accept(c);
+ s.join();
+ assertEquals(4, c.get());
+ assertEquals(4, d.get());
+ assertEquals("executor is shut down",
+ assertThrows(RejectedExecutionException.class,
+ () -> executor.execute(() -> {
+ executor.execute(() -> executor.execute(() -> { c.set(6); }));
+ executor.shutdown();
+ c.set(5);
+ })).getMessage());
+ assertEquals(5, c.get());
+ }
+
private void start(JobController jobs, ApplicationId id, JobType type) {
jobs.start(id, type, versions, false, Reason.empty());
}
- public static ExecutorService inThreadExecutor() {
+ /** Dummy test executor for unit tests. Runs tasks BFS rather than DFS, like a simple {@code Runnable::run} would do. No real shutdown logic. */
+ public static ExecutorService inThreadInOrderExecutor() {
return new AbstractExecutorService() {
- final AtomicBoolean shutDown = new AtomicBoolean(false);
+ private final ThreadLocal<Boolean> inExecute = ThreadLocal.withInitial(() -> false);
+ private final ThreadLocal<Queue<Runnable>> tasks = ThreadLocal.withInitial(ConcurrentLinkedQueue::new);
+ private final AtomicBoolean shutDown = new AtomicBoolean(false);
+ @Override
+ public void execute(Runnable command) {
+ if (isShutdown()) throw new RejectedExecutionException("executor is shut down");
+ tasks.get().add(requireNonNull(command));
+ if (inExecute.get()) return;
+ inExecute.set(true);
+ try { Runnable task; while (null != (task = tasks.get().poll())) task.run(); }
+ finally { inExecute.set(false); }
+ }
@Override public void shutdown() { shutDown.set(true); }
@Override public List<Runnable> shutdownNow() { shutDown.set(true); return Collections.emptyList(); }
@Override public boolean isShutdown() { return shutDown.get(); }
@Override public boolean isTerminated() { return shutDown.get(); }
@Override public boolean awaitTermination(long timeout, TimeUnit unit) { return true; }
- @Override public void execute(Runnable command) { command.run(); }
};
}