summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@verizonmedia.com>2021-12-13 17:32:32 +0100
committerGitHub <noreply@github.com>2021-12-13 17:32:32 +0100
commit572acd43fd03c9eaaf1f99cdd18effd5be9ff426 (patch)
treee9f4c6560fbdc4808ea337653659efded63ad586
parent60a7801c166df08909c354e058b5fcdab3947f02 (diff)
parentbd6ddeff5c194f38818c5d0cf04489b0ec68c3c1 (diff)
Merge pull request #20494 from vespa-engine/jonmv/time-out-feed-operations-in-client
Time out requests after 200s
-rw-r--r--linguistics/src/test/java/com/yahoo/language/opennlp/OptimaizeDetectorTestCase.java1
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java46
2 files changed, 37 insertions, 10 deletions
diff --git a/linguistics/src/test/java/com/yahoo/language/opennlp/OptimaizeDetectorTestCase.java b/linguistics/src/test/java/com/yahoo/language/opennlp/OptimaizeDetectorTestCase.java
index 3fc173dd82e..20b5de3b165 100644
--- a/linguistics/src/test/java/com/yahoo/language/opennlp/OptimaizeDetectorTestCase.java
+++ b/linguistics/src/test/java/com/yahoo/language/opennlp/OptimaizeDetectorTestCase.java
@@ -3,7 +3,6 @@ package com.yahoo.language.opennlp;
import com.yahoo.language.Language;
import com.yahoo.language.detect.Detector;
-import com.yahoo.language.simple.SimpleDetector;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java
index 6dc9ec4efb1..3d91e26943f 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java
@@ -23,6 +23,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -40,9 +44,11 @@ class ApacheCluster implements Cluster {
private final RequestConfig defaultConfig = RequestConfig.custom()
.setConnectTimeout(Timeout.ofSeconds(10))
.setConnectionRequestTimeout(Timeout.DISABLED)
- .setResponseTimeout(Timeout.ofMinutes(5))
+ .setResponseTimeout(Timeout.ofSeconds(190))
.build();
+ private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(t -> new Thread(t, "request-timeout-thread"));
+
ApacheCluster(FeedClientBuilderImpl builder) throws IOException {
for (URI endpoint : builder.endpoints)
for (int i = 0; i < builder.connectionsPerEndpoint; i++)
@@ -59,6 +65,7 @@ class ApacheCluster implements Cluster {
min = endpoints.get(i).inflight.get();
}
Endpoint endpoint = endpoints.get(index);
+ endpoint.inflight.incrementAndGet();
try {
SimpleHttpRequest request = new SimpleHttpRequest(wrapped.method(), wrapped.path());
@@ -70,13 +77,14 @@ class ApacheCluster implements Cluster {
if (wrapped.body() != null)
request.setBody(wrapped.body(), ContentType.APPLICATION_JSON);
- endpoint.inflight.incrementAndGet();
- endpoint.client.execute(request,
- new FutureCallback<SimpleHttpResponse>() {
- @Override public void completed(SimpleHttpResponse response) { vessel.complete(new ApacheHttpResponse(response)); }
- @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); }
- @Override public void cancelled() { vessel.cancel(false); }
- });
+ Future<?> future = endpoint.client.execute(request,
+ new FutureCallback<SimpleHttpResponse>() {
+ @Override public void completed(SimpleHttpResponse response) { vessel.complete(new ApacheHttpResponse(response)); }
+ @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); }
+ @Override public void cancelled() { vessel.cancel(false); }
+ });
+ Future<?> cancellation = executor.schedule(() -> { future.cancel(true); vessel.cancel(true); }, 200, TimeUnit.SECONDS);
+ vessel.whenComplete((__, ___) -> cancellation.cancel(true));
}
catch (Throwable thrown) {
vessel.completeExceptionally(thrown);
@@ -87,7 +95,7 @@ class ApacheCluster implements Cluster {
@Override
public void close() {
Throwable thrown = null;
- for (Endpoint endpoint : endpoints)
+ for (Endpoint endpoint : endpoints) {
try {
endpoint.client.close();
}
@@ -95,6 +103,8 @@ class ApacheCluster implements Cluster {
if (thrown == null) thrown = t;
else thrown.addSuppressed(t);
}
+ }
+ executor.shutdownNow().forEach(Runnable::run);
if (thrown != null) throw new RuntimeException(thrown);
}
@@ -171,4 +181,22 @@ class ApacheCluster implements Cluster {
}
+ static class TimeoutTask implements Runnable {
+
+ private final Future<?> request;
+ private final CompletableFuture<HttpResponse> vessel;
+
+ TimeoutTask(Future<?> request, CompletableFuture<HttpResponse> vessel) {
+ this.request = request;
+ this.vessel = vessel;
+ }
+
+ @Override
+ public void run() {
+ request.cancel(true);
+ vessel.cancel(true);
+ }
+
+ }
+
}