diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-09-28 19:01:12 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-09-28 19:01:12 +0200 |
commit | 29f45f8cfdb15e87d3b0dfe312fdcdc39e84cf41 (patch) | |
tree | 8f6a866482dbdf40e3afb9c381fe2bb57e062c1c /documentapi | |
parent | 7fac15f41005fd65e44a5699f67c1a576d8eb3e7 (diff) |
Tests and the fixes they entailed
Diffstat (limited to 'documentapi')
3 files changed, 25 insertions, 16 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java index 35dc3992179..ff3eeb02a71 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java @@ -25,6 +25,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -44,7 +45,7 @@ public class LocalAsyncSession implements AsyncSession { private final Executor executor = Executors.newCachedThreadPool(); private AtomicLong requestId = new AtomicLong(0); - private AtomicReference<Runnable> synchronizer = new AtomicReference<>(); + private AtomicReference<Phaser> phaser = new AtomicReference<>(); private AtomicReference<Result.ResultType> result = new AtomicReference<>(SUCCESS); public LocalAsyncSession(AsyncParameters params, LocalDocumentAccess access) { @@ -148,12 +149,14 @@ public class LocalAsyncSession implements AsyncSession { } /** - * This is run in a separate thread before providing the response from each accepted request, for advanced testing. + * When this is set, every operation is sent in a separate thread, which first registers with the given phaser, + * and then arrives and awaits advance so the user can trigger responses. After the response is delivered, + * the thread arrives and deregisters with the phaser, so the user can wait until all responses have been delivered. * * If this is not set, which is the default, the documents appear synchronously in the response queue or handler. */ - public void setSynchronizer(Runnable synchronizer) { - this.synchronizer.set(synchronizer); + public void setPhaser(Phaser phaser) { + this.phaser.set(phaser); } /** Sets the result type returned on subsequence operations against this. Only SUCCESS will cause Repsonses to appear. */ @@ -175,13 +178,15 @@ public class LocalAsyncSession implements AsyncSession { return new Result(resultType, new Error()); long req = requestId.incrementAndGet(); - Runnable runnable = synchronizer.get(); - if (runnable == null) + Phaser synchronizer = phaser.get(); + if (synchronizer == null) addResponse(responses.apply(req)); else executor.execute(() -> { - runnable.run(); + synchronizer.register(); + synchronizer.arriveAndAwaitAdvance(); addResponse(responses.apply(req)); + synchronizer.arriveAndDeregister(); }); return new Result(req); } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java index f087b646ca4..85be1c11fcd 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java @@ -76,8 +76,13 @@ public class LocalVisitorSession implements VisitorSession { if (state.get() != State.RUNNING) return; - if (selector.accepts(new DocumentPut(document)) != Result.TRUE) + try { + if (selector.accepts(new DocumentPut(document)) != Result.TRUE) + return; + } + catch (RuntimeException e) { return; + } Document copy = new Document(document.getDataType(), document.getId()); new FieldSetRepo().copyFields(document, copy, fieldSet); diff --git a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java index 951d0405b40..33cae60ab93 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java @@ -100,13 +100,10 @@ public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase { for (DocumentId id : ids) session.put(new Document(access.getDocumentTypeManager().getDocumentType("music"), id)); - // Let all async operations wait for a signal from the test thread before sending their responses. - // Phaser not strictly needed here, but it's reusable, dynamic, and doesn't throw InterruptedException. Love it! + // Let all async operations wait for a signal from the test thread before sending their responses, and let test + // thread wait for all responses to be delivered afterwards. Phaser phaser = new Phaser(1); - session.setSynchronizer(() -> { - phaser.register(); - phaser.awaitAdvance(phaser.arriveAndDeregister()); - }); + session.setPhaser(phaser); long startTime = System.currentTimeMillis(); int timeoutMillis = 1000; @@ -143,9 +140,11 @@ public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase { }); // All operations, and receiver, now waiting for this thread to arrive. - assertEquals(1, phaser.getRegisteredParties()); + assertEquals(4, phaser.getRegisteredParties()); assertEquals(0, phaser.getPhase()); - phaser.awaitAdvance(phaser.arriveAndDeregister()); // Deregister so threads can finish without waiting after response. + phaser.arrive(); + assertEquals(1, phaser.getPhase()); + phaser.awaitAdvance(phaser.arriveAndDeregister()); futureWithAssertions.get(1000, TimeUnit.MILLISECONDS); } |