diff options
Diffstat (limited to 'vespaclient-container-plugin/src')
2 files changed, 24 insertions, 9 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index 29fd419daa2..8f2b346516f 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -92,7 +92,9 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executors; import java.util.concurrent.Phaser; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; @@ -591,6 +593,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private final AtomicLong documentsWritten = new AtomicLong(); private final AtomicLong documentsFlushed = new AtomicLong(); private final AtomicLong documentsAcked = new AtomicLong(); + private final AtomicBoolean ackAll = new AtomicBoolean(); private boolean documentsDone = false; private boolean first = true; private ContentChannel channel; @@ -727,8 +730,13 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } } + void ackAll() { + ackAll.set(true); + ackDocuments(); + } + void ackDocuments() { - while (documentsAcked.incrementAndGet() <= documentsFlushed.get() + FLUSH_SIZE) { + while (documentsAcked.incrementAndGet() <= documentsFlushed.get() + FLUSH_SIZE || ackAll.get()) { CompletionHandler ack = acks.poll(); if (ack != null) ack.completed(); @@ -1055,18 +1063,18 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { VisitorParameters parameters = parseCommonParameters(request, path, cluster); parameters.setFieldSet(getProperty(request, FIELD_SET).orElse(path.documentType().map(type -> type + ":[document]").orElse(AllFields.NAME))); parameters.setMaxTotalHits(wantedDocumentCount); - StaticThrottlePolicy throttlePolicy; + parameters.visitInconsistentBuckets(true); + long timeoutMs = Math.max(1, request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis()); if (streamed) { - throttlePolicy = new DynamicThrottlePolicy().setMinWindowSize(1).setWindowSizeIncrement(1); + StaticThrottlePolicy throttlePolicy = new DynamicThrottlePolicy().setMinWindowSize(1).setWindowSizeIncrement(1); concurrency.ifPresent(throttlePolicy::setMaxPendingCount); + parameters.setThrottlePolicy(throttlePolicy); + parameters.setTimeoutMs(timeoutMs); // Ensure visitor eventually completes. } else { - throttlePolicy = new StaticThrottlePolicy().setMaxPendingCount(Math.min(100, concurrency.orElse(1))); + parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(Math.min(100, concurrency.orElse(1)))); + parameters.setSessionTimeoutMs(timeoutMs); } - parameters.setThrottlePolicy(throttlePolicy); - parameters.visitInconsistentBuckets(true); - parameters.setSessionTimeoutMs(Math.max(1, request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis())); - return parameters; } @@ -1223,6 +1231,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { AtomicReference<String> error = new AtomicReference<>(); // Set if error occurs during processing of visited documents. callback.onStart(response); VisitorControlHandler controller = new VisitorControlHandler() { + final ScheduledFuture<?> abort = streaming ? visitDispatcher.schedule(this::abort, request.getTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) : null; @Override public void onDone(CompletionCode code, String message) { super.onDone(code, message); loggingException(() -> { @@ -1258,10 +1267,16 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { response.commit(status); } }); + if (abort != null) abort.cancel(false); // Avoid keeping scheduled future alive if this completes in any other fashion. visitDispatcher.execute(() -> { phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map. visits.remove(this).destroy(); }); + + } + @Override public void abort() { + super.abort(); + response.ackAll(); } }; if (parameters.getRemoteDataHandler() == null) { diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java index 2452e19bfff..de907f70c19 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java @@ -257,7 +257,7 @@ public class DocumentV1ApiTest { assertEquals(1, ((StaticThrottlePolicy) parameters.getThrottlePolicy()).getMaxPendingCount()); assertEquals("[id]", parameters.getFieldSet()); assertEquals("(all the things)", parameters.getDocumentSelection()); - assertEquals(6000, parameters.getSessionTimeoutMs()); + assertEquals(6000, parameters.getTimeoutMs()); assertEquals(4, parameters.getSlices()); assertEquals(1, parameters.getSliceId()); // Put some documents in the response |