summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--document/abi-spec.json2
-rw-r--r--document/src/main/java/com/yahoo/document/DocumentUpdate.java10
-rw-r--r--document/src/main/java/com/yahoo/document/idstring/IdIdString.java17
-rw-r--r--document/src/main/java/com/yahoo/document/json/document/DocumentParser.java2
-rw-r--r--document/src/test/java/com/yahoo/document/IdIdStringTest.java5
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/AckToken.java2
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/ResponseHandler.java2
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/VisitorControlSession.java11
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java4
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java348
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java168
11 files changed, 449 insertions, 122 deletions
diff --git a/document/abi-spec.json b/document/abi-spec.json
index 4a94d7e55de..1b3ca23c0bc 100644
--- a/document/abi-spec.json
+++ b/document/abi-spec.json
@@ -552,6 +552,7 @@
"methods": [
"public void <init>(com.yahoo.document.DocumentType, com.yahoo.document.DocumentId)",
"public void <init>(com.yahoo.document.serialization.DocumentUpdateReader)",
+ "public void <init>(com.yahoo.document.DocumentUpdate)",
"public void <init>(com.yahoo.document.DocumentType, java.lang.String)",
"public com.yahoo.document.DocumentId getId()",
"public void setId(com.yahoo.document.DocumentId)",
@@ -3479,7 +3480,6 @@
"public"
],
"methods": [
- "public static java.lang.String replaceType(java.lang.String, java.lang.String)",
"public static long makeLocation(java.lang.String)",
"public void <init>(java.lang.String, java.lang.String, java.lang.String, java.lang.String)",
"public long getLocation()",
diff --git a/document/src/main/java/com/yahoo/document/DocumentUpdate.java b/document/src/main/java/com/yahoo/document/DocumentUpdate.java
index 8de8ca6af53..5c748f48f15 100644
--- a/document/src/main/java/com/yahoo/document/DocumentUpdate.java
+++ b/document/src/main/java/com/yahoo/document/DocumentUpdate.java
@@ -76,6 +76,16 @@ public class DocumentUpdate extends DocumentOperation implements Iterable<FieldP
reader.read(this);
}
+ /** Creates a new document update which is a copy of the argument. */
+ public DocumentUpdate(DocumentUpdate update) {
+ super(update);
+ docId = update.docId;
+ documentType = update.documentType;
+ id2FieldUpdates = new HashMap<>(update.id2FieldUpdates);
+ fieldPathUpdates = new ArrayList<>(update.fieldPathUpdates);
+ createIfNonExistent = update.createIfNonExistent;
+ }
+
/**
* Creates a DocumentUpdate.
*
diff --git a/document/src/main/java/com/yahoo/document/idstring/IdIdString.java b/document/src/main/java/com/yahoo/document/idstring/IdIdString.java
index 9c75cf6828b..bb09dff7a98 100644
--- a/document/src/main/java/com/yahoo/document/idstring/IdIdString.java
+++ b/document/src/main/java/com/yahoo/document/idstring/IdIdString.java
@@ -5,10 +5,7 @@ import com.yahoo.collections.MD5;
import com.yahoo.text.Utf8;
/**
- * Created with IntelliJ IDEA.
- * User: magnarn
- * Date: 10/15/12
- * Time: 11:02 AM
+ * @author Magnar Nedland
*/
public class IdIdString extends IdString {
private final String type;
@@ -19,20 +16,12 @@ public class IdIdString extends IdString {
private static final int SIZE_OF_ID_AND_3_COLONS = 2 + 3; // "id:::"
private static final int MAX_LENGTH = IdString.MAX_LENGTH_EXCEPT_NAMESPACE_SPECIFIC - SIZE_OF_ID_AND_3_COLONS;
- public static String replaceType(String id, String typeName) {
- int typeStartPos = id.indexOf(":", 3) + 1;
- int typeEndPos = id.indexOf(":", typeStartPos);
- return id.substring(0, typeStartPos) + typeName + id.substring(typeEndPos);
- }
-
-
public static long makeLocation(String s) {
long result = 0;
byte[] md5sum = MD5.md5.get().digest(Utf8.toBytes(s));
- for (int i=0; i<8; ++i) {
- result |= (md5sum[i] & 0xFFl) << (8*i);
+ for (int i = 0; i < 8; ++i) {
+ result |= (md5sum[i] & 0xFFL) << (8 * i);
}
-
return result;
}
diff --git a/document/src/main/java/com/yahoo/document/json/document/DocumentParser.java b/document/src/main/java/com/yahoo/document/json/document/DocumentParser.java
index a8fdb186bd7..d6d95ca0bc6 100644
--- a/document/src/main/java/com/yahoo/document/json/document/DocumentParser.java
+++ b/document/src/main/java/com/yahoo/document/json/document/DocumentParser.java
@@ -33,7 +33,7 @@ public class DocumentParser {
/**
* Parses a single document and returns it.
- * Returns empty is we have reached the end of the stream.
+ * Returns empty if we have reached the end of the stream.
*/
public Optional<DocumentParseInfo> parse(Optional<DocumentId> documentIdArg) throws IOException {
indentLevel = 0;
diff --git a/document/src/test/java/com/yahoo/document/IdIdStringTest.java b/document/src/test/java/com/yahoo/document/IdIdStringTest.java
index 493fb35c97d..a4b05d0cf7a 100644
--- a/document/src/test/java/com/yahoo/document/IdIdStringTest.java
+++ b/document/src/test/java/com/yahoo/document/IdIdStringTest.java
@@ -109,9 +109,4 @@ public class IdIdStringTest {
}
}
- @Test
- public void requireThatIdIdStringCanReplaceType() {
- String type = IdIdString.replaceType("id:namespace:type::foo", "newType");
- assertEquals("id:namespace:newType::foo", type);
- }
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/AckToken.java b/documentapi/src/main/java/com/yahoo/documentapi/AckToken.java
index 963710d6c37..b24d1b33a02 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/AckToken.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/AckToken.java
@@ -4,7 +4,7 @@ package com.yahoo.documentapi;
/**
* Token to use to acknowledge data for visiting.
*
- * @author <a href="mailto:thomasg@yahoo-inc.com">Thomas Gundersen</a>
+ * @author Thomas Gundersen
*/
public class AckToken {
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/ResponseHandler.java b/documentapi/src/main/java/com/yahoo/documentapi/ResponseHandler.java
index 0471c44cb14..cab591d2dc7 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/ResponseHandler.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/ResponseHandler.java
@@ -12,5 +12,5 @@ public interface ResponseHandler {
*
* @param response The response to process.
*/
- public void handleResponse(Response response);
+ void handleResponse(Response response);
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlSession.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlSession.java
index 0a4fb6fa0c8..0874ffb64fc 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlSession.java
@@ -13,6 +13,7 @@ package com.yahoo.documentapi;
* @author Håkon Humberset
*/
public interface VisitorControlSession {
+
/**
* Acknowledges a response previously retrieved by the <code>getNext</code>
* method.
@@ -20,19 +21,19 @@ public interface VisitorControlSession {
* @param token The ack token. You must get this from the visitor response
* returned by the <code>getNext</code> method.
*/
- public void ack(AckToken token);
+ void ack(AckToken token);
/**
* Aborts the session.
*/
- public void abort();
+ void abort();
/**
* Returns the next response of this session. This method returns immediately.
*
* @return the next response, or null if no response is ready at this time
*/
- public VisitorResponse getNext();
+ VisitorResponse getNext();
/**
* Returns the next response of this session. This will block until a response is ready
@@ -43,11 +44,11 @@ public interface VisitorControlSession {
* @return the next response, or null if no response becomes ready before the timeout expires
* @throws InterruptedException if this thread is interrupted while waiting
*/
- public VisitorResponse getNext(int timeoutMilliseconds) throws InterruptedException;
+ VisitorResponse getNext(int timeoutMilliseconds) throws InterruptedException;
/**
* Destroys this session and frees up any resources it has held.
*/
- public void destroy();
+ void destroy();
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
index 982a1c50b85..675f20e3807 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
@@ -772,8 +772,8 @@ public class MessageBusVisitorSession implements VisitorSession {
}
private void handleMessageProcessingException(Reply reply, Exception e, String what) {
- final String errorDesc = formatProcessingException(e, what);
- final String fullMsg = formatIdentifyingVisitorErrorString(errorDesc);
+ String errorDesc = formatProcessingException(e, what);
+ String fullMsg = formatIdentifyingVisitorErrorString(errorDesc);
log.log(Level.SEVERE, fullMsg, e);
int errorCode;
synchronized (progress.getToken()) {
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index 1fefe2e0c7e..4b49c5cdafb 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -20,23 +20,27 @@ import com.yahoo.document.FixedBucketSpaces;
import com.yahoo.document.TestAndSetCondition;
import com.yahoo.document.config.DocumentmanagerConfig;
import com.yahoo.document.fieldset.AllFields;
+import com.yahoo.document.fieldset.DocIdOnly;
+import com.yahoo.document.idstring.IdIdString;
import com.yahoo.document.json.DocumentOperationType;
import com.yahoo.document.json.JsonReader;
import com.yahoo.document.json.JsonWriter;
import com.yahoo.document.restapi.DocumentOperationExecutorConfig;
import com.yahoo.document.select.parser.ParseException;
+import com.yahoo.documentapi.AckToken;
import com.yahoo.documentapi.AsyncParameters;
import com.yahoo.documentapi.AsyncSession;
import com.yahoo.documentapi.DocumentAccess;
import com.yahoo.documentapi.DocumentOperationParameters;
import com.yahoo.documentapi.DocumentResponse;
-import com.yahoo.documentapi.DumpVisitorDataHandler;
import com.yahoo.documentapi.ProgressToken;
import com.yahoo.documentapi.Result;
import com.yahoo.documentapi.VisitorControlHandler;
+import com.yahoo.documentapi.VisitorDataHandler;
import com.yahoo.documentapi.VisitorParameters;
import com.yahoo.documentapi.VisitorSession;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
import com.yahoo.documentapi.metrics.DocumentApiMetrics;
import com.yahoo.documentapi.metrics.DocumentOperationStatus;
import com.yahoo.jdisc.Metric;
@@ -51,6 +55,7 @@ import com.yahoo.jdisc.handler.ResponseHandler;
import com.yahoo.jdisc.handler.UnsafeContentInputStream;
import com.yahoo.jdisc.http.HttpRequest;
import com.yahoo.jdisc.http.HttpRequest.Method;
+import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.StaticThrottlePolicy;
import com.yahoo.messagebus.Trace;
import com.yahoo.messagebus.TraceNode;
@@ -60,6 +65,7 @@ import com.yahoo.search.query.ParameterParser;
import com.yahoo.text.Text;
import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
import com.yahoo.yolean.Exceptions;
+import com.yahoo.yolean.Exceptions.RunnableThrowingIOException;
import java.io.IOException;
import java.io.InputStream;
@@ -79,13 +85,16 @@ import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
+import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiFunction;
+import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -145,6 +154,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private static final String BUCKET_SPACE = "bucketSpace";
private static final String TIMEOUT = "timeout";
private static final String TRACELEVEL = "tracelevel";
+ private static final String DESTINATION = "destination";
private final Clock clock;
private final Metric metric;
@@ -155,9 +165,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private final AsyncSession asyncSession;
private final Map<String, StorageCluster> clusters;
private final Deque<Operation> operations;
+ private final Deque<BooleanSupplier> visitOperations = new ConcurrentLinkedDeque<>();
private final AtomicLong enqueued = new AtomicLong();
+ private final AtomicLong outstanding = new AtomicLong();
private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>();
- private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-"));
+ private final ScheduledExecutorService dispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-"));
+ private final ScheduledExecutorService visitDispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-visit-"));
private final Map<String, Map<Method, Handler>> handlers = defineApi();
@Inject
@@ -184,10 +197,14 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
this.asyncSession = access.createAsyncSession(new AsyncParameters());
this.clusters = parseClusters(clusterListConfig, bucketSpacesConfig);
this.operations = new ConcurrentLinkedDeque<>();
- this.executor.scheduleWithFixedDelay(this::dispatchEnqueued,
- executorConfig.resendDelayMillis(),
- executorConfig.resendDelayMillis(),
- TimeUnit.MILLISECONDS);
+ this.dispatcher.scheduleWithFixedDelay(this::dispatchEnqueued,
+ executorConfig.resendDelayMillis(),
+ executorConfig.resendDelayMillis(),
+ TimeUnit.MILLISECONDS);
+ this.visitDispatcher.scheduleWithFixedDelay(this::dispatchVisitEnqueued,
+ executorConfig.resendDelayMillis(),
+ executorConfig.resendDelayMillis(),
+ TimeUnit.MILLISECONDS);
}
// ------------------------------------------------ Requests -------------------------------------------------
@@ -236,24 +253,44 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
@Override
public void destroy() {
- executor.shutdown();
- Instant doom = clock.instant().plus(Duration.ofSeconds(20));
- while ( ! operations.isEmpty() && clock.instant().isBefore(doom))
+ Instant doom = clock.instant().plus(Duration.ofSeconds(30));
+
+ // This blocks until all visitors are done. These, in turn, may require the asyncSession to be alive
+ // to be able to run, as well as dispatch of operations against it, which is done by visitDispatcher.
+ visits.values().forEach(VisitorSession::destroy);
+
+ // Shut down both dispatchers, so only we empty the queues of outstanding operations, and can be sure they're empty.
+ dispatcher.shutdown();
+ visitDispatcher.shutdown();
+ while ( ! (operations.isEmpty() && visitOperations.isEmpty()) && clock.instant().isBefore(doom)) {
dispatchEnqueued();
+ dispatchVisitEnqueued();
+ }
if ( ! operations.isEmpty())
log.log(WARNING, "Failed to empty request queue before shutdown timeout — " + operations.size() + " requests left");
- asyncSession.destroy();
- visits.values().forEach(VisitorSession::destroy);
+ if ( ! visitOperations.isEmpty())
+ log.log(WARNING, "Failed to empty visitor operations queue before shutdown timeout — " + operations.size() + " operations left");
try {
- if ( ! executor.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS))
- executor.shutdownNow();
+ while (outstanding.get() > 0 && clock.instant().isBefore(doom))
+ Thread.sleep(Math.max(1, Duration.between(clock.instant(), doom).toMillis()));
+
+ if ( ! dispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS))
+ dispatcher.shutdownNow();
+
+ if ( ! visitDispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS))
+ visitDispatcher.shutdownNow();
}
catch (InterruptedException e) {
log.log(WARNING, "Interrupted waiting for /document/v1 executor to shut down");
}
+ finally {
+ asyncSession.destroy();
+ if (outstanding.get() != 0)
+ log.log(WARNING, "Failed to receive a response to " + outstanding.get() + " outstanding document operations during shutdown");
+ }
}
@FunctionalInterface
@@ -266,16 +303,27 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
Map<String, Map<Method, Handler>> handlers = new LinkedHashMap<>();
handlers.put("/document/v1/",
- Map.of(GET, this::getDocuments));
+ Map.of(GET, this::getDocuments,
+ POST, this::postDocuments,
+ DELETE, this::deleteDocuments));
handlers.put("/document/v1/{namespace}/{documentType}/docid/",
- Map.of(GET, this::getDocuments));
+ Map.of(GET, this::getDocuments,
+ POST, this::postDocuments,
+ PUT, this::putDocuments,
+ DELETE, this::deleteDocuments));
handlers.put("/document/v1/{namespace}/{documentType}/group/{group}/",
- Map.of(GET, this::getDocuments));
+ Map.of(GET, this::getDocuments,
+ POST, this::postDocuments,
+ PUT, this::putDocuments,
+ DELETE, this::deleteDocuments));
handlers.put("/document/v1/{namespace}/{documentType}/number/{number}/",
- Map.of(GET, this::getDocuments));
+ Map.of(GET, this::getDocuments,
+ POST, this::postDocuments,
+ PUT, this::putDocuments,
+ DELETE, this::deleteDocuments));
handlers.put("/document/v1/{namespace}/{documentType}/docid/{*}",
Map.of(GET, this::getDocument,
@@ -302,7 +350,55 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
enqueueAndDispatch(request, handler, () -> {
VisitorParameters parameters = parseParameters(request, path);
return () -> {
- visit(request, parameters, handler);
+ visitAndWrite(request, parameters, handler);
+ return true; // VisitorSession has its own throttle handling.
+ };
+ });
+ return ignoredContent;
+ }
+
+ private ContentChannel postDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) {
+ enqueueAndDispatch(request, handler, () -> {
+ VisitorParameters parameters = parseParameters(request, path);
+ parameters.setRemoteDataHandler(getProperty(request, DESTINATION).orElseThrow(() -> new IllegalArgumentException("Missing required property '" + DESTINATION + "'")));
+ return () -> {
+ visitWithRemote(request, parameters, handler);
+ return true; // VisitorSession has its own throttle handling.
+ };
+ });
+ return ignoredContent;
+ }
+
+ private ContentChannel putDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) {
+ if (getProperty(request, SELECTION).isEmpty())
+ throw new IllegalArgumentException("Missing required property '" + SELECTION + "'");
+
+ return new ForwardingContentChannel(in -> {
+ enqueueAndDispatch(request, handler, () -> {
+ String type = path.documentType().orElseThrow(() -> new IllegalStateException("Document type must be specified for mass updates"));
+ IdIdString dummyId = new IdIdString("dummy", type, "", "");
+ VisitorParameters parameters = parseParameters(request, path);
+ parameters.setFieldSet(DocIdOnly.NAME);
+ DocumentUpdate update = parser.parseUpdate(in, dummyId.toString());
+ update.setCondition(new TestAndSetCondition(parameters.getDocumentSelection()));
+ return () -> {
+ visitAndUpdate(request, parameters, handler, update, getProperty(request, DESTINATION));
+ return true; // VisitorSession has its own throttle handling.
+ };
+ });
+ });
+ }
+
+ private ContentChannel deleteDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) {
+ if (getProperty(request, SELECTION).isEmpty())
+ throw new IllegalArgumentException("Missing required property '" + SELECTION + "'");
+
+ enqueueAndDispatch(request, handler, () -> {
+ VisitorParameters parameters = parseParameters(request, path);
+ parameters.setFieldSet(DocIdOnly.NAME);
+ TestAndSetCondition condition = new TestAndSetCondition(parameters.getDocumentSelection());
+ return () -> {
+ visitAndDelete(request, parameters, handler, condition, getProperty(request, DESTINATION));
return true; // VisitorSession has its own throttle handling.
};
});
@@ -315,6 +411,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
if (rawParameters.fieldSet().isEmpty())
rawParameters = rawParameters.withFieldSet(path.documentType().orElseThrow() + ":[document]");
DocumentOperationParameters parameters = rawParameters.withResponseHandler(response -> {
+ outstanding.decrementAndGet();
handle(path, handler, response, (document, jsonResponse) -> {
if (document != null) {
jsonResponse.writeSingleDocument(document);
@@ -336,7 +433,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
DocumentPut put = parser.parsePut(in, path.id().toString());
getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(put::setCondition);
DocumentOperationParameters parameters = parametersFromRequest(request, ROUTE)
- .withResponseHandler(response -> handle(path, handler, response));
+ .withResponseHandler(response -> {
+ outstanding.decrementAndGet();
+ handle(path, handler, response);
+ });
return () -> dispatchOperation(() -> asyncSession.put(put, parameters));
});
});
@@ -350,7 +450,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(update::setCondition);
getProperty(request, CREATE, booleanParser).ifPresent(update::setCreateIfNonExistent);
DocumentOperationParameters parameters = parametersFromRequest(request, ROUTE)
- .withResponseHandler(response -> handle(path, handler, response));
+ .withResponseHandler(response -> {
+ outstanding.decrementAndGet();
+ handle(path, handler, response);
+ });
return () -> dispatchOperation(() -> asyncSession.update(update, parameters));
});
});
@@ -362,7 +465,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
DocumentRemove remove = new DocumentRemove(path.id());
getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(remove::setCondition);
DocumentOperationParameters parameters = parametersFromRequest(request, ROUTE)
- .withResponseHandler(response -> handle(path, handler, response));
+ .withResponseHandler(response -> {
+ outstanding.decrementAndGet();
+ handle(path, handler, response);
+ });
return () -> dispatchOperation(() -> asyncSession.remove(remove, parameters));
});
return ignoredContent;
@@ -415,18 +521,41 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
return false;
}
+ /** Dispatches enqueued requests until one is blocked. */
+ void dispatchVisitEnqueued() {
+ try {
+ while (dispatchFirstVisit());
+ }
+ catch (Exception e) {
+ log.log(WARNING, "Uncaught exception in /document/v1 dispatch thread", e);
+ }
+ }
+
+ /** Attempts to dispatch the first enqueued visit operations, and returns whether this was successful. */
+ private boolean dispatchFirstVisit() {
+ BooleanSupplier operation = visitOperations.poll();
+ if (operation == null)
+ return false;
+
+ if (operation.getAsBoolean())
+ return true;
+
+ visitOperations.push(operation);
+ return false;
+ }
+
/**
* Enqueues the given request and operation, or responds with "overload" if the queue is full,
* and then attempts to dispatch an enqueued operation from the head of the queue.
*/
- private void enqueueAndDispatch(HttpRequest request, ResponseHandler handler, Supplier<Supplier<Boolean>> operationParser) {
+ private void enqueueAndDispatch(HttpRequest request, ResponseHandler handler, Supplier<BooleanSupplier> operationParser) {
if (enqueued.incrementAndGet() > maxThrottled) {
enqueued.decrementAndGet();
overload(request, "Rejecting execution due to overload: " + maxThrottled + " requests already enqueued", handler);
return;
}
operations.offer(new Operation(request, handler) {
- @Override Supplier<Boolean> parse() { return operationParser.get(); }
+ @Override BooleanSupplier parse() { return operationParser.get(); }
});
dispatchFirst();
}
@@ -548,7 +677,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
json.writeArrayFieldStart("documents");
}
- synchronized void writeDocumentValue(Document document) throws IOException {
+ synchronized void writeDocumentValue(Document document) {
new JsonWriter(json).write(document);
}
@@ -619,7 +748,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
});
}
- private static void loggingException(Exceptions.RunnableThrowingIOException runnable) {
+ private static void loggingException(RunnableThrowingIOException runnable) {
try {
runnable.run();
}
@@ -628,14 +757,14 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
}
- // ---------------------------------------------Document Operations ----------------------------------------
+ // -------------------------------------------- Document Operations ----------------------------------------
private static abstract class Operation {
private final Lock lock = new ReentrantLock();
private final HttpRequest request;
private final ResponseHandler handler;
- private Supplier<Boolean> operation;
+ private BooleanSupplier operation;
Operation(HttpRequest request, ResponseHandler handler) {
this.request = request;
@@ -644,7 +773,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
/**
* Attempts to dispatch this operation to the document API, and returns whether this completed or not.
- * This return {@code} true if dispatch was successful, or if it failed fatally; or {@code false} if
+ * Returns {@code} true if dispatch was successful, or if it failed fatally; or {@code false} if
* dispatch should be retried at a later time.
*/
boolean dispatch() {
@@ -658,7 +787,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
if (operation == null)
operation = parse();
- return operation.get();
+ return operation.getAsBoolean();
}
catch (IllegalArgumentException e) {
badRequest(request, e, handler);
@@ -672,12 +801,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
return true;
}
- abstract Supplier<Boolean> parse();
+ abstract BooleanSupplier parse();
}
- /** Attempts to send the given document operation, returning false if thes needs to be retried. */
- private static boolean dispatchOperation(Supplier<Result> documentOperation) {
+ /** Attempts to send the given document operation, returning false if this needs to be retried. */
+ private boolean dispatchOperation(Supplier<Result> documentOperation) {
Result result = documentOperation.get();
if (result.type() == Result.ResultType.TRANSIENT_ERROR)
return false;
@@ -685,6 +814,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
if (result.type() == Result.ResultType.FATAL_ERROR)
throw new RuntimeException(result.getError());
+ outstanding.incrementAndGet();
return true;
}
@@ -824,27 +954,108 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
path.documentType(),
List.of(FixedBucketSpaces.defaultSpace(), FixedBucketSpaces.globalSpace()),
getProperty(request, BUCKET_SPACE)));
+
return parameters;
}
- private void visit(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) {
+ private interface VisitCallback {
+ /** Called at the start of response rendering. */
+ default void onStart(JsonResponse response) throws IOException { }
+
+ /** Called for every document received from backend visitors — must call the ack for these to proceed. */
+ default void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { }
+
+ /** Called at the end of response rendering, before generic status data is written. */
+ default void onEnd(JsonResponse response) throws IOException { }
+ }
+
+ private void visitAndDelete(HttpRequest request, VisitorParameters parameters, ResponseHandler handler,
+ TestAndSetCondition condition, Optional<String> route) {
+ visitAndProcess(request, parameters, handler, route, (id, operationParameters) -> {
+ DocumentRemove remove = new DocumentRemove(id);
+ remove.setCondition(condition);
+ return asyncSession.remove(remove, operationParameters);
+ });
+ }
+
+ private void visitAndUpdate(HttpRequest request, VisitorParameters parameters, ResponseHandler handler,
+ DocumentUpdate protoUpdate, Optional<String> route) {
+ visitAndProcess(request, parameters, handler, route, (id, operationParameters) -> {
+ DocumentUpdate update = new DocumentUpdate(protoUpdate);
+ update.setId(id);
+ return asyncSession.update(update, operationParameters);
+ });
+ }
+
+ private void visitAndProcess(HttpRequest request, VisitorParameters parameters, ResponseHandler handler,
+ Optional<String> route, BiFunction<DocumentId, DocumentOperationParameters, Result> operation) {
+ visit(request, parameters, handler, new VisitCallback() {
+ @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) {
+ DocumentOperationParameters operationParameters = (route.isEmpty() ? parameters()
+ : parameters().withRoute(route.get()))
+ .withResponseHandler(operationResponse -> {
+ outstanding.decrementAndGet();
+ switch (operationResponse.outcome()) {
+ case SUCCESS:
+ case NOT_FOUND:
+ case CONDITION_FAILED:
+ break; // This is all OK — the latter two are due to mitigating races.
+ case ERROR:
+ case INSUFFICIENT_STORAGE:
+ onError.accept(operationResponse.getTextMessage());
+ break;
+ default:
+ onError.accept("Unexpected response " + operationResponse);
+ }
+ });
+ visitOperations.offer(() -> {
+ Result result = operation.apply(document.getId(), operationParameters);
+ if (result.type() == Result.ResultType.TRANSIENT_ERROR)
+ return false;
+
+ if (result.type() == Result.ResultType.FATAL_ERROR)
+ onError.accept(result.getError().getMessage());
+ else
+ outstanding.incrementAndGet();
+
+ ack.run();
+ return true;
+ });
+ dispatchFirstVisit();
+ }
+ });
+ }
+
+ private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) {
+ visit(request, parameters, handler, new VisitCallback() {
+ @Override public void onStart(JsonResponse response) throws IOException {
+ response.writeDocumentsArrayStart();
+ }
+ @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) {
+ response.writeDocumentValue(document);
+ ack.run();
+ }
+ @Override public void onEnd(JsonResponse response) throws IOException {
+ response.writeArrayEnd();
+ }
+ });
+ }
+
+ private void visitWithRemote(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) {
+ visit(request, parameters, handler, new VisitCallback() { });
+ }
+
+ private void visit(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, VisitCallback callback) {
try {
JsonResponse response = JsonResponse.create(request, handler);
- response.writeDocumentsArrayStart();
- CountDownLatch latch = new CountDownLatch(1);
- parameters.setLocalDataHandler(new DumpVisitorDataHandler() {
- @Override public void onDocument(Document doc, long timeStamp) {
- loggingException(() -> {
- response.writeDocumentValue(doc);
- });
- }
- @Override public void onRemove(DocumentId id) { } // We don't visit removes.
- });
- parameters.setControlHandler(new VisitorControlHandler() {
+ Phaser phaser = new Phaser(2); // Synchronize this thread (dispatch) with the visitor callback thread.
+ AtomicReference<String> error = new AtomicReference<>(); // Set if error occurs during processing of visited documents.
+ callback.onStart(response);
+ VisitorControlHandler controller = new VisitorControlHandler() {
@Override public void onDone(CompletionCode code, String message) {
super.onDone(code, message);
loggingException(() -> {
- response.writeArrayEnd(); // Close "documents" array.
+ callback.onEnd(response);
switch (code) {
case TIMEOUT:
if ( ! hasVisitedAnyBuckets()) {
@@ -855,29 +1066,43 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
case SUCCESS: // Intentional fallthrough.
case ABORTED: // Intentional fallthrough.
- if (getProgress() != null && ! getProgress().isFinished())
- response.writeContinuation(getProgress().serializeToString());
+ if (error.get() == null) {
+ if (getProgress() != null && ! getProgress().isFinished())
+ response.writeContinuation(getProgress().serializeToString());
- response.respond(Response.Status.OK);
- break;
+ response.respond(Response.Status.OK);
+ break;
+ }
default:
- response.writeMessage(message != null ? message : "Visiting failed");
+ response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed");
response.respond(Response.Status.INTERNAL_SERVER_ERROR);
}
- executor.execute(() -> {
- try {
- latch.await(); // We may get here while dispatching thread is still putting us in the map.
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ visitDispatcher.execute(() -> {
+ phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map.
visits.remove(this).destroy();
});
});
}
- });
- visits.put(parameters.getControlHandler(), access.createVisitorSession(parameters));
- latch.countDown();
+ };
+ if (parameters.getRemoteDataHandler() == null) {
+ parameters.setLocalDataHandler(new VisitorDataHandler() {
+ @Override public void onMessage(Message m, AckToken token) {
+ if (m instanceof PutDocumentMessage)
+ callback.onDocument(response,
+ ((PutDocumentMessage) m).getDocumentPut().getDocument(),
+ () -> ack(token),
+ errorMessage -> {
+ error.set(errorMessage);
+ controller.abort();
+ });
+ else
+ throw new UnsupportedOperationException("Only PutDocumentMessage is supported, but got a " + m.getClass());
+ }
+ });
+ }
+ parameters.setControlHandler(controller);
+ visits.put(controller, access.createVisitorSession(parameters));
+ phaser.arriveAndDeregister();
}
catch (ParseException e) {
badRequest(request, new IllegalArgumentException(e), handler);
@@ -889,6 +1114,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
// ------------------------------------------------ Helpers ------------------------------------------------
+ /** Returns the last property with the given name, if present, or throws if this is empty or blank. */
private static Optional<String> getProperty(HttpRequest request, String name) {
if ( ! request.parameters().containsKey(name))
return Optional.empty();
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
index 449daa4970a..e5e5fef5fd0 100644
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
@@ -25,9 +25,9 @@ import com.yahoo.documentapi.DocumentAccessParams;
import com.yahoo.documentapi.DocumentIdResponse;
import com.yahoo.documentapi.DocumentOperationParameters;
import com.yahoo.documentapi.DocumentResponse;
-import com.yahoo.documentapi.DumpVisitorDataHandler;
import com.yahoo.documentapi.ProgressToken;
import com.yahoo.documentapi.Response;
+import com.yahoo.documentapi.ResponseHandler;
import com.yahoo.documentapi.Result;
import com.yahoo.documentapi.SubscriptionParameters;
import com.yahoo.documentapi.SubscriptionSession;
@@ -40,6 +40,7 @@ import com.yahoo.documentapi.VisitorDestinationSession;
import com.yahoo.documentapi.VisitorParameters;
import com.yahoo.documentapi.VisitorResponse;
import com.yahoo.documentapi.VisitorSession;
+import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
import com.yahoo.jdisc.Metric;
import com.yahoo.messagebus.StaticThrottlePolicy;
import com.yahoo.messagebus.Trace;
@@ -59,14 +60,17 @@ import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.OptionalInt;
+import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
+import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
@@ -172,8 +176,9 @@ public class DocumentV1ApiTest {
}
@Test
- public void testResponses() throws ExecutionException, InterruptedException {
+ public void testResponses() {
RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler);
+ List<AckToken> tokens = List.of(new AckToken(null), new AckToken(null), new AckToken(null));
// GET at non-existent path returns 404 with available paths
var response = driver.sendRequest("http://localhost/document/v1/not-found");
assertSameJson("{" +
@@ -191,6 +196,7 @@ public class DocumentV1ApiTest {
assertEquals(404, response.getStatus());
// GET at root is a visit. Numeric parameters have an upper bound.
+ access.expect(tokens);
access.expect(parameters -> {
assertEquals("[Content:cluster=content]", parameters.getRoute().toString());
assertEquals("default", parameters.getBucketSpace());
@@ -200,9 +206,9 @@ public class DocumentV1ApiTest {
assertEquals("(all the things)", parameters.getDocumentSelection());
assertEquals(1000, parameters.getSessionTimeoutMs());
// Put some documents in the response
- ((DumpVisitorDataHandler) parameters.getLocalDataHandler()).onDocument(doc1, 0);
- ((DumpVisitorDataHandler) parameters.getLocalDataHandler()).onDocument(doc2, 0);
- ((DumpVisitorDataHandler) parameters.getLocalDataHandler()).onDocument(doc3, 0);
+ parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc1)), tokens.get(0));
+ parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc2)), tokens.get(1));
+ parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc3)), tokens.get(2));
VisitorStatistics statistics = new VisitorStatistics();
statistics.setBucketsVisited(1);
parameters.getControlHandler().onVisitorStatistics(statistics);
@@ -246,6 +252,102 @@ public class DocumentV1ApiTest {
"}", response.readAll());
assertEquals(400, response.getStatus());
+ // POST with namespace and document type is a restricted visit with a required remote data handler ("destination")
+ access.expect(parameters -> {
+ fail("Not supposed to run");
+ });
+ response = driver.sendRequest("http://localhost/document/v1/space/music/docid", POST);
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/docid\"," +
+ " \"message\": \"Missing required property 'destination'\"" +
+ "}", response.readAll());
+ assertEquals(400, response.getStatus());
+
+ // POST with namespace and document type is a restricted visit with a require remote data handler ("destination")
+ access.expect(parameters -> {
+ assertEquals("zero", parameters.getRemoteDataHandler());
+ assertEquals("music:[document]", parameters.fieldSet());
+ parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "We made it!");
+ });
+ response = driver.sendRequest("http://localhost/document/v1/space/music/docid?destination=zero", POST);
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/docid\"" +
+ "}", response.readAll());
+ assertEquals(200, response.getStatus());
+
+ // PUT with namespace and document type is a restricted visit with a required partial update to apply to visited documents.
+ access.expect(tokens.subList(2, 3));
+ access.expect(parameters -> {
+ assertEquals("(true) and (music) and (id.namespace=='space')", parameters.getDocumentSelection());
+ assertEquals("[id]", parameters.fieldSet());
+ parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc3)), tokens.get(2));
+ parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "Huzzah!");
+ });
+ access.session.expect((update, parameters) -> {
+ DocumentUpdate expectedUpdate = new DocumentUpdate(doc3.getDataType(), doc3.getId());
+ expectedUpdate.addFieldUpdate(FieldUpdate.createAssign(doc3.getField("artist"), new StringFieldValue("Lisa Ekdahl")));
+ expectedUpdate.setCondition(new TestAndSetCondition("(true) and (music) and (id.namespace=='space')"));
+ assertEquals(expectedUpdate, update);
+ parameters.responseHandler().get().handleResponse(new UpdateResponse(0, false));
+ assertEquals(parameters().withRoute("zero"), parameters);
+ return new Result(Result.ResultType.SUCCESS, null);
+ });
+ response = driver.sendRequest("http://localhost/document/v1/space/music/docid?selection=true&destination=zero", PUT,
+ "{" +
+ " \"fields\": {" +
+ " \"artist\": { \"assign\": \"Lisa Ekdahl\" }" +
+ " }" +
+ "}");
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/docid\"" +
+ "}", response.readAll());
+ assertEquals(200, response.getStatus());
+
+ // PUT with namespace, document type and group is also a restricted visit which requires a selection.
+ access.expect(parameters -> {
+ fail("Not supposed to run");
+ });
+ response = driver.sendRequest("http://localhost/document/v1/space/music/group/troupe", PUT);
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/group/troupe\"," +
+ " \"message\": \"Missing required property 'selection'\"" +
+ "}", response.readAll());
+ assertEquals(400, response.getStatus());
+
+ // DELETE with namespace and document type is a restricted visit which deletes visited documents.
+ access.expect(tokens.subList(0, 1));
+ access.expect(parameters -> {
+ assertEquals("(false) and (music) and (id.namespace=='space')", parameters.getDocumentSelection());
+ assertEquals("[id]", parameters.fieldSet());
+ parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc2)), tokens.get(0));
+ parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.ABORTED, "Huzzah?");
+ });
+ access.session.expect((remove, parameters) -> {
+ DocumentRemove expectedRemove = new DocumentRemove(doc2.getId());
+ expectedRemove.setCondition(new TestAndSetCondition("(false) and (music) and (id.namespace=='space')"));
+ assertEquals(new DocumentRemove(doc2.getId()), remove);
+ assertEquals(parameters().withRoute("zero"), parameters);
+ parameters.responseHandler().get().handleResponse(new DocumentIdResponse(0, doc2.getId(), "boom", Response.Outcome.ERROR));
+ return new Result(Result.ResultType.SUCCESS, null);
+ });
+ response = driver.sendRequest("http://localhost/document/v1/space/music/docid?selection=false&destination=zero", DELETE);
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/docid\"," +
+ " \"message\": \"boom\"" +
+ "}", response.readAll());
+ assertEquals(500, response.getStatus());
+
+ // DELETE at the root is also a deletion visit. These require a selection.
+ access.expect(parameters -> {
+ fail("Not supposed to run");
+ });
+ response = driver.sendRequest("http://localhost/document/v1/space/music/docid", DELETE);
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/docid\"," +
+ " \"message\": \"Missing required property 'selection'\"" +
+ "}", response.readAll());
+ assertEquals(400, response.getStatus());
+
// GET with namespace, document type and group is a restricted visit.
access.expect(parameters -> {
assertEquals("(music) and (id.namespace=='space') and (id.group=='best\\'')", parameters.getDocumentSelection());
@@ -419,7 +521,7 @@ public class DocumentV1ApiTest {
DocumentRemove expectedRemove = new DocumentRemove(doc2.getId());
expectedRemove.setCondition(new TestAndSetCondition("false"));
assertEquals(new DocumentRemove(doc2.getId()), remove);
- assertEquals(parameters.withRoute("route"), parameters);
+ assertEquals(parameters().withRoute("route"), parameters);
parameters.responseHandler().get().handleResponse(new DocumentIdResponse(0, doc2.getId()));
return new Result(Result.ResultType.SUCCESS, null);
});
@@ -513,27 +615,20 @@ public class DocumentV1ApiTest {
"}", response2.readAll());
assertEquals(500, response2.getStatus());
- // Request timeout is dispatched after timeout has passed.
- CountDownLatch latch = new CountDownLatch(1);
- var assertions = Executors.newSingleThreadExecutor().submit(() -> {
- access.session.expect((id, parameters) -> {
- try {
- latch.await();
- }
- catch (InterruptedException e) {
- fail("Not supposed to be interrupted");
- }
- return new Result(Result.ResultType.SUCCESS, null);
- });
- var response4 = driver.sendRequest("http://localhost/document/v1/space/music/docid/one?cluster=content&fieldSet=go&timeout=1ms");
- assertSameJson("{" +
- " \"pathId\": \"/document/v1/space/music/docid/one\"," +
- " \"message\": \"Request timeout after 1ms\"" +
- "}", response4.readAll());
- assertEquals(504, response4.getStatus());
+ // Request response does not arrive before timeout has passed.
+ AtomicReference<ResponseHandler> handler = new AtomicReference<>();
+ access.session.expect((id, parameters) -> {
+ handler.set(parameters.responseHandler().get());
+ return new Result(Result.ResultType.SUCCESS, null);
});
- latch.countDown();
- assertions.get();
+ var response4 = driver.sendRequest("http://localhost/document/v1/space/music/docid/one?timeout=1ms");
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/docid/one\"," +
+ " \"message\": \"Request timeout after 1ms\"" +
+ "}", response4.readAll());
+ assertEquals(504, response4.getStatus());
+ if (handler.get() != null) // Timeout may have occurred before dispatch, or ...
+ handler.get().handleResponse(new Response(0)); // response may eventually arrive, but too late.
driver.close();
}
@@ -542,6 +637,7 @@ public class DocumentV1ApiTest {
static class MockDocumentAccess extends DocumentAccess {
private final AtomicReference<Consumer<VisitorParameters>> expectations = new AtomicReference<>();
+ private final Set<AckToken> outstanding = new CopyOnWriteArraySet<>();
private final MockAsyncSession session = new MockAsyncSession();
MockDocumentAccess(DocumentmanagerConfig config) {
@@ -560,18 +656,24 @@ public class DocumentV1ApiTest {
@Override
public VisitorSession createVisitorSession(VisitorParameters parameters) {
- expectations.get().accept(parameters);
- return new VisitorSession() {
+ VisitorSession visitorSession = new VisitorSession() {
+ {
+ parameters.getControlHandler().setSession(this);
+ if (parameters.getLocalDataHandler() != null)
+ parameters.getLocalDataHandler().setSession(this);
+ }
@Override public boolean isDone() { return false; }
@Override public ProgressToken getProgress() { return null; }
@Override public Trace getTrace() { return null; }
@Override public boolean waitUntilDone(long timeoutMs) { return false; }
- @Override public void ack(AckToken token) { }
+ @Override public void ack(AckToken token) { assertTrue(outstanding.remove(token)); }
@Override public void abort() { }
@Override public VisitorResponse getNext() { return null; }
@Override public VisitorResponse getNext(int timeoutMilliseconds) { return null; }
- @Override public void destroy() { }
+ @Override public void destroy() { assertEquals(Set.of(), outstanding); }
};
+ expectations.get().accept(parameters);
+ return visitorSession;
}
@Override
@@ -593,6 +695,10 @@ public class DocumentV1ApiTest {
this.expectations.set(expectations);
}
+ public void expect(Collection<AckToken> tokens) {
+ outstanding.addAll(tokens);
+ }
+
}