aboutsummaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-11-07 09:15:20 +0100
committerJon Marius Venstad <venstad@gmail.com>2021-11-07 09:15:20 +0100
commit02e9a93817048989152ce04ae1dde8a92d9a818c (patch)
tree918c37e001993a756d9359a5c7e594520c2c2447 /vespaclient-container-plugin
parent0dac2ad839529051b01e2ea4c4be53bb7a14fb16 (diff)
Replace session timeout with explicit shutdown
Session timeout causes message bus to reply with timeouts when timeout passes. This works poorly with visitors whose document put acks are delayed until the network layer consumes the documents, which may take longer than the remaining session timeout, which is used as message timeout. Keeping the message timeout fixed, and doing a manual abort of the session instead, when the specified timeout has occurred, almost eliminates the problem. Additionally, acking all outstanding documents upon abortion makes the visitors return in a timely manner, and should take care of the rest of the problem.
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java31
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java2
2 files changed, 24 insertions, 9 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index 29fd419daa2..8f2b346516f 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -92,7 +92,9 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
@@ -591,6 +593,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private final AtomicLong documentsWritten = new AtomicLong();
private final AtomicLong documentsFlushed = new AtomicLong();
private final AtomicLong documentsAcked = new AtomicLong();
+ private final AtomicBoolean ackAll = new AtomicBoolean();
private boolean documentsDone = false;
private boolean first = true;
private ContentChannel channel;
@@ -727,8 +730,13 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
}
+ void ackAll() {
+ ackAll.set(true);
+ ackDocuments();
+ }
+
void ackDocuments() {
- while (documentsAcked.incrementAndGet() <= documentsFlushed.get() + FLUSH_SIZE) {
+ while (documentsAcked.incrementAndGet() <= documentsFlushed.get() + FLUSH_SIZE || ackAll.get()) {
CompletionHandler ack = acks.poll();
if (ack != null)
ack.completed();
@@ -1055,18 +1063,18 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
VisitorParameters parameters = parseCommonParameters(request, path, cluster);
parameters.setFieldSet(getProperty(request, FIELD_SET).orElse(path.documentType().map(type -> type + ":[document]").orElse(AllFields.NAME)));
parameters.setMaxTotalHits(wantedDocumentCount);
- StaticThrottlePolicy throttlePolicy;
+ parameters.visitInconsistentBuckets(true);
+ long timeoutMs = Math.max(1, request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis());
if (streamed) {
- throttlePolicy = new DynamicThrottlePolicy().setMinWindowSize(1).setWindowSizeIncrement(1);
+ StaticThrottlePolicy throttlePolicy = new DynamicThrottlePolicy().setMinWindowSize(1).setWindowSizeIncrement(1);
concurrency.ifPresent(throttlePolicy::setMaxPendingCount);
+ parameters.setThrottlePolicy(throttlePolicy);
+ parameters.setTimeoutMs(timeoutMs); // Ensure visitor eventually completes.
}
else {
- throttlePolicy = new StaticThrottlePolicy().setMaxPendingCount(Math.min(100, concurrency.orElse(1)));
+ parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(Math.min(100, concurrency.orElse(1))));
+ parameters.setSessionTimeoutMs(timeoutMs);
}
- parameters.setThrottlePolicy(throttlePolicy);
- parameters.visitInconsistentBuckets(true);
- parameters.setSessionTimeoutMs(Math.max(1, request.getTimeout(TimeUnit.MILLISECONDS) - handlerTimeout.toMillis()));
-
return parameters;
}
@@ -1223,6 +1231,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
AtomicReference<String> error = new AtomicReference<>(); // Set if error occurs during processing of visited documents.
callback.onStart(response);
VisitorControlHandler controller = new VisitorControlHandler() {
+ final ScheduledFuture<?> abort = streaming ? visitDispatcher.schedule(this::abort, request.getTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) : null;
@Override public void onDone(CompletionCode code, String message) {
super.onDone(code, message);
loggingException(() -> {
@@ -1258,10 +1267,16 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
response.commit(status);
}
});
+ if (abort != null) abort.cancel(false); // Avoid keeping scheduled future alive if this completes in any other fashion.
visitDispatcher.execute(() -> {
phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map.
visits.remove(this).destroy();
});
+
+ }
+ @Override public void abort() {
+ super.abort();
+ response.ackAll();
}
};
if (parameters.getRemoteDataHandler() == null) {
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
index 2452e19bfff..de907f70c19 100644
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
@@ -257,7 +257,7 @@ public class DocumentV1ApiTest {
assertEquals(1, ((StaticThrottlePolicy) parameters.getThrottlePolicy()).getMaxPendingCount());
assertEquals("[id]", parameters.getFieldSet());
assertEquals("(all the things)", parameters.getDocumentSelection());
- assertEquals(6000, parameters.getSessionTimeoutMs());
+ assertEquals(6000, parameters.getTimeoutMs());
assertEquals(4, parameters.getSlices());
assertEquals(1, parameters.getSliceId());
// Put some documents in the response