summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java85
1 files changed, 64 insertions, 21 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index 7330d21d830..48e5f37a4f8 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -20,23 +20,26 @@ import com.yahoo.document.FixedBucketSpaces;
import com.yahoo.document.TestAndSetCondition;
import com.yahoo.document.config.DocumentmanagerConfig;
import com.yahoo.document.fieldset.AllFields;
+import com.yahoo.document.idstring.IdIdString;
import com.yahoo.document.json.DocumentOperationType;
import com.yahoo.document.json.JsonReader;
import com.yahoo.document.json.JsonWriter;
import com.yahoo.document.restapi.DocumentOperationExecutorConfig;
import com.yahoo.document.select.parser.ParseException;
+import com.yahoo.documentapi.AckToken;
import com.yahoo.documentapi.AsyncParameters;
import com.yahoo.documentapi.AsyncSession;
import com.yahoo.documentapi.DocumentAccess;
import com.yahoo.documentapi.DocumentOperationParameters;
import com.yahoo.documentapi.DocumentResponse;
-import com.yahoo.documentapi.DumpVisitorDataHandler;
import com.yahoo.documentapi.ProgressToken;
import com.yahoo.documentapi.Result;
import com.yahoo.documentapi.VisitorControlHandler;
+import com.yahoo.documentapi.VisitorDataHandler;
import com.yahoo.documentapi.VisitorParameters;
import com.yahoo.documentapi.VisitorSession;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
import com.yahoo.documentapi.metrics.DocumentApiMetrics;
import com.yahoo.documentapi.metrics.DocumentOperationStatus;
import com.yahoo.jdisc.Metric;
@@ -51,6 +54,7 @@ import com.yahoo.jdisc.handler.ResponseHandler;
import com.yahoo.jdisc.handler.UnsafeContentInputStream;
import com.yahoo.jdisc.http.HttpRequest;
import com.yahoo.jdisc.http.HttpRequest.Method;
+import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.StaticThrottlePolicy;
import com.yahoo.messagebus.Trace;
import com.yahoo.messagebus.TraceNode;
@@ -85,6 +89,7 @@ import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;
@@ -304,7 +309,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
enqueueAndDispatch(request, handler, () -> {
VisitorParameters parameters = parseParameters(request, path);
return () -> {
- visit(request, parameters, handler);
+ visitAndWrite(request, parameters, handler);
return true; // VisitorSession has its own throttle handling.
};
});
@@ -830,24 +835,43 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
return parameters;
}
- private void visit(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) {
+ private interface VisitCallback {
+ /** Called at the start of response rendering. */
+ default void onStart(JsonResponse response) throws IOException { }
+
+ /** Called for every document received from backend visitors — must call the ack for these to proceed. */
+ default void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { }
+
+ /** Called at the end of response rendering, before generic status data is written. */
+ default void onEnd(JsonResponse response) throws IOException { }
+ }
+
+ private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) {
+ visit(request, parameters, handler, new VisitCallback() {
+ @Override public void onStart(JsonResponse response) throws IOException {
+ response.writeDocumentsArrayStart();
+ }
+ @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) {
+ response.writeDocumentValue(document);
+ ack.run();
+ }
+ @Override public void onEnd(JsonResponse response) throws IOException {
+ response.writeArrayEnd();
+ }
+ });
+ }
+
+ private void visit(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, VisitCallback callback) {
try {
JsonResponse response = JsonResponse.create(request, handler);
- response.writeDocumentsArrayStart();
Phaser phaser = new Phaser(2); // Synchronize this thread (dispatch) with the visitor callback thread.
- parameters.setLocalDataHandler(new DumpVisitorDataHandler() {
- @Override public void onDocument(Document doc, long timeStamp) {
- loggingException(() -> {
- response.writeDocumentValue(doc);
- });
- }
- @Override public void onRemove(DocumentId id) { } // We don't visit removes.
- });
- parameters.setControlHandler(new VisitorControlHandler() {
+ AtomicReference<String> error = new AtomicReference<>(); // Set if error occurs during processing of visited documents.
+ callback.onStart(response);
+ VisitorControlHandler controller = new VisitorControlHandler() {
@Override public void onDone(CompletionCode code, String message) {
super.onDone(code, message);
loggingException(() -> {
- response.writeArrayEnd(); // Close "documents" array.
+ callback.onEnd(response);
switch (code) {
case TIMEOUT:
if ( ! hasVisitedAnyBuckets()) {
@@ -858,13 +882,15 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
case SUCCESS: // Intentional fallthrough.
case ABORTED: // Intentional fallthrough.
- if (getProgress() != null && ! getProgress().isFinished())
- response.writeContinuation(getProgress().serializeToString());
+ if (error.get() == null) {
+ if (getProgress() != null && ! getProgress().isFinished())
+ response.writeContinuation(getProgress().serializeToString());
- response.respond(Response.Status.OK);
- break;
+ response.respond(Response.Status.OK);
+ break;
+ }
default:
- response.writeMessage(message != null ? message : "Visiting failed");
+ response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed");
response.respond(Response.Status.INTERNAL_SERVER_ERROR);
}
dispatcher.execute(() -> {
@@ -873,8 +899,25 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
});
});
}
- });
- visits.put(parameters.getControlHandler(), access.createVisitorSession(parameters));
+ };
+ if (parameters.getRemoteDataHandler() == null) {
+ parameters.setLocalDataHandler(new VisitorDataHandler() {
+ @Override public void onMessage(Message m, AckToken token) {
+ if (m instanceof PutDocumentMessage)
+ callback.onDocument(response,
+ ((PutDocumentMessage) m).getDocumentPut().getDocument(),
+ () -> ack(token),
+ errorMessage -> {
+ error.set(errorMessage);
+ controller.abort();
+ });
+ else
+ throw new UnsupportedOperationException("Only PutDocumentMessage is supported, but got a " + m.getClass());
+ }
+ });
+ }
+ parameters.setControlHandler(controller);
+ visits.put(controller, access.createVisitorSession(parameters));
phaser.arriveAndDeregister();
}
catch (ParseException e) {