diff options
Diffstat (limited to 'vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java')
-rw-r--r-- | vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java | 34 |
1 files changed, 20 insertions, 14 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index 29fd419daa2..a102e8fffd4 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -92,7 +92,9 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executors; import java.util.concurrent.Phaser; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; @@ -112,6 +114,7 @@ import static com.yahoo.jdisc.http.HttpRequest.Method.OPTIONS; import static com.yahoo.jdisc.http.HttpRequest.Method.POST; import static com.yahoo.jdisc.http.HttpRequest.Method.PUT; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.logging.Level.FINE; import static java.util.logging.Level.WARNING; import static java.util.stream.Collectors.joining; @@ -211,11 +214,11 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { this.dispatcher.scheduleWithFixedDelay(this::dispatchEnqueued, executorConfig.resendDelayMillis(), executorConfig.resendDelayMillis(), - TimeUnit.MILLISECONDS); + MILLISECONDS); this.visitDispatcher.scheduleWithFixedDelay(this::dispatchVisitEnqueued, executorConfig.resendDelayMillis(), executorConfig.resendDelayMillis(), - TimeUnit.MILLISECONDS); + MILLISECONDS); } // ------------------------------------------------ Requests ------------------------------------------------- @@ -233,7 +236,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { // Set a higher HTTP layer timeout than the document API timeout, to prefer triggering the latter. request.setTimeout( getProperty(request, TIMEOUT, timeoutMillisParser).orElse(defaultTimeout.toMillis()) + handlerTimeout.toMillis(), - TimeUnit.MILLISECONDS); + MILLISECONDS); Path requestPath = new Path(request.getUri()); for (String path : handlers.keySet()) @@ -260,7 +263,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { @Override public void handleTimeout(Request request, ResponseHandler responseHandler) { - timeout((HttpRequest) request, "Timeout after " + (request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis()) + "ms", responseHandler); + timeout((HttpRequest) request, "Timeout after " + (request.getTimeout(MILLISECONDS) - handlerTimeout.toMillis()) + "ms", responseHandler); } @Override @@ -290,10 +293,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { while (outstanding.get() > 0 && clock.instant().isBefore(doom)) Thread.sleep(Math.max(1, Duration.between(clock.instant(), doom).toMillis())); - if ( ! dispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS)) + if ( ! dispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), MILLISECONDS)) dispatcher.shutdownNow(); - if ( ! visitDispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS)) + if ( ! visitDispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), MILLISECONDS)) visitDispatcher.shutdownNow(); } catch (InterruptedException e) { @@ -1055,18 +1058,18 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { VisitorParameters parameters = parseCommonParameters(request, path, cluster); parameters.setFieldSet(getProperty(request, FIELD_SET).orElse(path.documentType().map(type -> type + ":[document]").orElse(AllFields.NAME))); parameters.setMaxTotalHits(wantedDocumentCount); - StaticThrottlePolicy throttlePolicy; + parameters.visitInconsistentBuckets(true); + long timeoutMs = Math.max(1, request.getTimeout(MILLISECONDS) - handlerTimeout.toMillis()); if (streamed) { - throttlePolicy = new DynamicThrottlePolicy().setMinWindowSize(1).setWindowSizeIncrement(1); + StaticThrottlePolicy throttlePolicy = new DynamicThrottlePolicy().setMinWindowSize(1).setWindowSizeIncrement(1); concurrency.ifPresent(throttlePolicy::setMaxPendingCount); + parameters.setThrottlePolicy(throttlePolicy); + parameters.setTimeoutMs(timeoutMs); // Ensure visitor eventually completes. } else { - throttlePolicy = new StaticThrottlePolicy().setMaxPendingCount(Math.min(100, concurrency.orElse(1))); + parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(Math.min(100, concurrency.orElse(1)))); + parameters.setSessionTimeoutMs(timeoutMs); } - parameters.setThrottlePolicy(throttlePolicy); - parameters.visitInconsistentBuckets(true); - parameters.setSessionTimeoutMs(Math.max(1, request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis())); - return parameters; } @@ -1076,7 +1079,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { VisitorParameters parameters = parseCommonParameters(request, path, Optional.of(requireProperty(request, CLUSTER))); parameters.setThrottlePolicy(new DynamicThrottlePolicy().setMinWindowSize(1).setWindowSizeIncrement(1)); long timeChunk = getProperty(request, TIME_CHUNK, timeoutMillisParser).orElse(60_000L); - parameters.setSessionTimeoutMs(Math.max(1, Math.min(timeChunk, request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis()))); + parameters.setSessionTimeoutMs(Math.max(1, Math.min(timeChunk, request.getTimeout(MILLISECONDS) - handlerTimeout.toMillis()))); return parameters; } @@ -1223,6 +1226,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { AtomicReference<String> error = new AtomicReference<>(); // Set if error occurs during processing of visited documents. callback.onStart(response); VisitorControlHandler controller = new VisitorControlHandler() { + final ScheduledFuture<?> abort = streaming ? visitDispatcher.schedule(this::abort, request.getTimeout(MILLISECONDS), MILLISECONDS) : null; @Override public void onDone(CompletionCode code, String message) { super.onDone(code, message); loggingException(() -> { @@ -1258,10 +1262,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { response.commit(status); } }); + if (abort != null) abort.cancel(false); // Avoid keeping scheduled future alive if this completes in any other fashion. visitDispatcher.execute(() -> { phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map. visits.remove(this).destroy(); }); + } }; if (parameters.getRemoteDataHandler() == null) { |