aboutsummaryrefslogtreecommitdiffstats
path: root/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java
diff options
context:
space:
mode:
authorJon Marius Venstad <jvenstad@yahoo-inc.com>2018-10-08 10:08:12 +0200
committerJon Marius Venstad <jvenstad@yahoo-inc.com>2018-10-08 10:08:12 +0200
commitaee5bf11792099e9b770ae0f494880e9944e340d (patch)
tree0ceb4da0f8ded03d659ad3b5dc14f08ad336cc61 /controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java
parente03e363438b8ba680f21242984e48547581d8f37 (diff)
Fix test instead :)
Diffstat (limited to 'controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java')
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java34
1 files changed, 18 insertions, 16 deletions
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java
index 780cea6f1fa..e506c8c56ca 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/JobRunnerTest.java
@@ -15,7 +15,6 @@ import com.yahoo.vespa.hosted.controller.deployment.Step;
import com.yahoo.vespa.hosted.controller.deployment.Step.Status;
import com.yahoo.vespa.hosted.controller.deployment.StepRunner;
import com.yahoo.vespa.hosted.controller.deployment.Versions;
-import org.junit.Ignore;
import org.junit.Test;
import java.time.Duration;
@@ -27,10 +26,10 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -73,15 +72,14 @@ public class JobRunnerTest {
Optional.empty(),
Optional.empty());
- @Ignore
@Test
public void multiThreadedExecutionFinishes() throws InterruptedException {
DeploymentTester tester = new DeploymentTester();
JobController jobs = tester.controller().jobController();
StepRunner stepRunner = (step, id) -> id.type() == stagingTest && step.get() == startTests? Optional.of(error) : Optional.of(running);
- CountDownLatch latch = new CountDownLatch(19); // Number of steps that will run, below: all but endTests in staging and all 9 in system.
+ Phaser phaser = new Phaser(1);
JobRunner runner = new JobRunner(tester.controller(), Duration.ofDays(1), new JobControl(tester.controller().curator()),
- Executors.newFixedThreadPool(32), notifying(stepRunner, latch));
+ phasedExecutor(phaser), stepRunner);
ApplicationId id = tester.createApplication("real", "tenant", 1, 1L).id();
jobs.submit(id, versions.targetApplication().source().get(), new byte[0], new byte[0]);
@@ -100,10 +98,7 @@ public class JobRunnerTest {
assertFalse(jobs.last(id, stagingTest).get().hasEnded());
runner.maintain();
- latch.await(1, TimeUnit.SECONDS);
- assertEquals(0, latch.getCount());
-
- runner.deconstruct(); // Ensures all workers have finished writing to the curator.
+ phaser.arriveAndAwaitAdvance();
assertTrue(jobs.last(id, systemTest).get().steps().values().stream().allMatch(succeeded::equals));
assertTrue(jobs.last(id, stagingTest).get().hasEnded());
assertTrue(jobs.last(id, stagingTest).get().hasFailed());
@@ -286,14 +281,21 @@ public class JobRunnerTest {
};
}
- private static StepRunner notifying(StepRunner runner, CountDownLatch latch) {
- return (step, id) -> {
- Optional<RunStatus> status = runner.run(step, id);
- synchronized (latch) {
- assertTrue(latch.getCount() > 0);
- latch.countDown();
+ private static ExecutorService phasedExecutor(Phaser phaser) {
+ return new AbstractExecutorService() {
+ ExecutorService delegate = Executors.newFixedThreadPool(32);
+ @Override public void shutdown() { delegate.shutdown(); }
+ @Override public List<Runnable> shutdownNow() { return delegate.shutdownNow(); }
+ @Override public boolean isShutdown() { return delegate.isShutdown(); }
+ @Override public boolean isTerminated() { return delegate.isTerminated(); }
+ @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return delegate.awaitTermination(timeout, unit); }
+ @Override public void execute(Runnable command) {
+ phaser.register();
+ delegate.execute(() -> {
+ command.run();
+ phaser.arriveAndDeregister();
+ });
}
- return status;
};
}