diff options
author | Jon Marius Venstad <jonmv@users.noreply.github.com> | 2021-11-08 10:58:46 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-11-08 10:58:46 +0100 |
commit | c258dd2c52d3488bc2dea6cb28c8797405985cf6 (patch) | |
tree | 80803a65d3d051c0cd48d7e1807fdcdf00a4e2c3 | |
parent | cf608891e14b3e4fa45900c12cde4c177c8d4e86 (diff) | |
parent | eb7e2ae25fd525d93d893e66364280920f75d458 (diff) |
Merge pull request #19896 from vespa-engine/jonmv/explicit-abort-instead-of-timeout-for-streamed-visits
Jonmv/explicit abort instead of timeout for streamed visits
3 files changed, 22 insertions, 15 deletions
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/ZipBuilderTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/ZipBuilderTest.java index fd2de2e1c1d..f48658af7e3 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/ZipBuilderTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/deployment/ZipBuilderTest.java @@ -59,4 +59,5 @@ public class ZipBuilderTest { } return contents; } + }
\ No newline at end of file diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index 29fd419daa2..a102e8fffd4 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -92,7 +92,9 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executors; import java.util.concurrent.Phaser; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; @@ -112,6 +114,7 @@ import static com.yahoo.jdisc.http.HttpRequest.Method.OPTIONS; import static com.yahoo.jdisc.http.HttpRequest.Method.POST; import static com.yahoo.jdisc.http.HttpRequest.Method.PUT; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.logging.Level.FINE; import static java.util.logging.Level.WARNING; import static java.util.stream.Collectors.joining; @@ -211,11 +214,11 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { this.dispatcher.scheduleWithFixedDelay(this::dispatchEnqueued, executorConfig.resendDelayMillis(), executorConfig.resendDelayMillis(), - TimeUnit.MILLISECONDS); + MILLISECONDS); this.visitDispatcher.scheduleWithFixedDelay(this::dispatchVisitEnqueued, executorConfig.resendDelayMillis(), executorConfig.resendDelayMillis(), - TimeUnit.MILLISECONDS); + MILLISECONDS); } // ------------------------------------------------ Requests ------------------------------------------------- @@ -233,7 +236,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { // Set a higher HTTP layer timeout than the document API timeout, to prefer triggering the latter. request.setTimeout( getProperty(request, TIMEOUT, timeoutMillisParser).orElse(defaultTimeout.toMillis()) + handlerTimeout.toMillis(), - TimeUnit.MILLISECONDS); + MILLISECONDS); Path requestPath = new Path(request.getUri()); for (String path : handlers.keySet()) @@ -260,7 +263,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { @Override public void handleTimeout(Request request, ResponseHandler responseHandler) { - timeout((HttpRequest) request, "Timeout after " + (request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis()) + "ms", responseHandler); + timeout((HttpRequest) request, "Timeout after " + (request.getTimeout(MILLISECONDS) - handlerTimeout.toMillis()) + "ms", responseHandler); } @Override @@ -290,10 +293,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { while (outstanding.get() > 0 && clock.instant().isBefore(doom)) Thread.sleep(Math.max(1, Duration.between(clock.instant(), doom).toMillis())); - if ( ! dispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS)) + if ( ! dispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), MILLISECONDS)) dispatcher.shutdownNow(); - if ( ! visitDispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS)) + if ( ! visitDispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), MILLISECONDS)) visitDispatcher.shutdownNow(); } catch (InterruptedException e) { @@ -1055,18 +1058,18 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { VisitorParameters parameters = parseCommonParameters(request, path, cluster); parameters.setFieldSet(getProperty(request, FIELD_SET).orElse(path.documentType().map(type -> type + ":[document]").orElse(AllFields.NAME))); parameters.setMaxTotalHits(wantedDocumentCount); - StaticThrottlePolicy throttlePolicy; + parameters.visitInconsistentBuckets(true); + long timeoutMs = Math.max(1, request.getTimeout(MILLISECONDS) - handlerTimeout.toMillis()); if (streamed) { - throttlePolicy = new DynamicThrottlePolicy().setMinWindowSize(1).setWindowSizeIncrement(1); + StaticThrottlePolicy throttlePolicy = new DynamicThrottlePolicy().setMinWindowSize(1).setWindowSizeIncrement(1); concurrency.ifPresent(throttlePolicy::setMaxPendingCount); + parameters.setThrottlePolicy(throttlePolicy); + parameters.setTimeoutMs(timeoutMs); // Ensure visitor eventually completes. } else { - throttlePolicy = new StaticThrottlePolicy().setMaxPendingCount(Math.min(100, concurrency.orElse(1))); + parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(Math.min(100, concurrency.orElse(1)))); + parameters.setSessionTimeoutMs(timeoutMs); } - parameters.setThrottlePolicy(throttlePolicy); - parameters.visitInconsistentBuckets(true); - parameters.setSessionTimeoutMs(Math.max(1, request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis())); - return parameters; } @@ -1076,7 +1079,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { VisitorParameters parameters = parseCommonParameters(request, path, Optional.of(requireProperty(request, CLUSTER))); parameters.setThrottlePolicy(new DynamicThrottlePolicy().setMinWindowSize(1).setWindowSizeIncrement(1)); long timeChunk = getProperty(request, TIME_CHUNK, timeoutMillisParser).orElse(60_000L); - parameters.setSessionTimeoutMs(Math.max(1, Math.min(timeChunk, request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis()))); + parameters.setSessionTimeoutMs(Math.max(1, Math.min(timeChunk, request.getTimeout(MILLISECONDS) - handlerTimeout.toMillis()))); return parameters; } @@ -1223,6 +1226,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { AtomicReference<String> error = new AtomicReference<>(); // Set if error occurs during processing of visited documents. callback.onStart(response); VisitorControlHandler controller = new VisitorControlHandler() { + final ScheduledFuture<?> abort = streaming ? visitDispatcher.schedule(this::abort, request.getTimeout(MILLISECONDS), MILLISECONDS) : null; @Override public void onDone(CompletionCode code, String message) { super.onDone(code, message); loggingException(() -> { @@ -1258,10 +1262,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { response.commit(status); } }); + if (abort != null) abort.cancel(false); // Avoid keeping scheduled future alive if this completes in any other fashion. visitDispatcher.execute(() -> { phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map. visits.remove(this).destroy(); }); + } }; if (parameters.getRemoteDataHandler() == null) { diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java index 2452e19bfff..de907f70c19 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java @@ -257,7 +257,7 @@ public class DocumentV1ApiTest { assertEquals(1, ((StaticThrottlePolicy) parameters.getThrottlePolicy()).getMaxPendingCount()); assertEquals("[id]", parameters.getFieldSet()); assertEquals("(all the things)", parameters.getDocumentSelection()); - assertEquals(6000, parameters.getSessionTimeoutMs()); + assertEquals(6000, parameters.getTimeoutMs()); assertEquals(4, parameters.getSlices()); assertEquals(1, parameters.getSliceId()); // Put some documents in the response |