aboutsummaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-10-20 14:12:22 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-10-20 14:12:22 +0200
commitfe92f94becab1750853cac77464bb710374e56dd (patch)
tree059b774653121156c298b34a98d2221bbcdc399c /vespaclient-container-plugin
parent7997eb9a99426ce0093fe461d0a5658809691081 (diff)
Support HTTP streaming of visits through /document/v1
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java96
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java70
2 files changed, 124 insertions, 42 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index aa96b0932c3..f1d6b4825c6 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -158,6 +158,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private static final String TIME_CHUNK = "timeChunk";
private static final String TIMEOUT = "timeout";
private static final String TRACELEVEL = "tracelevel";
+ private static final String STREAMING = "streaming";
private final Clock clock;
private final Duration handlerTimeout;
@@ -356,8 +357,9 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private ContentChannel getDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) {
enqueueAndDispatch(request, handler, () -> {
VisitorParameters parameters = parseGetParameters(request, path);
+ boolean streaming = getProperty(request, STREAMING, booleanParser).orElse(false);
return () -> {
- visitAndWrite(request, parameters, handler);
+ visitAndWrite(request, parameters, handler, streaming);
return true; // VisitorSession has its own throttle handling.
};
});
@@ -573,19 +575,21 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private static class JsonResponse implements AutoCloseable {
private final BufferedContentChannel buffer = new BufferedContentChannel();
- private final OutputStream out = new ContentChannelOutputStream(buffer);
- private final JsonGenerator json = jsonFactory.createGenerator(out);
+ private final OutputStream out;
+ private final JsonGenerator json;
private final ResponseHandler handler;
private ContentChannel channel;
- private JsonResponse(ResponseHandler handler) throws IOException {
+ private JsonResponse(ResponseHandler handler, boolean streaming) throws IOException {
this.handler = handler;
+ out = new ContentChannelOutputStream(buffer);
+ json = jsonFactory.createGenerator(out);
json.writeStartObject();
}
/** Creates a new JsonResponse with path and id fields written. */
static JsonResponse create(DocumentPath path, ResponseHandler handler) throws IOException {
- JsonResponse response = new JsonResponse(handler);
+ JsonResponse response = new JsonResponse(handler, false);
response.writePathId(path.rawPath());
response.writeDocId(path.id());
return response;
@@ -593,15 +597,19 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
/** Creates a new JsonResponse with path field written. */
static JsonResponse create(HttpRequest request, ResponseHandler handler) throws IOException {
- JsonResponse response = new JsonResponse(handler);
+ return create(request, handler, false);
+ }
+
+ /** Creates a new JsonResponse with path field written. */
+ static JsonResponse create(HttpRequest request, ResponseHandler handler, boolean streaming) throws IOException {
+ JsonResponse response = new JsonResponse(handler, streaming);
response.writePathId(request.getUri().getRawPath());
return response;
}
/** Creates a new JsonResponse with path and message fields written. */
static JsonResponse create(HttpRequest request, String message, ResponseHandler handler) throws IOException {
- JsonResponse response = new JsonResponse(handler);
- response.writePathId(request.getUri().getRawPath());
+ JsonResponse response = create(request, handler);
response.writeMessage(message);
return response;
}
@@ -1042,7 +1050,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private void visitAndProcess(HttpRequest request, VisitorParameters parameters, ResponseHandler handler,
String route, BiFunction<DocumentId, DocumentOperationParameters, Result> operation) {
- visit(request, parameters, handler, new VisitCallback() {
+ visit(request, parameters, false, handler, new VisitCallback() {
@Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) {
DocumentOperationParameters operationParameters = parameters().withRoute(route)
.withResponseHandler(operationResponse -> {
@@ -1079,9 +1087,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
});
}
- private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) {
- visit(request, parameters, handler, new VisitCallback() {
+ private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, boolean streaming) {
+ visit(request, parameters, streaming, handler, new VisitCallback() {
@Override public void onStart(JsonResponse response) throws IOException {
+ if (streaming)
+ response.commit(Response.Status.OK);
+
response.writeDocumentsArrayStart();
}
@Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) {
@@ -1095,10 +1106,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
private void visitWithRemote(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) {
- visit(request, parameters, handler, new VisitCallback() { });
+ visit(request, parameters, false, handler, new VisitCallback() { });
}
- private void visit(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, VisitCallback callback) {
+ private void visit(HttpRequest request, VisitorParameters parameters, boolean streaming, ResponseHandler handler, VisitCallback callback) {
try {
JsonResponse response = JsonResponse.create(request, handler);
Phaser phaser = new Phaser(2); // Synchronize this thread (dispatch) with the visitor callback thread.
@@ -1108,34 +1119,39 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
@Override public void onDone(CompletionCode code, String message) {
super.onDone(code, message);
loggingException(() -> {
- callback.onEnd(response);
- switch (code) {
- case TIMEOUT:
- if ( ! hasVisitedAnyBuckets() && parameters.getVisitInconsistentBuckets()) {
- response.writeMessage("No buckets visited within timeout of " +
- parameters.getSessionTimeoutMs() + "ms (request timeout -5s)");
- response.respond(Response.Status.GATEWAY_TIMEOUT);
- break;
- }
- case SUCCESS: // Intentional fallthrough.
- case ABORTED: // Intentional fallthrough.
- if (error.get() == null) {
- ProgressToken progress = getProgress() != null ? getProgress() : parameters.getResumeToken();
- if (progress != null && ! progress.isFinished())
- response.writeContinuation(progress.serializeToString());
-
- if (getVisitorStatistics() != null)
- response.writeDocumentCount(getVisitorStatistics().getDocumentsVisited());
-
- response.respond(Response.Status.OK);
- break;
- }
- default:
- response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed");
- if (getVisitorStatistics() != null)
- response.writeDocumentCount(getVisitorStatistics().getDocumentsVisited());
+ try (response) {
+ callback.onEnd(response);
+
+ if (getVisitorStatistics() != null)
+ response.writeDocumentCount(getVisitorStatistics().getDocumentsVisited());
- response.respond(Response.Status.BAD_GATEWAY);
+ int status = Response.Status.BAD_GATEWAY;
+ switch (code) {
+ case TIMEOUT:
+ if ( ! hasVisitedAnyBuckets() && parameters.getVisitInconsistentBuckets()) {
+ response.writeMessage("No buckets visited within timeout of " +
+ parameters.getSessionTimeoutMs() + "ms (request timeout -5s)");
+ status = Response.Status.GATEWAY_TIMEOUT;
+ break;
+ }
+ // TODO jonmv: [test] limit pending,
+ // TODO jonmv: [test] abort on shutdown,
+ // TODO jonmv: always supply and document continuation?
+ case SUCCESS: // Intentional fallthrough.
+ case ABORTED: // Intentional fallthrough.
+ if (error.get() == null) {
+ ProgressToken progress = getProgress() != null ? getProgress() : parameters.getResumeToken();
+ if (progress != null && ! progress.isFinished())
+ response.writeContinuation(progress.serializeToString());
+
+ status = Response.Status.OK;
+ break;
+ }
+ default:
+ response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed");
+ }
+ if ( ! streaming)
+ response.commit(status);
}
});
visitDispatcher.execute(() -> {
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
index 96700f08823..829a9bcab9f 100644
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
@@ -4,6 +4,7 @@ package com.yahoo.document.restapi.resource;
import com.yahoo.cloud.config.ClusterListConfig;
import com.yahoo.container.jdisc.RequestHandlerTestDriver;
import com.yahoo.docproc.jdisc.metric.NullMetric;
+import com.yahoo.document.BucketId;
import com.yahoo.document.Document;
import com.yahoo.document.DocumentId;
import com.yahoo.document.DocumentPut;
@@ -37,6 +38,7 @@ import com.yahoo.documentapi.UpdateResponse;
import com.yahoo.documentapi.VisitorControlHandler;
import com.yahoo.documentapi.VisitorDestinationParameters;
import com.yahoo.documentapi.VisitorDestinationSession;
+import com.yahoo.documentapi.VisitorIterator;
import com.yahoo.documentapi.VisitorParameters;
import com.yahoo.documentapi.VisitorResponse;
import com.yahoo.documentapi.VisitorSession;
@@ -245,19 +247,83 @@ public class DocumentV1ApiTest {
"}", response.readAll());
assertEquals(200, response.getStatus());
+ // GET at root is a visit. Streaming mode can be specified with &streaming=true
+ access.expect(tokens);
+ access.expect(parameters -> {
+ assertEquals("content", parameters.getRoute().toString());
+ assertEquals("default", parameters.getBucketSpace());
+ assertEquals(1024, parameters.getMaxTotalHits());
+ assertEquals(100, ((StaticThrottlePolicy) parameters.getThrottlePolicy()).getMaxPendingCount());
+ assertEquals("[id]", parameters.getFieldSet());
+ assertEquals("(all the things)", parameters.getDocumentSelection());
+ assertEquals(6000, parameters.getSessionTimeoutMs());
+ // Put some documents in the response
+ parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc1)), tokens.get(0));
+ parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc2)), tokens.get(1));
+ parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc3)), tokens.get(2));
+ VisitorStatistics statistics = new VisitorStatistics();
+ statistics.setBucketsVisited(1);
+ statistics.setDocumentsVisited(3);
+ parameters.getControlHandler().onVisitorStatistics(statistics);
+ parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.TIMEOUT, "timeout is OK");
+ });
+ response = driver.sendRequest("http://localhost/document/v1?cluster=content&bucketSpace=default&wantedDocumentCount=1025&concurrency=123" +
+ "&selection=all%20the%20things&fieldSet=[id]&timeout=6&streaming=true");
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1\"," +
+ " \"documents\": [" +
+ " {" +
+ " \"id\": \"id:space:music::one\"," +
+ " \"fields\": {" +
+ " \"artist\": \"Tom Waits\"" +
+ " }" +
+ " }," +
+ " {" +
+ " \"id\": \"id:space:music:n=1:two\"," +
+ " \"fields\": {" +
+ " \"artist\": \"Asa-Chan & Jun-Ray\"" +
+ " }" +
+ " }," +
+ " {" +
+ " \"id\": \"id:space:music:g=a:three\"," +
+ " \"fields\": {}" +
+ " }" +
+ " ]," +
+ " \"documentCount\": 3" +
+ "}", response.readAll());
+ assertEquals(200, response.getStatus());
+
// GET with namespace and document type is a restricted visit.
+ ProgressToken progress = new ProgressToken();
+ VisitorIterator.createFromExplicitBucketSet(Set.of(new BucketId(1), new BucketId(2)), 8, progress)
+ .update(new BucketId(1), new BucketId(1));
access.expect(parameters -> {
assertEquals("(music) and (id.namespace=='space')", parameters.getDocumentSelection());
- assertEquals(new ProgressToken().serializeToString(), parameters.getResumeToken().serializeToString());
+ assertEquals(progress.serializeToString(), parameters.getResumeToken().serializeToString());
throw new IllegalArgumentException("parse failure");
});
- response = driver.sendRequest("http://localhost/document/v1/space/music/docid?continuation=" + new ProgressToken().serializeToString());
+ response = driver.sendRequest("http://localhost/document/v1/space/music/docid?continuation=" + progress.serializeToString());
assertSameJson("{" +
" \"pathId\": \"/document/v1/space/music/docid\"," +
" \"message\": \"parse failure\"" +
"}", response.readAll());
assertEquals(400, response.getStatus());
+ // GET when a streaming visit returns status code 200 also when errors occur.
+ access.expect(parameters -> {
+ assertEquals("(music) and (id.namespace=='space')", parameters.getDocumentSelection());
+ parameters.getControlHandler().onProgress(progress);
+ parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.FAILURE, "failure?");
+ });
+ response = driver.sendRequest("http://localhost/document/v1/space/music/docid?streaming=true");
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/docid\"," +
+ " \"documents\": []," +
+ //" \"continuation\": \"" + progress.serializeToString() + "\"," +
+ " \"message\": \"failure?\"" +
+ "}", response.readAll());
+ assertEquals(200, response.getStatus());
+
// POST with namespace and document type is a restricted visit with a required destination cluster ("destinationCluster")
access.expect(parameters -> {
fail("Not supposed to run");