diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-02-02 14:05:28 +0100 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-02-03 09:54:15 +0100 |
commit | ad290b1171641a1c2a6ea8f16c3995deefb77ed6 (patch) | |
tree | 3498c785b78d5838c264475ffd3686238f23ea16 /vespaclient-container-plugin/src/main/java/com | |
parent | 813afe71808808ef983003be90c6e8bcc5c2ac0a (diff) |
Refactor to allow more flex in reacting to documents received
Diffstat (limited to 'vespaclient-container-plugin/src/main/java/com')
-rw-r--r-- | vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java | 85 |
1 files changed, 64 insertions, 21 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index 7330d21d830..48e5f37a4f8 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -20,23 +20,26 @@ import com.yahoo.document.FixedBucketSpaces; import com.yahoo.document.TestAndSetCondition; import com.yahoo.document.config.DocumentmanagerConfig; import com.yahoo.document.fieldset.AllFields; +import com.yahoo.document.idstring.IdIdString; import com.yahoo.document.json.DocumentOperationType; import com.yahoo.document.json.JsonReader; import com.yahoo.document.json.JsonWriter; import com.yahoo.document.restapi.DocumentOperationExecutorConfig; import com.yahoo.document.select.parser.ParseException; +import com.yahoo.documentapi.AckToken; import com.yahoo.documentapi.AsyncParameters; import com.yahoo.documentapi.AsyncSession; import com.yahoo.documentapi.DocumentAccess; import com.yahoo.documentapi.DocumentOperationParameters; import com.yahoo.documentapi.DocumentResponse; -import com.yahoo.documentapi.DumpVisitorDataHandler; import com.yahoo.documentapi.ProgressToken; import com.yahoo.documentapi.Result; import com.yahoo.documentapi.VisitorControlHandler; +import com.yahoo.documentapi.VisitorDataHandler; import com.yahoo.documentapi.VisitorParameters; import com.yahoo.documentapi.VisitorSession; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; import com.yahoo.documentapi.metrics.DocumentApiMetrics; import com.yahoo.documentapi.metrics.DocumentOperationStatus; import com.yahoo.jdisc.Metric; @@ -51,6 +54,7 @@ import com.yahoo.jdisc.handler.ResponseHandler; import com.yahoo.jdisc.handler.UnsafeContentInputStream; import com.yahoo.jdisc.http.HttpRequest; import com.yahoo.jdisc.http.HttpRequest.Method; +import com.yahoo.messagebus.Message; import com.yahoo.messagebus.StaticThrottlePolicy; import com.yahoo.messagebus.Trace; import com.yahoo.messagebus.TraceNode; @@ -85,6 +89,7 @@ import java.util.concurrent.Phaser; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BooleanSupplier; @@ -304,7 +309,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { enqueueAndDispatch(request, handler, () -> { VisitorParameters parameters = parseParameters(request, path); return () -> { - visit(request, parameters, handler); + visitAndWrite(request, parameters, handler); return true; // VisitorSession has its own throttle handling. }; }); @@ -830,24 +835,43 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { return parameters; } - private void visit(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) { + private interface VisitCallback { + /** Called at the start of response rendering. */ + default void onStart(JsonResponse response) throws IOException { } + + /** Called for every document received from backend visitors — must call the ack for these to proceed. */ + default void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { } + + /** Called at the end of response rendering, before generic status data is written. */ + default void onEnd(JsonResponse response) throws IOException { } + } + + private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) { + visit(request, parameters, handler, new VisitCallback() { + @Override public void onStart(JsonResponse response) throws IOException { + response.writeDocumentsArrayStart(); + } + @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { + response.writeDocumentValue(document); + ack.run(); + } + @Override public void onEnd(JsonResponse response) throws IOException { + response.writeArrayEnd(); + } + }); + } + + private void visit(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, VisitCallback callback) { try { JsonResponse response = JsonResponse.create(request, handler); - response.writeDocumentsArrayStart(); Phaser phaser = new Phaser(2); // Synchronize this thread (dispatch) with the visitor callback thread. - parameters.setLocalDataHandler(new DumpVisitorDataHandler() { - @Override public void onDocument(Document doc, long timeStamp) { - loggingException(() -> { - response.writeDocumentValue(doc); - }); - } - @Override public void onRemove(DocumentId id) { } // We don't visit removes. - }); - parameters.setControlHandler(new VisitorControlHandler() { + AtomicReference<String> error = new AtomicReference<>(); // Set if error occurs during processing of visited documents. + callback.onStart(response); + VisitorControlHandler controller = new VisitorControlHandler() { @Override public void onDone(CompletionCode code, String message) { super.onDone(code, message); loggingException(() -> { - response.writeArrayEnd(); // Close "documents" array. + callback.onEnd(response); switch (code) { case TIMEOUT: if ( ! hasVisitedAnyBuckets()) { @@ -858,13 +882,15 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } case SUCCESS: // Intentional fallthrough. case ABORTED: // Intentional fallthrough. - if (getProgress() != null && ! getProgress().isFinished()) - response.writeContinuation(getProgress().serializeToString()); + if (error.get() == null) { + if (getProgress() != null && ! getProgress().isFinished()) + response.writeContinuation(getProgress().serializeToString()); - response.respond(Response.Status.OK); - break; + response.respond(Response.Status.OK); + break; + } default: - response.writeMessage(message != null ? message : "Visiting failed"); + response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed"); response.respond(Response.Status.INTERNAL_SERVER_ERROR); } dispatcher.execute(() -> { @@ -873,8 +899,25 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { }); }); } - }); - visits.put(parameters.getControlHandler(), access.createVisitorSession(parameters)); + }; + if (parameters.getRemoteDataHandler() == null) { + parameters.setLocalDataHandler(new VisitorDataHandler() { + @Override public void onMessage(Message m, AckToken token) { + if (m instanceof PutDocumentMessage) + callback.onDocument(response, + ((PutDocumentMessage) m).getDocumentPut().getDocument(), + () -> ack(token), + errorMessage -> { + error.set(errorMessage); + controller.abort(); + }); + else + throw new UnsupportedOperationException("Only PutDocumentMessage is supported, but got a " + m.getClass()); + } + }); + } + parameters.setControlHandler(controller); + visits.put(controller, access.createVisitorSession(parameters)); phaser.arriveAndDeregister(); } catch (ParseException e) { |