summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Marius Venstad <jonmv@users.noreply.github.com>2021-11-08 10:58:46 +0100
committerGitHub <noreply@github.com>2021-11-08 10:58:46 +0100
commitc258dd2c52d3488bc2dea6cb28c8797405985cf6 (patch)
tree80803a65d3d051c0cd48d7e1807fdcdf00a4e2c3
parentcf608891e14b3e4fa45900c12cde4c177c8d4e86 (diff)
parenteb7e2ae25fd525d93d893e66364280920f75d458 (diff)
Merge pull request #19896 from vespa-engine/jonmv/explicit-abort-instead-of-timeout-for-streamed-visits
Jonmv/explicit abort instead of timeout for streamed visits
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/ZipBuilderTest.java1
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java34
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java2
3 files changed, 22 insertions, 15 deletions
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/ZipBuilderTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/ZipBuilderTest.java
index fd2de2e1c1d..f48658af7e3 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/ZipBuilderTest.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/ZipBuilderTest.java
@@ -59,4 +59,5 @@ public class ZipBuilderTest {
}
return contents;
}
+
} \ No newline at end of file
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index 29fd419daa2..a102e8fffd4 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -92,7 +92,9 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
@@ -112,6 +114,7 @@ import static com.yahoo.jdisc.http.HttpRequest.Method.OPTIONS;
import static com.yahoo.jdisc.http.HttpRequest.Method.POST;
import static com.yahoo.jdisc.http.HttpRequest.Method.PUT;
import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.WARNING;
import static java.util.stream.Collectors.joining;
@@ -211,11 +214,11 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
this.dispatcher.scheduleWithFixedDelay(this::dispatchEnqueued,
executorConfig.resendDelayMillis(),
executorConfig.resendDelayMillis(),
- TimeUnit.MILLISECONDS);
+ MILLISECONDS);
this.visitDispatcher.scheduleWithFixedDelay(this::dispatchVisitEnqueued,
executorConfig.resendDelayMillis(),
executorConfig.resendDelayMillis(),
- TimeUnit.MILLISECONDS);
+ MILLISECONDS);
}
// ------------------------------------------------ Requests -------------------------------------------------
@@ -233,7 +236,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
// Set a higher HTTP layer timeout than the document API timeout, to prefer triggering the latter.
request.setTimeout( getProperty(request, TIMEOUT, timeoutMillisParser).orElse(defaultTimeout.toMillis())
+ handlerTimeout.toMillis(),
- TimeUnit.MILLISECONDS);
+ MILLISECONDS);
Path requestPath = new Path(request.getUri());
for (String path : handlers.keySet())
@@ -260,7 +263,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
@Override
public void handleTimeout(Request request, ResponseHandler responseHandler) {
- timeout((HttpRequest) request, "Timeout after " + (request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis()) + "ms", responseHandler);
+ timeout((HttpRequest) request, "Timeout after " + (request.getTimeout(MILLISECONDS) - handlerTimeout.toMillis()) + "ms", responseHandler);
}
@Override
@@ -290,10 +293,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
while (outstanding.get() > 0 && clock.instant().isBefore(doom))
Thread.sleep(Math.max(1, Duration.between(clock.instant(), doom).toMillis()));
- if ( ! dispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS))
+ if ( ! dispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), MILLISECONDS))
dispatcher.shutdownNow();
- if ( ! visitDispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS))
+ if ( ! visitDispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), MILLISECONDS))
visitDispatcher.shutdownNow();
}
catch (InterruptedException e) {
@@ -1055,18 +1058,18 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
VisitorParameters parameters = parseCommonParameters(request, path, cluster);
parameters.setFieldSet(getProperty(request, FIELD_SET).orElse(path.documentType().map(type -> type + ":[document]").orElse(AllFields.NAME)));
parameters.setMaxTotalHits(wantedDocumentCount);
- StaticThrottlePolicy throttlePolicy;
+ parameters.visitInconsistentBuckets(true);
+ long timeoutMs = Math.max(1, request.getTimeout(MILLISECONDS) - handlerTimeout.toMillis());
if (streamed) {
- throttlePolicy = new DynamicThrottlePolicy().setMinWindowSize(1).setWindowSizeIncrement(1);
+ StaticThrottlePolicy throttlePolicy = new DynamicThrottlePolicy().setMinWindowSize(1).setWindowSizeIncrement(1);
concurrency.ifPresent(throttlePolicy::setMaxPendingCount);
+ parameters.setThrottlePolicy(throttlePolicy);
+ parameters.setTimeoutMs(timeoutMs); // Ensure visitor eventually completes.
}
else {
- throttlePolicy = new StaticThrottlePolicy().setMaxPendingCount(Math.min(100, concurrency.orElse(1)));
+ parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(Math.min(100, concurrency.orElse(1))));
+ parameters.setSessionTimeoutMs(timeoutMs);
}
- parameters.setThrottlePolicy(throttlePolicy);
- parameters.visitInconsistentBuckets(true);
- parameters.setSessionTimeoutMs(Math.max(1, request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis()));
-
return parameters;
}
@@ -1076,7 +1079,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
VisitorParameters parameters = parseCommonParameters(request, path, Optional.of(requireProperty(request, CLUSTER)));
parameters.setThrottlePolicy(new DynamicThrottlePolicy().setMinWindowSize(1).setWindowSizeIncrement(1));
long timeChunk = getProperty(request, TIME_CHUNK, timeoutMillisParser).orElse(60_000L);
- parameters.setSessionTimeoutMs(Math.max(1, Math.min(timeChunk, request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis())));
+ parameters.setSessionTimeoutMs(Math.max(1, Math.min(timeChunk, request.getTimeout(MILLISECONDS) - handlerTimeout.toMillis())));
return parameters;
}
@@ -1223,6 +1226,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
AtomicReference<String> error = new AtomicReference<>(); // Set if error occurs during processing of visited documents.
callback.onStart(response);
VisitorControlHandler controller = new VisitorControlHandler() {
+ final ScheduledFuture<?> abort = streaming ? visitDispatcher.schedule(this::abort, request.getTimeout(MILLISECONDS), MILLISECONDS) : null;
@Override public void onDone(CompletionCode code, String message) {
super.onDone(code, message);
loggingException(() -> {
@@ -1258,10 +1262,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
response.commit(status);
}
});
+ if (abort != null) abort.cancel(false); // Avoid keeping scheduled future alive if this completes in any other fashion.
visitDispatcher.execute(() -> {
phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map.
visits.remove(this).destroy();
});
+
}
};
if (parameters.getRemoteDataHandler() == null) {
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
index 2452e19bfff..de907f70c19 100644
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
@@ -257,7 +257,7 @@ public class DocumentV1ApiTest {
assertEquals(1, ((StaticThrottlePolicy) parameters.getThrottlePolicy()).getMaxPendingCount());
assertEquals("[id]", parameters.getFieldSet());
assertEquals("(all the things)", parameters.getDocumentSelection());
- assertEquals(6000, parameters.getSessionTimeoutMs());
+ assertEquals(6000, parameters.getTimeoutMs());
assertEquals(4, parameters.getSlices());
assertEquals(1, parameters.getSliceId());
// Put some documents in the response