diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-12-13 15:31:35 +0100 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-12-13 15:31:35 +0100 |
commit | bd6ddeff5c194f38818c5d0cf04489b0ec68c3c1 (patch) | |
tree | 7289eae459ef4347011265beea24f95116b323cb | |
parent | 2fe83706ed4065bef6f6b1b955b742eceefeee2a (diff) |
Time out requests after 200s
-rw-r--r-- | linguistics/src/test/java/com/yahoo/language/opennlp/OptimaizeDetectorTestCase.java | 1 | ||||
-rw-r--r-- | vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java | 46 |
2 files changed, 37 insertions, 10 deletions
diff --git a/linguistics/src/test/java/com/yahoo/language/opennlp/OptimaizeDetectorTestCase.java b/linguistics/src/test/java/com/yahoo/language/opennlp/OptimaizeDetectorTestCase.java index 3fc173dd82e..20b5de3b165 100644 --- a/linguistics/src/test/java/com/yahoo/language/opennlp/OptimaizeDetectorTestCase.java +++ b/linguistics/src/test/java/com/yahoo/language/opennlp/OptimaizeDetectorTestCase.java @@ -3,7 +3,6 @@ package com.yahoo.language.opennlp; import com.yahoo.language.Language; import com.yahoo.language.detect.Detector; -import com.yahoo.language.simple.SimpleDetector; import org.junit.Test; import static org.junit.Assert.assertEquals; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java index 6dc9ec4efb1..3d91e26943f 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java @@ -23,6 +23,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static java.nio.charset.StandardCharsets.UTF_8; @@ -40,9 +44,11 @@ class ApacheCluster implements Cluster { private final RequestConfig defaultConfig = RequestConfig.custom() .setConnectTimeout(Timeout.ofSeconds(10)) .setConnectionRequestTimeout(Timeout.DISABLED) - .setResponseTimeout(Timeout.ofMinutes(5)) + .setResponseTimeout(Timeout.ofSeconds(190)) .build(); + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(t -> new Thread(t, "request-timeout-thread")); + ApacheCluster(FeedClientBuilderImpl builder) throws IOException { for (URI endpoint : builder.endpoints) for (int i = 0; i < builder.connectionsPerEndpoint; i++) @@ -59,6 +65,7 @@ class ApacheCluster implements Cluster { min = endpoints.get(i).inflight.get(); } Endpoint endpoint = endpoints.get(index); + endpoint.inflight.incrementAndGet(); try { SimpleHttpRequest request = new SimpleHttpRequest(wrapped.method(), wrapped.path()); @@ -70,13 +77,14 @@ class ApacheCluster implements Cluster { if (wrapped.body() != null) request.setBody(wrapped.body(), ContentType.APPLICATION_JSON); - endpoint.inflight.incrementAndGet(); - endpoint.client.execute(request, - new FutureCallback<SimpleHttpResponse>() { - @Override public void completed(SimpleHttpResponse response) { vessel.complete(new ApacheHttpResponse(response)); } - @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); } - @Override public void cancelled() { vessel.cancel(false); } - }); + Future<?> future = endpoint.client.execute(request, + new FutureCallback<SimpleHttpResponse>() { + @Override public void completed(SimpleHttpResponse response) { vessel.complete(new ApacheHttpResponse(response)); } + @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); } + @Override public void cancelled() { vessel.cancel(false); } + }); + Future<?> cancellation = executor.schedule(() -> { future.cancel(true); vessel.cancel(true); }, 200, TimeUnit.SECONDS); + vessel.whenComplete((__, ___) -> cancellation.cancel(true)); } catch (Throwable thrown) { vessel.completeExceptionally(thrown); @@ -87,7 +95,7 @@ class ApacheCluster implements Cluster { @Override public void close() { Throwable thrown = null; - for (Endpoint endpoint : endpoints) + for (Endpoint endpoint : endpoints) { try { endpoint.client.close(); } @@ -95,6 +103,8 @@ class ApacheCluster implements Cluster { if (thrown == null) thrown = t; else thrown.addSuppressed(t); } + } + executor.shutdownNow().forEach(Runnable::run); if (thrown != null) throw new RuntimeException(thrown); } @@ -171,4 +181,22 @@ class ApacheCluster implements Cluster { } + static class TimeoutTask implements Runnable { + + private final Future<?> request; + private final CompletableFuture<HttpResponse> vessel; + + TimeoutTask(Future<?> request, CompletableFuture<HttpResponse> vessel) { + this.request = request; + this.vessel = vessel; + } + + @Override + public void run() { + request.cancel(true); + vessel.cancel(true); + } + + } + } |