diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-09-29 11:30:15 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-09-30 10:23:35 +0200 |
commit | 264cd85f6aa90d08438d479b59598b833485ac0a (patch) | |
tree | 90a58075ef3dd12f1378781d2a16ae514c4f199c /documentapi/src | |
parent | 11405e52f2853e44df0944bf7bbee13dc2e617a5 (diff) |
Move setPhaser to LocalDocumentAccess
Diffstat (limited to 'documentapi/src')
4 files changed, 56 insertions, 18 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java index ff3eeb02a71..895c3b61cfe 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java @@ -43,14 +43,15 @@ public class LocalAsyncSession implements AsyncSession { private final ResponseHandler handler; private final SyncSession syncSession; private final Executor executor = Executors.newCachedThreadPool(); + private final AtomicReference<Phaser> phaser; private AtomicLong requestId = new AtomicLong(0); - private AtomicReference<Phaser> phaser = new AtomicReference<>(); private AtomicReference<Result.ResultType> result = new AtomicReference<>(SUCCESS); public LocalAsyncSession(AsyncParameters params, LocalDocumentAccess access) { this.handler = params.getResponseHandler(); - syncSession = access.createSyncSession(new SyncParameters.Builder().build()); + this.syncSession = access.createSyncSession(new SyncParameters.Builder().build()); + this.phaser = access.phaser; } @Override @@ -148,18 +149,7 @@ public class LocalAsyncSession implements AsyncSession { // empty } - /** - * When this is set, every operation is sent in a separate thread, which first registers with the given phaser, - * and then arrives and awaits advance so the user can trigger responses. After the response is delivered, - * the thread arrives and deregisters with the phaser, so the user can wait until all responses have been delivered. - * - * If this is not set, which is the default, the documents appear synchronously in the response queue or handler. - */ - public void setPhaser(Phaser phaser) { - this.phaser.set(phaser); - } - - /** Sets the result type returned on subsequence operations against this. Only SUCCESS will cause Repsonses to appear. */ + /** Sets the result type returned on subsequence operations against this. Only SUCCESS will cause Responses to appear. */ public void setResultType(Result.ResultType resultType) { this.result.set(resultType); } @@ -186,7 +176,7 @@ public class LocalAsyncSession implements AsyncSession { synchronizer.register(); synchronizer.arriveAndAwaitAdvance(); addResponse(responses.apply(req)); - synchronizer.arriveAndDeregister(); + synchronizer.awaitAdvance(synchronizer.arriveAndDeregister()); }); return new Result(req); } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java index e24853b9294..f132b8f2ea9 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java @@ -11,7 +11,6 @@ import com.yahoo.documentapi.DocumentAccessParams; import com.yahoo.documentapi.SubscriptionParameters; import com.yahoo.documentapi.SubscriptionSession; import com.yahoo.documentapi.SyncParameters; -import com.yahoo.documentapi.SyncSession; import com.yahoo.documentapi.VisitorDestinationParameters; import com.yahoo.documentapi.VisitorDestinationSession; import com.yahoo.documentapi.VisitorParameters; @@ -19,6 +18,8 @@ import com.yahoo.documentapi.VisitorSession; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicReference; /** * The main class of the local implementation of the document api @@ -27,7 +28,8 @@ import java.util.concurrent.ConcurrentHashMap; */ public class LocalDocumentAccess extends DocumentAccess { - Map<DocumentId, Document> documents = new ConcurrentHashMap<>(); + final Map<DocumentId, Document> documents = new ConcurrentHashMap<>(); + final AtomicReference<Phaser> phaser = new AtomicReference<>(); public LocalDocumentAccess(DocumentAccessParams params) { super(params); @@ -63,4 +65,39 @@ public class LocalDocumentAccess extends DocumentAccess { throw new UnsupportedOperationException("Not supported yet"); } + /** + * Sets a {@link Phaser} for synchronization of otherwise async operations in sessions backed by this. + * + * {@link AsyncSession} and {@link VisitorSession} are by nature async. The {@link LocalAsyncSession} is, by default, + * synchronous, i.e., responses are sent by the thread that sends the document operations. {@link LocalVisitorSession}, + * on the other hand, is asynchronous by default, i.e., all documents are sent by a dedicated sender thread. + * To enable more advanced testing using the {@link LocalDocumentAccess}, this method lets the user specify a + * {@link Phaser} used to synchronize the sending of documents from the visitor, and the responses for the + * document operations — which are then also done by a dedicated thread pool, instead of the caller thread. + * + * When this is set, the thread that sends a document (visit) or response (async-session) first registers with the + * given phaser, and then arrives and awaits advance so the user can trigger these documents and responses. + * After the document or response is delivered, the thread arrives, deregisters and awaits advance, so the user + * can wait until the document or response has been delivered. This also ensures memory visibility. Example usage: + * + * <pre> {@code + * void testOperations(LocalDocumentAccess access) { + * List<Response> responses = new ArrayList<>(); + * Phaser phaser = new Phaser(1); // "1" to register self + * access.setPhaser(phaser); + * AsyncSession session = access.createAsyncSession(new AsyncParameters().setReponseHandler(responses::add)); + * session.put(documentPut); + * session.get(documentId); + * // Operations wait for this thread to arrive at "phaser" + * phaser.arrive(); // Let operations send their responses + * // "responses" may or may not hold the responses now + * phaser.arriveAndAwaitAdvance(); // Wait for operations to complete sending responses, memory visibility, etc. + * // "responses" now has responses from all previous operations + * phaser.arriveAndDeregister(); // Deregister so further operations flow freely + * }}</pre> + */ + public void setPhaser(Phaser phaser) { + this.phaser.set(phaser); + } + } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java index 85be1c11fcd..0afc9b58be2 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java @@ -23,6 +23,7 @@ import com.yahoo.yolean.Exceptions; import java.util.Comparator; import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.Phaser; import java.util.concurrent.atomic.AtomicReference; /** @@ -42,6 +43,7 @@ public class LocalVisitorSession implements VisitorSession { private final DocumentSelector selector; private final FieldSet fieldSet; private final AtomicReference<State> state; + private final AtomicReference<Phaser> phaser; public LocalVisitorSession(LocalDocumentAccess access, VisitorParameters parameters) throws ParseException { if (parameters.getResumeToken() != null) @@ -64,6 +66,7 @@ public class LocalVisitorSession implements VisitorSession { this.outstanding = new ConcurrentSkipListMap<>(Comparator.comparing(DocumentId::toString)); this.outstanding.putAll(access.documents); this.state = new AtomicReference<>(State.RUNNING); + this.phaser = access.phaser; start(); } @@ -87,8 +90,16 @@ public class LocalVisitorSession implements VisitorSession { Document copy = new Document(document.getDataType(), document.getId()); new FieldSetRepo().copyFields(document, copy, fieldSet); + + Phaser synchronizer = phaser.get(); + if (synchronizer != null) { + synchronizer.register(); + synchronizer.arriveAndAwaitAdvance(); + } data.onMessage(new PutDocumentMessage(new DocumentPut(copy)), new AckToken(id)); + if (synchronizer != null) + synchronizer.awaitAdvance(synchronizer.arriveAndDeregister()); }); // Transition to a terminal state when done state.updateAndGet(current -> { diff --git a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java index 33cae60ab93..b4e17038a35 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java @@ -103,7 +103,7 @@ public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase { // Let all async operations wait for a signal from the test thread before sending their responses, and let test // thread wait for all responses to be delivered afterwards. Phaser phaser = new Phaser(1); - session.setPhaser(phaser); + access.setPhaser(phaser); long startTime = System.currentTimeMillis(); int timeoutMillis = 1000; |