aboutsummaryrefslogtreecommitdiffstats
path: root/documentapi
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-09-28 19:01:12 +0200
committerJon Marius Venstad <venstad@gmail.com>2020-09-28 19:01:12 +0200
commit29f45f8cfdb15e87d3b0dfe312fdcdc39e84cf41 (patch)
tree8f6a866482dbdf40e3afb9c381fe2bb57e062c1c /documentapi
parent7fac15f41005fd65e44a5699f67c1a576d8eb3e7 (diff)
Tests and the fixes they entailed
Diffstat (limited to 'documentapi')
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java19
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java7
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java15
3 files changed, 25 insertions, 16 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
index 35dc3992179..ff3eeb02a71 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
@@ -25,6 +25,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -44,7 +45,7 @@ public class LocalAsyncSession implements AsyncSession {
private final Executor executor = Executors.newCachedThreadPool();
private AtomicLong requestId = new AtomicLong(0);
- private AtomicReference<Runnable> synchronizer = new AtomicReference<>();
+ private AtomicReference<Phaser> phaser = new AtomicReference<>();
private AtomicReference<Result.ResultType> result = new AtomicReference<>(SUCCESS);
public LocalAsyncSession(AsyncParameters params, LocalDocumentAccess access) {
@@ -148,12 +149,14 @@ public class LocalAsyncSession implements AsyncSession {
}
/**
- * This is run in a separate thread before providing the response from each accepted request, for advanced testing.
+ * When this is set, every operation is sent in a separate thread, which first registers with the given phaser,
+ * and then arrives and awaits advance so the user can trigger responses. After the response is delivered,
+ * the thread arrives and deregisters with the phaser, so the user can wait until all responses have been delivered.
*
* If this is not set, which is the default, the documents appear synchronously in the response queue or handler.
*/
- public void setSynchronizer(Runnable synchronizer) {
- this.synchronizer.set(synchronizer);
+ public void setPhaser(Phaser phaser) {
+ this.phaser.set(phaser);
}
/** Sets the result type returned on subsequence operations against this. Only SUCCESS will cause Repsonses to appear. */
@@ -175,13 +178,15 @@ public class LocalAsyncSession implements AsyncSession {
return new Result(resultType, new Error());
long req = requestId.incrementAndGet();
- Runnable runnable = synchronizer.get();
- if (runnable == null)
+ Phaser synchronizer = phaser.get();
+ if (synchronizer == null)
addResponse(responses.apply(req));
else
executor.execute(() -> {
- runnable.run();
+ synchronizer.register();
+ synchronizer.arriveAndAwaitAdvance();
addResponse(responses.apply(req));
+ synchronizer.arriveAndDeregister();
});
return new Result(req);
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
index f087b646ca4..85be1c11fcd 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
@@ -76,8 +76,13 @@ public class LocalVisitorSession implements VisitorSession {
if (state.get() != State.RUNNING)
return;
- if (selector.accepts(new DocumentPut(document)) != Result.TRUE)
+ try {
+ if (selector.accepts(new DocumentPut(document)) != Result.TRUE)
+ return;
+ }
+ catch (RuntimeException e) {
return;
+ }
Document copy = new Document(document.getDataType(), document.getId());
new FieldSetRepo().copyFields(document, copy, fieldSet);
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java
index 951d0405b40..33cae60ab93 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java
@@ -100,13 +100,10 @@ public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase {
for (DocumentId id : ids)
session.put(new Document(access.getDocumentTypeManager().getDocumentType("music"), id));
- // Let all async operations wait for a signal from the test thread before sending their responses.
- // Phaser not strictly needed here, but it's reusable, dynamic, and doesn't throw InterruptedException. Love it!
+ // Let all async operations wait for a signal from the test thread before sending their responses, and let test
+ // thread wait for all responses to be delivered afterwards.
Phaser phaser = new Phaser(1);
- session.setSynchronizer(() -> {
- phaser.register();
- phaser.awaitAdvance(phaser.arriveAndDeregister());
- });
+ session.setPhaser(phaser);
long startTime = System.currentTimeMillis();
int timeoutMillis = 1000;
@@ -143,9 +140,11 @@ public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase {
});
// All operations, and receiver, now waiting for this thread to arrive.
- assertEquals(1, phaser.getRegisteredParties());
+ assertEquals(4, phaser.getRegisteredParties());
assertEquals(0, phaser.getPhase());
- phaser.awaitAdvance(phaser.arriveAndDeregister()); // Deregister so threads can finish without waiting after response.
+ phaser.arrive();
+ assertEquals(1, phaser.getPhase());
+ phaser.awaitAdvance(phaser.arriveAndDeregister());
futureWithAssertions.get(1000, TimeUnit.MILLISECONDS);
}