summaryrefslogtreecommitdiffstats
path: root/documentapi
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-09-30 08:57:59 +0200
committerJon Marius Venstad <venstad@gmail.com>2020-09-30 10:23:35 +0200
commit374e7ae5caa94e2ba290fa5a85e88d95b972d3ca (patch)
tree0124e3b5b693656ec9ddd7204e8f34bfd8eb9a9d /documentapi
parentf6b2e26033e09d220a6f58c42a793a169950cf8c (diff)
Register operations with phaser in the caller thread
Diffstat (limited to 'documentapi')
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java15
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java10
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java16
3 files changed, 27 insertions, 14 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
index 895c3b61cfe..639031e4cdb 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
@@ -171,13 +171,18 @@ public class LocalAsyncSession implements AsyncSession {
Phaser synchronizer = phaser.get();
if (synchronizer == null)
addResponse(responses.apply(req));
- else
+ else {
+ synchronizer.register();
executor.execute(() -> {
- synchronizer.register();
- synchronizer.arriveAndAwaitAdvance();
- addResponse(responses.apply(req));
- synchronizer.awaitAdvance(synchronizer.arriveAndDeregister());
+ try {
+ synchronizer.arriveAndAwaitAdvance();
+ addResponse(responses.apply(req));
+ }
+ finally {
+ synchronizer.awaitAdvance(synchronizer.arriveAndDeregister());
+ }
});
+ }
return new Result(req);
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java
index f132b8f2ea9..dd21a55d029 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java
@@ -75,10 +75,12 @@ public class LocalDocumentAccess extends DocumentAccess {
* {@link Phaser} used to synchronize the sending of documents from the visitor, and the responses for the
* document operations — which are then also done by a dedicated thread pool, instead of the caller thread.
*
- * When this is set, the thread that sends a document (visit) or response (async-session) first registers with the
- * given phaser, and then arrives and awaits advance so the user can trigger these documents and responses.
- * After the document or response is delivered, the thread arrives, deregisters and awaits advance, so the user
- * can wait until the document or response has been delivered. This also ensures memory visibility. Example usage:
+ * When this is set, a party is registered with the phaser for the sender thread (visit) or for each document
+ * operation (async-session). The thread that sends a document (visit) or response (async-session) then arrives
+ * and awaits advance before sending each response, so the user can trigger these documents and responses.
+ * After the document or response is delivered, the thread arrives and awaits advance, so the user
+ * can wait until the document or response has been delivered. This also ensures memory visibility.
+ * The visit sender thread deregisters when the whole visit is complete. Example usage:
*
* <pre> {@code
* void testOperations(LocalDocumentAccess access) {
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
index 0afc9b58be2..e0ae0278de8 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
@@ -72,6 +72,10 @@ public class LocalVisitorSession implements VisitorSession {
}
void start() {
+ Phaser synchronizer = phaser.get();
+ if (synchronizer != null)
+ synchronizer.register();
+
new Thread(() -> {
try {
// Iterate through all documents and pass on to data handler
@@ -91,15 +95,14 @@ public class LocalVisitorSession implements VisitorSession {
new FieldSetRepo().copyFields(document, copy, fieldSet);
- Phaser synchronizer = phaser.get();
- if (synchronizer != null) {
- synchronizer.register();
+ if (synchronizer != null)
synchronizer.arriveAndAwaitAdvance();
- }
+
data.onMessage(new PutDocumentMessage(new DocumentPut(copy)),
new AckToken(id));
+
if (synchronizer != null)
- synchronizer.awaitAdvance(synchronizer.arriveAndDeregister());
+ synchronizer.arriveAndAwaitAdvance();
});
// Transition to a terminal state when done
state.updateAndGet(current -> {
@@ -123,6 +126,9 @@ public class LocalVisitorSession implements VisitorSession {
control.onDone(VisitorControlHandler.CompletionCode.FAILURE, Exceptions.toMessageString(e));
}
finally {
+ if (synchronizer != null)
+ synchronizer.arriveAndDeregister();
+
data.onDone();
}
}).start();