aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-09-29 11:30:15 +0200
committerJon Marius Venstad <venstad@gmail.com>2020-09-30 10:23:35 +0200
commit264cd85f6aa90d08438d479b59598b833485ac0a (patch)
tree90a58075ef3dd12f1378781d2a16ae514c4f199c
parent11405e52f2853e44df0944bf7bbee13dc2e617a5 (diff)
Move setPhaser to LocalDocumentAccess
-rw-r--r--documentapi/abi-spec.json2
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java20
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java41
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java11
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java2
-rw-r--r--jdisc_core/src/main/java/com/yahoo/jdisc/test/TestDriver.java20
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java12
7 files changed, 82 insertions, 26 deletions
diff --git a/documentapi/abi-spec.json b/documentapi/abi-spec.json
index c8cbc978a8f..e531e886dd9 100644
--- a/documentapi/abi-spec.json
+++ b/documentapi/abi-spec.json
@@ -980,7 +980,6 @@
"public com.yahoo.documentapi.Response getNext()",
"public com.yahoo.documentapi.Response getNext(int)",
"public void destroy()",
- "public void setPhaser(java.util.concurrent.Phaser)",
"public void setResultType(com.yahoo.documentapi.Result$ResultType)"
],
"fields": []
@@ -999,6 +998,7 @@
"public com.yahoo.documentapi.VisitorDestinationSession createVisitorDestinationSession(com.yahoo.documentapi.VisitorDestinationParameters)",
"public com.yahoo.documentapi.SubscriptionSession createSubscription(com.yahoo.documentapi.SubscriptionParameters)",
"public com.yahoo.documentapi.SubscriptionSession openSubscription(com.yahoo.documentapi.SubscriptionParameters)",
+ "public void setPhaser(java.util.concurrent.Phaser)",
"public bridge synthetic com.yahoo.documentapi.VisitorSession createVisitorSession(com.yahoo.documentapi.VisitorParameters)",
"public bridge synthetic com.yahoo.documentapi.AsyncSession createAsyncSession(com.yahoo.documentapi.AsyncParameters)",
"public bridge synthetic com.yahoo.documentapi.SyncSession createSyncSession(com.yahoo.documentapi.SyncParameters)"
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
index ff3eeb02a71..895c3b61cfe 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
@@ -43,14 +43,15 @@ public class LocalAsyncSession implements AsyncSession {
private final ResponseHandler handler;
private final SyncSession syncSession;
private final Executor executor = Executors.newCachedThreadPool();
+ private final AtomicReference<Phaser> phaser;
private AtomicLong requestId = new AtomicLong(0);
- private AtomicReference<Phaser> phaser = new AtomicReference<>();
private AtomicReference<Result.ResultType> result = new AtomicReference<>(SUCCESS);
public LocalAsyncSession(AsyncParameters params, LocalDocumentAccess access) {
this.handler = params.getResponseHandler();
- syncSession = access.createSyncSession(new SyncParameters.Builder().build());
+ this.syncSession = access.createSyncSession(new SyncParameters.Builder().build());
+ this.phaser = access.phaser;
}
@Override
@@ -148,18 +149,7 @@ public class LocalAsyncSession implements AsyncSession {
// empty
}
- /**
- * When this is set, every operation is sent in a separate thread, which first registers with the given phaser,
- * and then arrives and awaits advance so the user can trigger responses. After the response is delivered,
- * the thread arrives and deregisters with the phaser, so the user can wait until all responses have been delivered.
- *
- * If this is not set, which is the default, the documents appear synchronously in the response queue or handler.
- */
- public void setPhaser(Phaser phaser) {
- this.phaser.set(phaser);
- }
-
- /** Sets the result type returned on subsequence operations against this. Only SUCCESS will cause Repsonses to appear. */
+ /** Sets the result type returned on subsequence operations against this. Only SUCCESS will cause Responses to appear. */
public void setResultType(Result.ResultType resultType) {
this.result.set(resultType);
}
@@ -186,7 +176,7 @@ public class LocalAsyncSession implements AsyncSession {
synchronizer.register();
synchronizer.arriveAndAwaitAdvance();
addResponse(responses.apply(req));
- synchronizer.arriveAndDeregister();
+ synchronizer.awaitAdvance(synchronizer.arriveAndDeregister());
});
return new Result(req);
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java
index e24853b9294..f132b8f2ea9 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java
@@ -11,7 +11,6 @@ import com.yahoo.documentapi.DocumentAccessParams;
import com.yahoo.documentapi.SubscriptionParameters;
import com.yahoo.documentapi.SubscriptionSession;
import com.yahoo.documentapi.SyncParameters;
-import com.yahoo.documentapi.SyncSession;
import com.yahoo.documentapi.VisitorDestinationParameters;
import com.yahoo.documentapi.VisitorDestinationSession;
import com.yahoo.documentapi.VisitorParameters;
@@ -19,6 +18,8 @@ import com.yahoo.documentapi.VisitorSession;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.atomic.AtomicReference;
/**
* The main class of the local implementation of the document api
@@ -27,7 +28,8 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class LocalDocumentAccess extends DocumentAccess {
- Map<DocumentId, Document> documents = new ConcurrentHashMap<>();
+ final Map<DocumentId, Document> documents = new ConcurrentHashMap<>();
+ final AtomicReference<Phaser> phaser = new AtomicReference<>();
public LocalDocumentAccess(DocumentAccessParams params) {
super(params);
@@ -63,4 +65,39 @@ public class LocalDocumentAccess extends DocumentAccess {
throw new UnsupportedOperationException("Not supported yet");
}
+ /**
+ * Sets a {@link Phaser} for synchronization of otherwise async operations in sessions backed by this.
+ *
+ * {@link AsyncSession} and {@link VisitorSession} are by nature async. The {@link LocalAsyncSession} is, by default,
+ * synchronous, i.e., responses are sent by the thread that sends the document operations. {@link LocalVisitorSession},
+ * on the other hand, is asynchronous by default, i.e., all documents are sent by a dedicated sender thread.
+ * To enable more advanced testing using the {@link LocalDocumentAccess}, this method lets the user specify a
+ * {@link Phaser} used to synchronize the sending of documents from the visitor, and the responses for the
+ * document operations — which are then also done by a dedicated thread pool, instead of the caller thread.
+ *
+ * When this is set, the thread that sends a document (visit) or response (async-session) first registers with the
+ * given phaser, and then arrives and awaits advance so the user can trigger these documents and responses.
+ * After the document or response is delivered, the thread arrives, deregisters and awaits advance, so the user
+ * can wait until the document or response has been delivered. This also ensures memory visibility. Example usage:
+ *
+ * <pre> {@code
+ * void testOperations(LocalDocumentAccess access) {
+ * List<Response> responses = new ArrayList<>();
+ * Phaser phaser = new Phaser(1); // "1" to register self
+ * access.setPhaser(phaser);
+ * AsyncSession session = access.createAsyncSession(new AsyncParameters().setReponseHandler(responses::add));
+ * session.put(documentPut);
+ * session.get(documentId);
+ * // Operations wait for this thread to arrive at "phaser"
+ * phaser.arrive(); // Let operations send their responses
+ * // "responses" may or may not hold the responses now
+ * phaser.arriveAndAwaitAdvance(); // Wait for operations to complete sending responses, memory visibility, etc.
+ * // "responses" now has responses from all previous operations
+ * phaser.arriveAndDeregister(); // Deregister so further operations flow freely
+ * }}</pre>
+ */
+ public void setPhaser(Phaser phaser) {
+ this.phaser.set(phaser);
+ }
+
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
index 85be1c11fcd..0afc9b58be2 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
@@ -23,6 +23,7 @@ import com.yahoo.yolean.Exceptions;
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -42,6 +43,7 @@ public class LocalVisitorSession implements VisitorSession {
private final DocumentSelector selector;
private final FieldSet fieldSet;
private final AtomicReference<State> state;
+ private final AtomicReference<Phaser> phaser;
public LocalVisitorSession(LocalDocumentAccess access, VisitorParameters parameters) throws ParseException {
if (parameters.getResumeToken() != null)
@@ -64,6 +66,7 @@ public class LocalVisitorSession implements VisitorSession {
this.outstanding = new ConcurrentSkipListMap<>(Comparator.comparing(DocumentId::toString));
this.outstanding.putAll(access.documents);
this.state = new AtomicReference<>(State.RUNNING);
+ this.phaser = access.phaser;
start();
}
@@ -87,8 +90,16 @@ public class LocalVisitorSession implements VisitorSession {
Document copy = new Document(document.getDataType(), document.getId());
new FieldSetRepo().copyFields(document, copy, fieldSet);
+
+ Phaser synchronizer = phaser.get();
+ if (synchronizer != null) {
+ synchronizer.register();
+ synchronizer.arriveAndAwaitAdvance();
+ }
data.onMessage(new PutDocumentMessage(new DocumentPut(copy)),
new AckToken(id));
+ if (synchronizer != null)
+ synchronizer.awaitAdvance(synchronizer.arriveAndDeregister());
});
// Transition to a terminal state when done
state.updateAndGet(current -> {
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java
index 33cae60ab93..b4e17038a35 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java
@@ -103,7 +103,7 @@ public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase {
// Let all async operations wait for a signal from the test thread before sending their responses, and let test
// thread wait for all responses to be delivered afterwards.
Phaser phaser = new Phaser(1);
- session.setPhaser(phaser);
+ access.setPhaser(phaser);
long startTime = System.currentTimeMillis();
int timeoutMillis = 1000;
diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/test/TestDriver.java b/jdisc_core/src/main/java/com/yahoo/jdisc/test/TestDriver.java
index 4a87217e08f..35d29fd9e44 100644
--- a/jdisc_core/src/main/java/com/yahoo/jdisc/test/TestDriver.java
+++ b/jdisc_core/src/main/java/com/yahoo/jdisc/test/TestDriver.java
@@ -7,19 +7,33 @@ import com.google.inject.binder.AnnotatedBindingBuilder;
import com.yahoo.jdisc.Container;
import com.yahoo.jdisc.Request;
import com.yahoo.jdisc.Response;
-import com.yahoo.jdisc.application.*;
+import com.yahoo.jdisc.application.Application;
+import com.yahoo.jdisc.application.ContainerActivator;
+import com.yahoo.jdisc.application.ContainerBuilder;
+import com.yahoo.jdisc.application.DeactivatedContainer;
+import com.yahoo.jdisc.application.OsgiFramework;
import com.yahoo.jdisc.core.ApplicationLoader;
import com.yahoo.jdisc.core.BootstrapLoader;
import com.yahoo.jdisc.core.FelixFramework;
import com.yahoo.jdisc.core.FelixParams;
-import com.yahoo.jdisc.handler.*;
+import com.yahoo.jdisc.handler.BindingNotFoundException;
+import com.yahoo.jdisc.handler.CompletionHandler;
+import com.yahoo.jdisc.handler.ContentChannel;
+import com.yahoo.jdisc.handler.RequestDeniedException;
+import com.yahoo.jdisc.handler.RequestDispatch;
+import com.yahoo.jdisc.handler.RequestHandler;
+import com.yahoo.jdisc.handler.ResponseHandler;
import com.yahoo.jdisc.service.CurrentContainer;
import java.net.URI;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
/**
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java
index b675af3b564..fb9a6eb5873 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java
@@ -39,10 +39,12 @@ import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
@@ -110,16 +112,18 @@ public class DocumentOperationExecutor {
/** Assumes this stops receiving operations roughly when this is called, then waits up to 50 seconds to drain operations. */
public void shutdown() {
long shutdownMillis = clock.instant().plusSeconds(50).toEpochMilli();
+ visits.values().forEach(VisitorSession::destroy);
Future<?> throttleShutdown = throttled.shutdown(Duration.ofSeconds(30),
context -> context.error(OVERLOAD, "Retry on overload failed due to shutdown"));
Future<?> timeoutShutdown = timeouts.shutdown(Duration.ofSeconds(40),
context -> context.error(TIMEOUT, "Timed out due to shutdown"));
- visits.values().forEach(VisitorSession::destroy);
try {
throttleShutdown.get(Math.max(0, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS);
timeoutShutdown.get(Math.max(0, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS);
}
- catch (Exception e) {
+ catch (InterruptedException | ExecutionException | TimeoutException e) {
+ throttleShutdown.cancel(true);
+ throttleShutdown.cancel(true);
log.log(WARNING, "Exception shutting down " + getClass().getName(), e);
}
}
@@ -646,9 +650,9 @@ public class DocumentOperationExecutor {
}
- private static StorageCluster resolveCluster(Optional<String> wanted, Map<String, StorageCluster> clusters) {
+ static StorageCluster resolveCluster(Optional<String> wanted, Map<String, StorageCluster> clusters) {
if (clusters.isEmpty())
- throw new IllegalArgumentException("Your Vespa deployment has no content clusters, so the document API is not enabled.");
+ throw new IllegalArgumentException("Your Vespa deployment has no content clusters, so the document API is not enabled");
return wanted.map(cluster -> {
if ( ! clusters.containsKey(cluster))