aboutsummaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin/src
diff options
context:
space:
mode:
Diffstat (limited to 'vespaclient-container-plugin/src')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java31
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java2
2 files changed, 24 insertions, 9 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index 29fd419daa2..8f2b346516f 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -92,7 +92,9 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
@@ -591,6 +593,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private final AtomicLong documentsWritten = new AtomicLong();
private final AtomicLong documentsFlushed = new AtomicLong();
private final AtomicLong documentsAcked = new AtomicLong();
+ private final AtomicBoolean ackAll = new AtomicBoolean();
private boolean documentsDone = false;
private boolean first = true;
private ContentChannel channel;
@@ -727,8 +730,13 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
}
+ void ackAll() {
+ ackAll.set(true);
+ ackDocuments();
+ }
+
void ackDocuments() {
- while (documentsAcked.incrementAndGet() <= documentsFlushed.get() + FLUSH_SIZE) {
+ while (documentsAcked.incrementAndGet() <= documentsFlushed.get() + FLUSH_SIZE || ackAll.get()) {
CompletionHandler ack = acks.poll();
if (ack != null)
ack.completed();
@@ -1055,18 +1063,18 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
VisitorParameters parameters = parseCommonParameters(request, path, cluster);
parameters.setFieldSet(getProperty(request, FIELD_SET).orElse(path.documentType().map(type -> type + ":[document]").orElse(AllFields.NAME)));
parameters.setMaxTotalHits(wantedDocumentCount);
- StaticThrottlePolicy throttlePolicy;
+ parameters.visitInconsistentBuckets(true);
+ long timeoutMs = Math.max(1, request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis());
if (streamed) {
- throttlePolicy = new DynamicThrottlePolicy().setMinWindowSize(1).setWindowSizeIncrement(1);
+ StaticThrottlePolicy throttlePolicy = new DynamicThrottlePolicy().setMinWindowSize(1).setWindowSizeIncrement(1);
concurrency.ifPresent(throttlePolicy::setMaxPendingCount);
+ parameters.setThrottlePolicy(throttlePolicy);
+ parameters.setTimeoutMs(timeoutMs); // Ensure visitor eventually completes.
}
else {
- throttlePolicy = new StaticThrottlePolicy().setMaxPendingCount(Math.min(100, concurrency.orElse(1)));
+ parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(Math.min(100, concurrency.orElse(1))));
+ parameters.setSessionTimeoutMs(timeoutMs);
}
- parameters.setThrottlePolicy(throttlePolicy);
- parameters.visitInconsistentBuckets(true);
- parameters.setSessionTimeoutMs(Math.max(1, request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis()));
-
return parameters;
}
@@ -1223,6 +1231,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
AtomicReference<String> error = new AtomicReference<>(); // Set if error occurs during processing of visited documents.
callback.onStart(response);
VisitorControlHandler controller = new VisitorControlHandler() {
+ final ScheduledFuture<?> abort = streaming ? visitDispatcher.schedule(this::abort, request.getTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) : null;
@Override public void onDone(CompletionCode code, String message) {
super.onDone(code, message);
loggingException(() -> {
@@ -1258,10 +1267,16 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
response.commit(status);
}
});
+ if (abort != null) abort.cancel(false); // Avoid keeping scheduled future alive if this completes in any other fashion.
visitDispatcher.execute(() -> {
phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map.
visits.remove(this).destroy();
});
+
+ }
+ @Override public void abort() {
+ super.abort();
+ response.ackAll();
}
};
if (parameters.getRemoteDataHandler() == null) {
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
index 2452e19bfff..de907f70c19 100644
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
@@ -257,7 +257,7 @@ public class DocumentV1ApiTest {
assertEquals(1, ((StaticThrottlePolicy) parameters.getThrottlePolicy()).getMaxPendingCount());
assertEquals("[id]", parameters.getFieldSet());
assertEquals("(all the things)", parameters.getDocumentSelection());
- assertEquals(6000, parameters.getSessionTimeoutMs());
+ assertEquals(6000, parameters.getTimeoutMs());
assertEquals(4, parameters.getSlices());
assertEquals(1, parameters.getSliceId());
// Put some documents in the response