From f84f6a487f404fdddff2a7f74348ea8b9510ca33 Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Thu, 28 Oct 2021 12:08:06 +0200 Subject: Use an optionally bounded, dynamic throttling policy for streamed vists --- .../restapi/resource/DocumentV1ApiHandler.java | 38 ++++++++++++++-------- .../restapi/resource/DocumentV1ApiTest.java | 2 +- 2 files changed, 25 insertions(+), 15 deletions(-) (limited to 'vespaclient-container-plugin') diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index 29b34ff4468..dd089340258 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -358,10 +358,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private ContentChannel getDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) { enqueueAndDispatch(request, handler, () -> { - boolean streaming = getProperty(request, STREAM, booleanParser).orElse(false); - VisitorParameters parameters = parseGetParameters(request, path, streaming); + boolean streamed = getProperty(request, STREAM, booleanParser).orElse(false); + VisitorParameters parameters = parseGetParameters(request, path, streamed); return () -> { - visitAndWrite(request, parameters, handler, streaming); + visitAndWrite(request, parameters, handler, streamed); return true; // VisitorSession has its own throttle handling. }; }); @@ -971,16 +971,18 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { // ------------------------------------------------- Visits ------------------------------------------------ - private VisitorParameters parseGetParameters(HttpRequest request, DocumentPath path, boolean streaming) { - int wantedDocumentCount = Math.min(streaming ? Integer.MAX_VALUE : 1 << 10, + private VisitorParameters parseGetParameters(HttpRequest request, DocumentPath path, boolean streamed) { + int wantedDocumentCount = Math.min(streamed ? Integer.MAX_VALUE : 1 << 10, getProperty(request, WANTED_DOCUMENT_COUNT, integerParser) - .orElse(streaming ? Integer.MAX_VALUE : 1)); + .orElse(streamed ? Integer.MAX_VALUE : 1)); if (wantedDocumentCount <= 0) throw new IllegalArgumentException("wantedDocumentCount must be positive"); - int concurrency = Math.min(100, getProperty(request, CONCURRENCY, integerParser).orElse(1)); - if (concurrency <= 0) - throw new IllegalArgumentException("concurrency must be positive"); + Optional concurrency = getProperty(request, CONCURRENCY, integerParser); + concurrency.ifPresent(value -> { + if (value <= 0) + throw new IllegalArgumentException("concurrency must be positive"); + }); Optional cluster = getProperty(request, CLUSTER); if (cluster.isEmpty() && path.documentType().isEmpty()) @@ -992,7 +994,15 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { VisitorParameters parameters = parseCommonParameters(request, path, cluster); parameters.setFieldSet(getProperty(request, FIELD_SET).orElse(path.documentType().map(type -> type + ":[document]").orElse(AllFields.NAME))); parameters.setMaxTotalHits(wantedDocumentCount); - parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(concurrency)); + StaticThrottlePolicy throttlePolicy; + if (streamed) { + throttlePolicy = new DynamicThrottlePolicy().setMinWindowSize(1).setWindowSizeIncrement(1); + concurrency.ifPresent(throttlePolicy::setMaxPendingCount); + } + else { + throttlePolicy = new StaticThrottlePolicy().setMaxPendingCount(Math.min(100, concurrency.orElse(1))); + } + parameters.setThrottlePolicy(throttlePolicy); parameters.visitInconsistentBuckets(true); parameters.setSessionTimeoutMs(Math.max(1, request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis())); if (slices.isPresent() && sliceId.isPresent()) @@ -1104,16 +1114,16 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { }); } - private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, boolean streaming) { - visit(request, parameters, streaming, handler, new VisitCallback() { + private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, boolean streamed) { + visit(request, parameters, streamed, handler, new VisitCallback() { @Override public void onStart(JsonResponse response) throws IOException { - if (streaming) + if (streamed) response.commit(Response.Status.OK); response.writeDocumentsArrayStart(); } @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer onError) { - if (streaming) + if (streamed) response.writeDocumentValue(document, new CompletionHandler() { @Override public void completed() { ack.run();} @Override public void failed(Throwable t) { ack.run(); onError.accept(t.getMessage()); } diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java index 412558f9a87..2452e19bfff 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java @@ -254,7 +254,7 @@ public class DocumentV1ApiTest { assertEquals("content", parameters.getRoute().toString()); assertEquals("default", parameters.getBucketSpace()); assertEquals(1025, parameters.getMaxTotalHits()); // Not bounded likewise for streamed responses. - assertEquals(100, ((StaticThrottlePolicy) parameters.getThrottlePolicy()).getMaxPendingCount()); + assertEquals(1, ((StaticThrottlePolicy) parameters.getThrottlePolicy()).getMaxPendingCount()); assertEquals("[id]", parameters.getFieldSet()); assertEquals("(all the things)", parameters.getDocumentSelection()); assertEquals(6000, parameters.getSessionTimeoutMs()); -- cgit v1.2.3