From fe92f94becab1750853cac77464bb710374e56dd Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Wed, 20 Oct 2021 14:12:22 +0200 Subject: Support HTTP streaming of visits through /document/v1 --- .../restapi/resource/DocumentV1ApiHandler.java | 96 +++++++++++++--------- 1 file changed, 56 insertions(+), 40 deletions(-) (limited to 'vespaclient-container-plugin/src/main/java') diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index aa96b0932c3..f1d6b4825c6 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -158,6 +158,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private static final String TIME_CHUNK = "timeChunk"; private static final String TIMEOUT = "timeout"; private static final String TRACELEVEL = "tracelevel"; + private static final String STREAMING = "streaming"; private final Clock clock; private final Duration handlerTimeout; @@ -356,8 +357,9 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private ContentChannel getDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) { enqueueAndDispatch(request, handler, () -> { VisitorParameters parameters = parseGetParameters(request, path); + boolean streaming = getProperty(request, STREAMING, booleanParser).orElse(false); return () -> { - visitAndWrite(request, parameters, handler); + visitAndWrite(request, parameters, handler, streaming); return true; // VisitorSession has its own throttle handling. }; }); @@ -573,19 +575,21 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private static class JsonResponse implements AutoCloseable { private final BufferedContentChannel buffer = new BufferedContentChannel(); - private final OutputStream out = new ContentChannelOutputStream(buffer); - private final JsonGenerator json = jsonFactory.createGenerator(out); + private final OutputStream out; + private final JsonGenerator json; private final ResponseHandler handler; private ContentChannel channel; - private JsonResponse(ResponseHandler handler) throws IOException { + private JsonResponse(ResponseHandler handler, boolean streaming) throws IOException { this.handler = handler; + out = new ContentChannelOutputStream(buffer); + json = jsonFactory.createGenerator(out); json.writeStartObject(); } /** Creates a new JsonResponse with path and id fields written. */ static JsonResponse create(DocumentPath path, ResponseHandler handler) throws IOException { - JsonResponse response = new JsonResponse(handler); + JsonResponse response = new JsonResponse(handler, false); response.writePathId(path.rawPath()); response.writeDocId(path.id()); return response; @@ -593,15 +597,19 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { /** Creates a new JsonResponse with path field written. */ static JsonResponse create(HttpRequest request, ResponseHandler handler) throws IOException { - JsonResponse response = new JsonResponse(handler); + return create(request, handler, false); + } + + /** Creates a new JsonResponse with path field written. */ + static JsonResponse create(HttpRequest request, ResponseHandler handler, boolean streaming) throws IOException { + JsonResponse response = new JsonResponse(handler, streaming); response.writePathId(request.getUri().getRawPath()); return response; } /** Creates a new JsonResponse with path and message fields written. */ static JsonResponse create(HttpRequest request, String message, ResponseHandler handler) throws IOException { - JsonResponse response = new JsonResponse(handler); - response.writePathId(request.getUri().getRawPath()); + JsonResponse response = create(request, handler); response.writeMessage(message); return response; } @@ -1042,7 +1050,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private void visitAndProcess(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, String route, BiFunction operation) { - visit(request, parameters, handler, new VisitCallback() { + visit(request, parameters, false, handler, new VisitCallback() { @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer onError) { DocumentOperationParameters operationParameters = parameters().withRoute(route) .withResponseHandler(operationResponse -> { @@ -1079,9 +1087,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { }); } - private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) { - visit(request, parameters, handler, new VisitCallback() { + private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, boolean streaming) { + visit(request, parameters, streaming, handler, new VisitCallback() { @Override public void onStart(JsonResponse response) throws IOException { + if (streaming) + response.commit(Response.Status.OK); + response.writeDocumentsArrayStart(); } @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer onError) { @@ -1095,10 +1106,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } private void visitWithRemote(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) { - visit(request, parameters, handler, new VisitCallback() { }); + visit(request, parameters, false, handler, new VisitCallback() { }); } - private void visit(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, VisitCallback callback) { + private void visit(HttpRequest request, VisitorParameters parameters, boolean streaming, ResponseHandler handler, VisitCallback callback) { try { JsonResponse response = JsonResponse.create(request, handler); Phaser phaser = new Phaser(2); // Synchronize this thread (dispatch) with the visitor callback thread. @@ -1108,34 +1119,39 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { @Override public void onDone(CompletionCode code, String message) { super.onDone(code, message); loggingException(() -> { - callback.onEnd(response); - switch (code) { - case TIMEOUT: - if ( ! hasVisitedAnyBuckets() && parameters.getVisitInconsistentBuckets()) { - response.writeMessage("No buckets visited within timeout of " + - parameters.getSessionTimeoutMs() + "ms (request timeout -5s)"); - response.respond(Response.Status.GATEWAY_TIMEOUT); - break; - } - case SUCCESS: // Intentional fallthrough. - case ABORTED: // Intentional fallthrough. - if (error.get() == null) { - ProgressToken progress = getProgress() != null ? getProgress() : parameters.getResumeToken(); - if (progress != null && ! progress.isFinished()) - response.writeContinuation(progress.serializeToString()); - - if (getVisitorStatistics() != null) - response.writeDocumentCount(getVisitorStatistics().getDocumentsVisited()); - - response.respond(Response.Status.OK); - break; - } - default: - response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed"); - if (getVisitorStatistics() != null) - response.writeDocumentCount(getVisitorStatistics().getDocumentsVisited()); + try (response) { + callback.onEnd(response); + + if (getVisitorStatistics() != null) + response.writeDocumentCount(getVisitorStatistics().getDocumentsVisited()); - response.respond(Response.Status.BAD_GATEWAY); + int status = Response.Status.BAD_GATEWAY; + switch (code) { + case TIMEOUT: + if ( ! hasVisitedAnyBuckets() && parameters.getVisitInconsistentBuckets()) { + response.writeMessage("No buckets visited within timeout of " + + parameters.getSessionTimeoutMs() + "ms (request timeout -5s)"); + status = Response.Status.GATEWAY_TIMEOUT; + break; + } + // TODO jonmv: [test] limit pending, + // TODO jonmv: [test] abort on shutdown, + // TODO jonmv: always supply and document continuation? + case SUCCESS: // Intentional fallthrough. + case ABORTED: // Intentional fallthrough. + if (error.get() == null) { + ProgressToken progress = getProgress() != null ? getProgress() : parameters.getResumeToken(); + if (progress != null && ! progress.isFinished()) + response.writeContinuation(progress.serializeToString()); + + status = Response.Status.OK; + break; + } + default: + response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed"); + } + if ( ! streaming) + response.commit(status); } }); visitDispatcher.execute(() -> { -- cgit v1.2.3