diff options
7 files changed, 82 insertions, 26 deletions
diff --git a/documentapi/abi-spec.json b/documentapi/abi-spec.json index c8cbc978a8f..e531e886dd9 100644 --- a/documentapi/abi-spec.json +++ b/documentapi/abi-spec.json @@ -980,7 +980,6 @@ "public com.yahoo.documentapi.Response getNext()", "public com.yahoo.documentapi.Response getNext(int)", "public void destroy()", - "public void setPhaser(java.util.concurrent.Phaser)", "public void setResultType(com.yahoo.documentapi.Result$ResultType)" ], "fields": [] @@ -999,6 +998,7 @@ "public com.yahoo.documentapi.VisitorDestinationSession createVisitorDestinationSession(com.yahoo.documentapi.VisitorDestinationParameters)", "public com.yahoo.documentapi.SubscriptionSession createSubscription(com.yahoo.documentapi.SubscriptionParameters)", "public com.yahoo.documentapi.SubscriptionSession openSubscription(com.yahoo.documentapi.SubscriptionParameters)", + "public void setPhaser(java.util.concurrent.Phaser)", "public bridge synthetic com.yahoo.documentapi.VisitorSession createVisitorSession(com.yahoo.documentapi.VisitorParameters)", "public bridge synthetic com.yahoo.documentapi.AsyncSession createAsyncSession(com.yahoo.documentapi.AsyncParameters)", "public bridge synthetic com.yahoo.documentapi.SyncSession createSyncSession(com.yahoo.documentapi.SyncParameters)" diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java index ff3eeb02a71..895c3b61cfe 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java @@ -43,14 +43,15 @@ public class LocalAsyncSession implements AsyncSession { private final ResponseHandler handler; private final SyncSession syncSession; private final Executor executor = Executors.newCachedThreadPool(); + private final AtomicReference<Phaser> phaser; private AtomicLong requestId = new AtomicLong(0); - private AtomicReference<Phaser> phaser = new AtomicReference<>(); private AtomicReference<Result.ResultType> result = new AtomicReference<>(SUCCESS); public LocalAsyncSession(AsyncParameters params, LocalDocumentAccess access) { this.handler = params.getResponseHandler(); - syncSession = access.createSyncSession(new SyncParameters.Builder().build()); + this.syncSession = access.createSyncSession(new SyncParameters.Builder().build()); + this.phaser = access.phaser; } @Override @@ -148,18 +149,7 @@ public class LocalAsyncSession implements AsyncSession { // empty } - /** - * When this is set, every operation is sent in a separate thread, which first registers with the given phaser, - * and then arrives and awaits advance so the user can trigger responses. After the response is delivered, - * the thread arrives and deregisters with the phaser, so the user can wait until all responses have been delivered. - * - * If this is not set, which is the default, the documents appear synchronously in the response queue or handler. - */ - public void setPhaser(Phaser phaser) { - this.phaser.set(phaser); - } - - /** Sets the result type returned on subsequence operations against this. Only SUCCESS will cause Repsonses to appear. */ + /** Sets the result type returned on subsequence operations against this. Only SUCCESS will cause Responses to appear. */ public void setResultType(Result.ResultType resultType) { this.result.set(resultType); } @@ -186,7 +176,7 @@ public class LocalAsyncSession implements AsyncSession { synchronizer.register(); synchronizer.arriveAndAwaitAdvance(); addResponse(responses.apply(req)); - synchronizer.arriveAndDeregister(); + synchronizer.awaitAdvance(synchronizer.arriveAndDeregister()); }); return new Result(req); } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java index e24853b9294..f132b8f2ea9 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java @@ -11,7 +11,6 @@ import com.yahoo.documentapi.DocumentAccessParams; import com.yahoo.documentapi.SubscriptionParameters; import com.yahoo.documentapi.SubscriptionSession; import com.yahoo.documentapi.SyncParameters; -import com.yahoo.documentapi.SyncSession; import com.yahoo.documentapi.VisitorDestinationParameters; import com.yahoo.documentapi.VisitorDestinationSession; import com.yahoo.documentapi.VisitorParameters; @@ -19,6 +18,8 @@ import com.yahoo.documentapi.VisitorSession; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicReference; /** * The main class of the local implementation of the document api @@ -27,7 +28,8 @@ import java.util.concurrent.ConcurrentHashMap; */ public class LocalDocumentAccess extends DocumentAccess { - Map<DocumentId, Document> documents = new ConcurrentHashMap<>(); + final Map<DocumentId, Document> documents = new ConcurrentHashMap<>(); + final AtomicReference<Phaser> phaser = new AtomicReference<>(); public LocalDocumentAccess(DocumentAccessParams params) { super(params); @@ -63,4 +65,39 @@ public class LocalDocumentAccess extends DocumentAccess { throw new UnsupportedOperationException("Not supported yet"); } + /** + * Sets a {@link Phaser} for synchronization of otherwise async operations in sessions backed by this. + * + * {@link AsyncSession} and {@link VisitorSession} are by nature async. The {@link LocalAsyncSession} is, by default, + * synchronous, i.e., responses are sent by the thread that sends the document operations. {@link LocalVisitorSession}, + * on the other hand, is asynchronous by default, i.e., all documents are sent by a dedicated sender thread. + * To enable more advanced testing using the {@link LocalDocumentAccess}, this method lets the user specify a + * {@link Phaser} used to synchronize the sending of documents from the visitor, and the responses for the + * document operations — which are then also done by a dedicated thread pool, instead of the caller thread. + * + * When this is set, the thread that sends a document (visit) or response (async-session) first registers with the + * given phaser, and then arrives and awaits advance so the user can trigger these documents and responses. + * After the document or response is delivered, the thread arrives, deregisters and awaits advance, so the user + * can wait until the document or response has been delivered. This also ensures memory visibility. Example usage: + * + * <pre> {@code + * void testOperations(LocalDocumentAccess access) { + * List<Response> responses = new ArrayList<>(); + * Phaser phaser = new Phaser(1); // "1" to register self + * access.setPhaser(phaser); + * AsyncSession session = access.createAsyncSession(new AsyncParameters().setReponseHandler(responses::add)); + * session.put(documentPut); + * session.get(documentId); + * // Operations wait for this thread to arrive at "phaser" + * phaser.arrive(); // Let operations send their responses + * // "responses" may or may not hold the responses now + * phaser.arriveAndAwaitAdvance(); // Wait for operations to complete sending responses, memory visibility, etc. + * // "responses" now has responses from all previous operations + * phaser.arriveAndDeregister(); // Deregister so further operations flow freely + * }}</pre> + */ + public void setPhaser(Phaser phaser) { + this.phaser.set(phaser); + } + } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java index 85be1c11fcd..0afc9b58be2 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java @@ -23,6 +23,7 @@ import com.yahoo.yolean.Exceptions; import java.util.Comparator; import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.Phaser; import java.util.concurrent.atomic.AtomicReference; /** @@ -42,6 +43,7 @@ public class LocalVisitorSession implements VisitorSession { private final DocumentSelector selector; private final FieldSet fieldSet; private final AtomicReference<State> state; + private final AtomicReference<Phaser> phaser; public LocalVisitorSession(LocalDocumentAccess access, VisitorParameters parameters) throws ParseException { if (parameters.getResumeToken() != null) @@ -64,6 +66,7 @@ public class LocalVisitorSession implements VisitorSession { this.outstanding = new ConcurrentSkipListMap<>(Comparator.comparing(DocumentId::toString)); this.outstanding.putAll(access.documents); this.state = new AtomicReference<>(State.RUNNING); + this.phaser = access.phaser; start(); } @@ -87,8 +90,16 @@ public class LocalVisitorSession implements VisitorSession { Document copy = new Document(document.getDataType(), document.getId()); new FieldSetRepo().copyFields(document, copy, fieldSet); + + Phaser synchronizer = phaser.get(); + if (synchronizer != null) { + synchronizer.register(); + synchronizer.arriveAndAwaitAdvance(); + } data.onMessage(new PutDocumentMessage(new DocumentPut(copy)), new AckToken(id)); + if (synchronizer != null) + synchronizer.awaitAdvance(synchronizer.arriveAndDeregister()); }); // Transition to a terminal state when done state.updateAndGet(current -> { diff --git a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java index 33cae60ab93..b4e17038a35 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java @@ -103,7 +103,7 @@ public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase { // Let all async operations wait for a signal from the test thread before sending their responses, and let test // thread wait for all responses to be delivered afterwards. Phaser phaser = new Phaser(1); - session.setPhaser(phaser); + access.setPhaser(phaser); long startTime = System.currentTimeMillis(); int timeoutMillis = 1000; diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/test/TestDriver.java b/jdisc_core/src/main/java/com/yahoo/jdisc/test/TestDriver.java index 4a87217e08f..35d29fd9e44 100644 --- a/jdisc_core/src/main/java/com/yahoo/jdisc/test/TestDriver.java +++ b/jdisc_core/src/main/java/com/yahoo/jdisc/test/TestDriver.java @@ -7,19 +7,33 @@ import com.google.inject.binder.AnnotatedBindingBuilder; import com.yahoo.jdisc.Container; import com.yahoo.jdisc.Request; import com.yahoo.jdisc.Response; -import com.yahoo.jdisc.application.*; +import com.yahoo.jdisc.application.Application; +import com.yahoo.jdisc.application.ContainerActivator; +import com.yahoo.jdisc.application.ContainerBuilder; +import com.yahoo.jdisc.application.DeactivatedContainer; +import com.yahoo.jdisc.application.OsgiFramework; import com.yahoo.jdisc.core.ApplicationLoader; import com.yahoo.jdisc.core.BootstrapLoader; import com.yahoo.jdisc.core.FelixFramework; import com.yahoo.jdisc.core.FelixParams; -import com.yahoo.jdisc.handler.*; +import com.yahoo.jdisc.handler.BindingNotFoundException; +import com.yahoo.jdisc.handler.CompletionHandler; +import com.yahoo.jdisc.handler.ContentChannel; +import com.yahoo.jdisc.handler.RequestDeniedException; +import com.yahoo.jdisc.handler.RequestDispatch; +import com.yahoo.jdisc.handler.RequestHandler; +import com.yahoo.jdisc.handler.ResponseHandler; import com.yahoo.jdisc.service.CurrentContainer; import java.net.URI; import java.util.Arrays; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.*; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; /** diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java index b675af3b564..fb9a6eb5873 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java @@ -39,10 +39,12 @@ import java.util.StringJoiner; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; @@ -110,16 +112,18 @@ public class DocumentOperationExecutor { /** Assumes this stops receiving operations roughly when this is called, then waits up to 50 seconds to drain operations. */ public void shutdown() { long shutdownMillis = clock.instant().plusSeconds(50).toEpochMilli(); + visits.values().forEach(VisitorSession::destroy); Future<?> throttleShutdown = throttled.shutdown(Duration.ofSeconds(30), context -> context.error(OVERLOAD, "Retry on overload failed due to shutdown")); Future<?> timeoutShutdown = timeouts.shutdown(Duration.ofSeconds(40), context -> context.error(TIMEOUT, "Timed out due to shutdown")); - visits.values().forEach(VisitorSession::destroy); try { throttleShutdown.get(Math.max(0, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS); timeoutShutdown.get(Math.max(0, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS); } - catch (Exception e) { + catch (InterruptedException | ExecutionException | TimeoutException e) { + throttleShutdown.cancel(true); + throttleShutdown.cancel(true); log.log(WARNING, "Exception shutting down " + getClass().getName(), e); } } @@ -646,9 +650,9 @@ public class DocumentOperationExecutor { } - private static StorageCluster resolveCluster(Optional<String> wanted, Map<String, StorageCluster> clusters) { + static StorageCluster resolveCluster(Optional<String> wanted, Map<String, StorageCluster> clusters) { if (clusters.isEmpty()) - throw new IllegalArgumentException("Your Vespa deployment has no content clusters, so the document API is not enabled."); + throw new IllegalArgumentException("Your Vespa deployment has no content clusters, so the document API is not enabled"); return wanted.map(cluster -> { if ( ! clusters.containsKey(cluster)) |