diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-09-30 08:57:59 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-09-30 10:23:35 +0200 |
commit | 374e7ae5caa94e2ba290fa5a85e88d95b972d3ca (patch) | |
tree | 0124e3b5b693656ec9ddd7204e8f34bfd8eb9a9d /documentapi | |
parent | f6b2e26033e09d220a6f58c42a793a169950cf8c (diff) |
Register operations with phaser in the caller thread
Diffstat (limited to 'documentapi')
3 files changed, 27 insertions, 14 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java index 895c3b61cfe..639031e4cdb 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java @@ -171,13 +171,18 @@ public class LocalAsyncSession implements AsyncSession { Phaser synchronizer = phaser.get(); if (synchronizer == null) addResponse(responses.apply(req)); - else + else { + synchronizer.register(); executor.execute(() -> { - synchronizer.register(); - synchronizer.arriveAndAwaitAdvance(); - addResponse(responses.apply(req)); - synchronizer.awaitAdvance(synchronizer.arriveAndDeregister()); + try { + synchronizer.arriveAndAwaitAdvance(); + addResponse(responses.apply(req)); + } + finally { + synchronizer.awaitAdvance(synchronizer.arriveAndDeregister()); + } }); + } return new Result(req); } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java index f132b8f2ea9..dd21a55d029 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java @@ -75,10 +75,12 @@ public class LocalDocumentAccess extends DocumentAccess { * {@link Phaser} used to synchronize the sending of documents from the visitor, and the responses for the * document operations — which are then also done by a dedicated thread pool, instead of the caller thread. * - * When this is set, the thread that sends a document (visit) or response (async-session) first registers with the - * given phaser, and then arrives and awaits advance so the user can trigger these documents and responses. - * After the document or response is delivered, the thread arrives, deregisters and awaits advance, so the user - * can wait until the document or response has been delivered. This also ensures memory visibility. Example usage: + * When this is set, a party is registered with the phaser for the sender thread (visit) or for each document + * operation (async-session). The thread that sends a document (visit) or response (async-session) then arrives + * and awaits advance before sending each response, so the user can trigger these documents and responses. + * After the document or response is delivered, the thread arrives and awaits advance, so the user + * can wait until the document or response has been delivered. This also ensures memory visibility. + * The visit sender thread deregisters when the whole visit is complete. Example usage: * * <pre> {@code * void testOperations(LocalDocumentAccess access) { diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java index 0afc9b58be2..e0ae0278de8 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java @@ -72,6 +72,10 @@ public class LocalVisitorSession implements VisitorSession { } void start() { + Phaser synchronizer = phaser.get(); + if (synchronizer != null) + synchronizer.register(); + new Thread(() -> { try { // Iterate through all documents and pass on to data handler @@ -91,15 +95,14 @@ public class LocalVisitorSession implements VisitorSession { new FieldSetRepo().copyFields(document, copy, fieldSet); - Phaser synchronizer = phaser.get(); - if (synchronizer != null) { - synchronizer.register(); + if (synchronizer != null) synchronizer.arriveAndAwaitAdvance(); - } + data.onMessage(new PutDocumentMessage(new DocumentPut(copy)), new AckToken(id)); + if (synchronizer != null) - synchronizer.awaitAdvance(synchronizer.arriveAndDeregister()); + synchronizer.arriveAndAwaitAdvance(); }); // Transition to a terminal state when done state.updateAndGet(current -> { @@ -123,6 +126,9 @@ public class LocalVisitorSession implements VisitorSession { control.onDone(VisitorControlHandler.CompletionCode.FAILURE, Exceptions.toMessageString(e)); } finally { + if (synchronizer != null) + synchronizer.arriveAndDeregister(); + data.onDone(); } }).start(); |