aboutsummaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java34
1 files changed, 20 insertions, 14 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index 29fd419daa2..a102e8fffd4 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -92,7 +92,9 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
@@ -112,6 +114,7 @@ import static com.yahoo.jdisc.http.HttpRequest.Method.OPTIONS;
import static com.yahoo.jdisc.http.HttpRequest.Method.POST;
import static com.yahoo.jdisc.http.HttpRequest.Method.PUT;
import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.WARNING;
import static java.util.stream.Collectors.joining;
@@ -211,11 +214,11 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
this.dispatcher.scheduleWithFixedDelay(this::dispatchEnqueued,
executorConfig.resendDelayMillis(),
executorConfig.resendDelayMillis(),
- TimeUnit.MILLISECONDS);
+ MILLISECONDS);
this.visitDispatcher.scheduleWithFixedDelay(this::dispatchVisitEnqueued,
executorConfig.resendDelayMillis(),
executorConfig.resendDelayMillis(),
- TimeUnit.MILLISECONDS);
+ MILLISECONDS);
}
// ------------------------------------------------ Requests -------------------------------------------------
@@ -233,7 +236,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
// Set a higher HTTP layer timeout than the document API timeout, to prefer triggering the latter.
request.setTimeout( getProperty(request, TIMEOUT, timeoutMillisParser).orElse(defaultTimeout.toMillis())
+ handlerTimeout.toMillis(),
- TimeUnit.MILLISECONDS);
+ MILLISECONDS);
Path requestPath = new Path(request.getUri());
for (String path : handlers.keySet())
@@ -260,7 +263,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
@Override
public void handleTimeout(Request request, ResponseHandler responseHandler) {
- timeout((HttpRequest) request, "Timeout after " + (request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis()) + "ms", responseHandler);
+ timeout((HttpRequest) request, "Timeout after " + (request.getTimeout(MILLISECONDS) - handlerTimeout.toMillis()) + "ms", responseHandler);
}
@Override
@@ -290,10 +293,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
while (outstanding.get() > 0 && clock.instant().isBefore(doom))
Thread.sleep(Math.max(1, Duration.between(clock.instant(), doom).toMillis()));
- if ( ! dispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS))
+ if ( ! dispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), MILLISECONDS))
dispatcher.shutdownNow();
- if ( ! visitDispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS))
+ if ( ! visitDispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), MILLISECONDS))
visitDispatcher.shutdownNow();
}
catch (InterruptedException e) {
@@ -1055,18 +1058,18 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
VisitorParameters parameters = parseCommonParameters(request, path, cluster);
parameters.setFieldSet(getProperty(request, FIELD_SET).orElse(path.documentType().map(type -> type + ":[document]").orElse(AllFields.NAME)));
parameters.setMaxTotalHits(wantedDocumentCount);
- StaticThrottlePolicy throttlePolicy;
+ parameters.visitInconsistentBuckets(true);
+ long timeoutMs = Math.max(1, request.getTimeout(MILLISECONDS) - handlerTimeout.toMillis());
if (streamed) {
- throttlePolicy = new DynamicThrottlePolicy().setMinWindowSize(1).setWindowSizeIncrement(1);
+ StaticThrottlePolicy throttlePolicy = new DynamicThrottlePolicy().setMinWindowSize(1).setWindowSizeIncrement(1);
concurrency.ifPresent(throttlePolicy::setMaxPendingCount);
+ parameters.setThrottlePolicy(throttlePolicy);
+ parameters.setTimeoutMs(timeoutMs); // Ensure visitor eventually completes.
}
else {
- throttlePolicy = new StaticThrottlePolicy().setMaxPendingCount(Math.min(100, concurrency.orElse(1)));
+ parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(Math.min(100, concurrency.orElse(1))));
+ parameters.setSessionTimeoutMs(timeoutMs);
}
- parameters.setThrottlePolicy(throttlePolicy);
- parameters.visitInconsistentBuckets(true);
- parameters.setSessionTimeoutMs(Math.max(1, request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis()));
-
return parameters;
}
@@ -1076,7 +1079,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
VisitorParameters parameters = parseCommonParameters(request, path, Optional.of(requireProperty(request, CLUSTER)));
parameters.setThrottlePolicy(new DynamicThrottlePolicy().setMinWindowSize(1).setWindowSizeIncrement(1));
long timeChunk = getProperty(request, TIME_CHUNK, timeoutMillisParser).orElse(60_000L);
- parameters.setSessionTimeoutMs(Math.max(1, Math.min(timeChunk, request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis())));
+ parameters.setSessionTimeoutMs(Math.max(1, Math.min(timeChunk, request.getTimeout(MILLISECONDS) - handlerTimeout.toMillis())));
return parameters;
}
@@ -1223,6 +1226,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
AtomicReference<String> error = new AtomicReference<>(); // Set if error occurs during processing of visited documents.
callback.onStart(response);
VisitorControlHandler controller = new VisitorControlHandler() {
+ final ScheduledFuture<?> abort = streaming ? visitDispatcher.schedule(this::abort, request.getTimeout(MILLISECONDS), MILLISECONDS) : null;
@Override public void onDone(CompletionCode code, String message) {
super.onDone(code, message);
loggingException(() -> {
@@ -1258,10 +1262,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
response.commit(status);
}
});
+ if (abort != null) abort.cancel(false); // Avoid keeping scheduled future alive if this completes in any other fashion.
visitDispatcher.execute(() -> {
phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map.
visits.remove(this).destroy();
});
+
}
};
if (parameters.getRemoteDataHandler() == null) {