diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-12-13 15:31:35 +0100 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-12-13 15:31:35 +0100 |
commit | bd6ddeff5c194f38818c5d0cf04489b0ec68c3c1 (patch) | |
tree | 7289eae459ef4347011265beea24f95116b323cb /vespa-feed-client | |
parent | 2fe83706ed4065bef6f6b1b955b742eceefeee2a (diff) |
Time out requests after 200s
Diffstat (limited to 'vespa-feed-client')
-rw-r--r-- | vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java | 46 |
1 files changed, 37 insertions, 9 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java index 6dc9ec4efb1..3d91e26943f 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java @@ -23,6 +23,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static java.nio.charset.StandardCharsets.UTF_8; @@ -40,9 +44,11 @@ class ApacheCluster implements Cluster { private final RequestConfig defaultConfig = RequestConfig.custom() .setConnectTimeout(Timeout.ofSeconds(10)) .setConnectionRequestTimeout(Timeout.DISABLED) - .setResponseTimeout(Timeout.ofMinutes(5)) + .setResponseTimeout(Timeout.ofSeconds(190)) .build(); + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(t -> new Thread(t, "request-timeout-thread")); + ApacheCluster(FeedClientBuilderImpl builder) throws IOException { for (URI endpoint : builder.endpoints) for (int i = 0; i < builder.connectionsPerEndpoint; i++) @@ -59,6 +65,7 @@ class ApacheCluster implements Cluster { min = endpoints.get(i).inflight.get(); } Endpoint endpoint = endpoints.get(index); + endpoint.inflight.incrementAndGet(); try { SimpleHttpRequest request = new SimpleHttpRequest(wrapped.method(), wrapped.path()); @@ -70,13 +77,14 @@ class ApacheCluster implements Cluster { if (wrapped.body() != null) request.setBody(wrapped.body(), ContentType.APPLICATION_JSON); - endpoint.inflight.incrementAndGet(); - endpoint.client.execute(request, - new FutureCallback<SimpleHttpResponse>() { - @Override public void completed(SimpleHttpResponse response) { vessel.complete(new ApacheHttpResponse(response)); } - @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); } - @Override public void cancelled() { vessel.cancel(false); } - }); + Future<?> future = endpoint.client.execute(request, + new FutureCallback<SimpleHttpResponse>() { + @Override public void completed(SimpleHttpResponse response) { vessel.complete(new ApacheHttpResponse(response)); } + @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); } + @Override public void cancelled() { vessel.cancel(false); } + }); + Future<?> cancellation = executor.schedule(() -> { future.cancel(true); vessel.cancel(true); }, 200, TimeUnit.SECONDS); + vessel.whenComplete((__, ___) -> cancellation.cancel(true)); } catch (Throwable thrown) { vessel.completeExceptionally(thrown); @@ -87,7 +95,7 @@ class ApacheCluster implements Cluster { @Override public void close() { Throwable thrown = null; - for (Endpoint endpoint : endpoints) + for (Endpoint endpoint : endpoints) { try { endpoint.client.close(); } @@ -95,6 +103,8 @@ class ApacheCluster implements Cluster { if (thrown == null) thrown = t; else thrown.addSuppressed(t); } + } + executor.shutdownNow().forEach(Runnable::run); if (thrown != null) throw new RuntimeException(thrown); } @@ -171,4 +181,22 @@ class ApacheCluster implements Cluster { } + static class TimeoutTask implements Runnable { + + private final Future<?> request; + private final CompletableFuture<HttpResponse> vessel; + + TimeoutTask(Future<?> request, CompletableFuture<HttpResponse> vessel) { + this.request = request; + this.vessel = vessel; + } + + @Override + public void run() { + request.cancel(true); + vessel.cancel(true); + } + + } + } |