summaryrefslogtreecommitdiffstats
path: root/documentapi/src
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-09-29 11:30:15 +0200
committerJon Marius Venstad <venstad@gmail.com>2020-09-30 10:23:35 +0200
commit264cd85f6aa90d08438d479b59598b833485ac0a (patch)
tree90a58075ef3dd12f1378781d2a16ae514c4f199c /documentapi/src
parent11405e52f2853e44df0944bf7bbee13dc2e617a5 (diff)
Move setPhaser to LocalDocumentAccess
Diffstat (limited to 'documentapi/src')
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java20
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java41
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java11
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java2
4 files changed, 56 insertions, 18 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
index ff3eeb02a71..895c3b61cfe 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
@@ -43,14 +43,15 @@ public class LocalAsyncSession implements AsyncSession {
private final ResponseHandler handler;
private final SyncSession syncSession;
private final Executor executor = Executors.newCachedThreadPool();
+ private final AtomicReference<Phaser> phaser;
private AtomicLong requestId = new AtomicLong(0);
- private AtomicReference<Phaser> phaser = new AtomicReference<>();
private AtomicReference<Result.ResultType> result = new AtomicReference<>(SUCCESS);
public LocalAsyncSession(AsyncParameters params, LocalDocumentAccess access) {
this.handler = params.getResponseHandler();
- syncSession = access.createSyncSession(new SyncParameters.Builder().build());
+ this.syncSession = access.createSyncSession(new SyncParameters.Builder().build());
+ this.phaser = access.phaser;
}
@Override
@@ -148,18 +149,7 @@ public class LocalAsyncSession implements AsyncSession {
// empty
}
- /**
- * When this is set, every operation is sent in a separate thread, which first registers with the given phaser,
- * and then arrives and awaits advance so the user can trigger responses. After the response is delivered,
- * the thread arrives and deregisters with the phaser, so the user can wait until all responses have been delivered.
- *
- * If this is not set, which is the default, the documents appear synchronously in the response queue or handler.
- */
- public void setPhaser(Phaser phaser) {
- this.phaser.set(phaser);
- }
-
- /** Sets the result type returned on subsequence operations against this. Only SUCCESS will cause Repsonses to appear. */
+ /** Sets the result type returned on subsequence operations against this. Only SUCCESS will cause Responses to appear. */
public void setResultType(Result.ResultType resultType) {
this.result.set(resultType);
}
@@ -186,7 +176,7 @@ public class LocalAsyncSession implements AsyncSession {
synchronizer.register();
synchronizer.arriveAndAwaitAdvance();
addResponse(responses.apply(req));
- synchronizer.arriveAndDeregister();
+ synchronizer.awaitAdvance(synchronizer.arriveAndDeregister());
});
return new Result(req);
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java
index e24853b9294..f132b8f2ea9 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java
@@ -11,7 +11,6 @@ import com.yahoo.documentapi.DocumentAccessParams;
import com.yahoo.documentapi.SubscriptionParameters;
import com.yahoo.documentapi.SubscriptionSession;
import com.yahoo.documentapi.SyncParameters;
-import com.yahoo.documentapi.SyncSession;
import com.yahoo.documentapi.VisitorDestinationParameters;
import com.yahoo.documentapi.VisitorDestinationSession;
import com.yahoo.documentapi.VisitorParameters;
@@ -19,6 +18,8 @@ import com.yahoo.documentapi.VisitorSession;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.atomic.AtomicReference;
/**
* The main class of the local implementation of the document api
@@ -27,7 +28,8 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class LocalDocumentAccess extends DocumentAccess {
- Map<DocumentId, Document> documents = new ConcurrentHashMap<>();
+ final Map<DocumentId, Document> documents = new ConcurrentHashMap<>();
+ final AtomicReference<Phaser> phaser = new AtomicReference<>();
public LocalDocumentAccess(DocumentAccessParams params) {
super(params);
@@ -63,4 +65,39 @@ public class LocalDocumentAccess extends DocumentAccess {
throw new UnsupportedOperationException("Not supported yet");
}
+ /**
+ * Sets a {@link Phaser} for synchronization of otherwise async operations in sessions backed by this.
+ *
+ * {@link AsyncSession} and {@link VisitorSession} are by nature async. The {@link LocalAsyncSession} is, by default,
+ * synchronous, i.e., responses are sent by the thread that sends the document operations. {@link LocalVisitorSession},
+ * on the other hand, is asynchronous by default, i.e., all documents are sent by a dedicated sender thread.
+ * To enable more advanced testing using the {@link LocalDocumentAccess}, this method lets the user specify a
+ * {@link Phaser} used to synchronize the sending of documents from the visitor, and the responses for the
+ * document operations — which are then also done by a dedicated thread pool, instead of the caller thread.
+ *
+ * When this is set, the thread that sends a document (visit) or response (async-session) first registers with the
+ * given phaser, and then arrives and awaits advance so the user can trigger these documents and responses.
+ * After the document or response is delivered, the thread arrives, deregisters and awaits advance, so the user
+ * can wait until the document or response has been delivered. This also ensures memory visibility. Example usage:
+ *
+ * <pre> {@code
+ * void testOperations(LocalDocumentAccess access) {
+ * List<Response> responses = new ArrayList<>();
+ * Phaser phaser = new Phaser(1); // "1" to register self
+ * access.setPhaser(phaser);
+ * AsyncSession session = access.createAsyncSession(new AsyncParameters().setReponseHandler(responses::add));
+ * session.put(documentPut);
+ * session.get(documentId);
+ * // Operations wait for this thread to arrive at "phaser"
+ * phaser.arrive(); // Let operations send their responses
+ * // "responses" may or may not hold the responses now
+ * phaser.arriveAndAwaitAdvance(); // Wait for operations to complete sending responses, memory visibility, etc.
+ * // "responses" now has responses from all previous operations
+ * phaser.arriveAndDeregister(); // Deregister so further operations flow freely
+ * }}</pre>
+ */
+ public void setPhaser(Phaser phaser) {
+ this.phaser.set(phaser);
+ }
+
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
index 85be1c11fcd..0afc9b58be2 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
@@ -23,6 +23,7 @@ import com.yahoo.yolean.Exceptions;
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -42,6 +43,7 @@ public class LocalVisitorSession implements VisitorSession {
private final DocumentSelector selector;
private final FieldSet fieldSet;
private final AtomicReference<State> state;
+ private final AtomicReference<Phaser> phaser;
public LocalVisitorSession(LocalDocumentAccess access, VisitorParameters parameters) throws ParseException {
if (parameters.getResumeToken() != null)
@@ -64,6 +66,7 @@ public class LocalVisitorSession implements VisitorSession {
this.outstanding = new ConcurrentSkipListMap<>(Comparator.comparing(DocumentId::toString));
this.outstanding.putAll(access.documents);
this.state = new AtomicReference<>(State.RUNNING);
+ this.phaser = access.phaser;
start();
}
@@ -87,8 +90,16 @@ public class LocalVisitorSession implements VisitorSession {
Document copy = new Document(document.getDataType(), document.getId());
new FieldSetRepo().copyFields(document, copy, fieldSet);
+
+ Phaser synchronizer = phaser.get();
+ if (synchronizer != null) {
+ synchronizer.register();
+ synchronizer.arriveAndAwaitAdvance();
+ }
data.onMessage(new PutDocumentMessage(new DocumentPut(copy)),
new AckToken(id));
+ if (synchronizer != null)
+ synchronizer.awaitAdvance(synchronizer.arriveAndDeregister());
});
// Transition to a terminal state when done
state.updateAndGet(current -> {
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java
index 33cae60ab93..b4e17038a35 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java
@@ -103,7 +103,7 @@ public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase {
// Let all async operations wait for a signal from the test thread before sending their responses, and let test
// thread wait for all responses to be delivered afterwards.
Phaser phaser = new Phaser(1);
- session.setPhaser(phaser);
+ access.setPhaser(phaser);
long startTime = System.currentTimeMillis();
int timeoutMillis = 1000;