diff options
Diffstat (limited to 'vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java')
-rw-r--r-- | vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java | 38 |
1 files changed, 24 insertions, 14 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index 29b34ff4468..dd089340258 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -358,10 +358,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private ContentChannel getDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) { enqueueAndDispatch(request, handler, () -> { - boolean streaming = getProperty(request, STREAM, booleanParser).orElse(false); - VisitorParameters parameters = parseGetParameters(request, path, streaming); + boolean streamed = getProperty(request, STREAM, booleanParser).orElse(false); + VisitorParameters parameters = parseGetParameters(request, path, streamed); return () -> { - visitAndWrite(request, parameters, handler, streaming); + visitAndWrite(request, parameters, handler, streamed); return true; // VisitorSession has its own throttle handling. }; }); @@ -971,16 +971,18 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { // ------------------------------------------------- Visits ------------------------------------------------ - private VisitorParameters parseGetParameters(HttpRequest request, DocumentPath path, boolean streaming) { - int wantedDocumentCount = Math.min(streaming ? Integer.MAX_VALUE : 1 << 10, + private VisitorParameters parseGetParameters(HttpRequest request, DocumentPath path, boolean streamed) { + int wantedDocumentCount = Math.min(streamed ? Integer.MAX_VALUE : 1 << 10, getProperty(request, WANTED_DOCUMENT_COUNT, integerParser) - .orElse(streaming ? Integer.MAX_VALUE : 1)); + .orElse(streamed ? Integer.MAX_VALUE : 1)); if (wantedDocumentCount <= 0) throw new IllegalArgumentException("wantedDocumentCount must be positive"); - int concurrency = Math.min(100, getProperty(request, CONCURRENCY, integerParser).orElse(1)); - if (concurrency <= 0) - throw new IllegalArgumentException("concurrency must be positive"); + Optional<Integer> concurrency = getProperty(request, CONCURRENCY, integerParser); + concurrency.ifPresent(value -> { + if (value <= 0) + throw new IllegalArgumentException("concurrency must be positive"); + }); Optional<String> cluster = getProperty(request, CLUSTER); if (cluster.isEmpty() && path.documentType().isEmpty()) @@ -992,7 +994,15 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { VisitorParameters parameters = parseCommonParameters(request, path, cluster); parameters.setFieldSet(getProperty(request, FIELD_SET).orElse(path.documentType().map(type -> type + ":[document]").orElse(AllFields.NAME))); parameters.setMaxTotalHits(wantedDocumentCount); - parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(concurrency)); + StaticThrottlePolicy throttlePolicy; + if (streamed) { + throttlePolicy = new DynamicThrottlePolicy().setMinWindowSize(1).setWindowSizeIncrement(1); + concurrency.ifPresent(throttlePolicy::setMaxPendingCount); + } + else { + throttlePolicy = new StaticThrottlePolicy().setMaxPendingCount(Math.min(100, concurrency.orElse(1))); + } + parameters.setThrottlePolicy(throttlePolicy); parameters.visitInconsistentBuckets(true); parameters.setSessionTimeoutMs(Math.max(1, request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis())); if (slices.isPresent() && sliceId.isPresent()) @@ -1104,16 +1114,16 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { }); } - private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, boolean streaming) { - visit(request, parameters, streaming, handler, new VisitCallback() { + private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, boolean streamed) { + visit(request, parameters, streamed, handler, new VisitCallback() { @Override public void onStart(JsonResponse response) throws IOException { - if (streaming) + if (streamed) response.commit(Response.Status.OK); response.writeDocumentsArrayStart(); } @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { - if (streaming) + if (streamed) response.writeDocumentValue(document, new CompletionHandler() { @Override public void completed() { ack.run();} @Override public void failed(Throwable t) { ack.run(); onError.accept(t.getMessage()); } |