aboutsummaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-10-28 12:08:06 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-10-28 12:08:06 +0200
commitf84f6a487f404fdddff2a7f74348ea8b9510ca33 (patch)
tree9bccca1b040aa6103a26416d3496825768df04e8 /vespaclient-container-plugin
parent181309354f4550dd3decf1918ba9367b47eee256 (diff)
Use an optionally bounded, dynamic throttling policy for streamed vists
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java38
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java2
2 files changed, 25 insertions, 15 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index 29b34ff4468..dd089340258 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -358,10 +358,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private ContentChannel getDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) {
enqueueAndDispatch(request, handler, () -> {
- boolean streaming = getProperty(request, STREAM, booleanParser).orElse(false);
- VisitorParameters parameters = parseGetParameters(request, path, streaming);
+ boolean streamed = getProperty(request, STREAM, booleanParser).orElse(false);
+ VisitorParameters parameters = parseGetParameters(request, path, streamed);
return () -> {
- visitAndWrite(request, parameters, handler, streaming);
+ visitAndWrite(request, parameters, handler, streamed);
return true; // VisitorSession has its own throttle handling.
};
});
@@ -971,16 +971,18 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
// ------------------------------------------------- Visits ------------------------------------------------
- private VisitorParameters parseGetParameters(HttpRequest request, DocumentPath path, boolean streaming) {
- int wantedDocumentCount = Math.min(streaming ? Integer.MAX_VALUE : 1 << 10,
+ private VisitorParameters parseGetParameters(HttpRequest request, DocumentPath path, boolean streamed) {
+ int wantedDocumentCount = Math.min(streamed ? Integer.MAX_VALUE : 1 << 10,
getProperty(request, WANTED_DOCUMENT_COUNT, integerParser)
- .orElse(streaming ? Integer.MAX_VALUE : 1));
+ .orElse(streamed ? Integer.MAX_VALUE : 1));
if (wantedDocumentCount <= 0)
throw new IllegalArgumentException("wantedDocumentCount must be positive");
- int concurrency = Math.min(100, getProperty(request, CONCURRENCY, integerParser).orElse(1));
- if (concurrency <= 0)
- throw new IllegalArgumentException("concurrency must be positive");
+ Optional<Integer> concurrency = getProperty(request, CONCURRENCY, integerParser);
+ concurrency.ifPresent(value -> {
+ if (value <= 0)
+ throw new IllegalArgumentException("concurrency must be positive");
+ });
Optional<String> cluster = getProperty(request, CLUSTER);
if (cluster.isEmpty() && path.documentType().isEmpty())
@@ -992,7 +994,15 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
VisitorParameters parameters = parseCommonParameters(request, path, cluster);
parameters.setFieldSet(getProperty(request, FIELD_SET).orElse(path.documentType().map(type -> type + ":[document]").orElse(AllFields.NAME)));
parameters.setMaxTotalHits(wantedDocumentCount);
- parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(concurrency));
+ StaticThrottlePolicy throttlePolicy;
+ if (streamed) {
+ throttlePolicy = new DynamicThrottlePolicy().setMinWindowSize(1).setWindowSizeIncrement(1);
+ concurrency.ifPresent(throttlePolicy::setMaxPendingCount);
+ }
+ else {
+ throttlePolicy = new StaticThrottlePolicy().setMaxPendingCount(Math.min(100, concurrency.orElse(1)));
+ }
+ parameters.setThrottlePolicy(throttlePolicy);
parameters.visitInconsistentBuckets(true);
parameters.setSessionTimeoutMs(Math.max(1, request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis()));
if (slices.isPresent() && sliceId.isPresent())
@@ -1104,16 +1114,16 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
});
}
- private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, boolean streaming) {
- visit(request, parameters, streaming, handler, new VisitCallback() {
+ private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, boolean streamed) {
+ visit(request, parameters, streamed, handler, new VisitCallback() {
@Override public void onStart(JsonResponse response) throws IOException {
- if (streaming)
+ if (streamed)
response.commit(Response.Status.OK);
response.writeDocumentsArrayStart();
}
@Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) {
- if (streaming)
+ if (streamed)
response.writeDocumentValue(document, new CompletionHandler() {
@Override public void completed() { ack.run();}
@Override public void failed(Throwable t) { ack.run(); onError.accept(t.getMessage()); }
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
index 412558f9a87..2452e19bfff 100644
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
@@ -254,7 +254,7 @@ public class DocumentV1ApiTest {
assertEquals("content", parameters.getRoute().toString());
assertEquals("default", parameters.getBucketSpace());
assertEquals(1025, parameters.getMaxTotalHits()); // Not bounded likewise for streamed responses.
- assertEquals(100, ((StaticThrottlePolicy) parameters.getThrottlePolicy()).getMaxPendingCount());
+ assertEquals(1, ((StaticThrottlePolicy) parameters.getThrottlePolicy()).getMaxPendingCount());
assertEquals("[id]", parameters.getFieldSet());
assertEquals("(all the things)", parameters.getDocumentSelection());
assertEquals(6000, parameters.getSessionTimeoutMs());