diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-12-14 10:17:50 +0100 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-12-14 10:17:50 +0100 |
commit | ab9c36d5eba9fe469d7e1f6e0c075ace0cb3c1fe (patch) | |
tree | 9fb7569e8670278efdb0dc8bd3425b712dedd019 /vespa-feed-client | |
parent | b216e35423ec9784e493b89b9b85bab9f6468e23 (diff) |
Use user specified timeout for client timeout as well
Diffstat (limited to 'vespa-feed-client')
4 files changed, 20 insertions, 11 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java index 8233a1773f5..1874bd42e16 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java @@ -83,7 +83,8 @@ class ApacheCluster implements Cluster { @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); } @Override public void cancelled() { vessel.cancel(false); } }); - Future<?> cancellation = executor.schedule(() -> { future.cancel(true); vessel.cancel(true); }, 200, TimeUnit.SECONDS); + long timeoutMillis = wrapped.timeout() == null ? 200_000 : wrapped.timeout().toMillis() * 11 / 10 + 1_000; + Future<?> cancellation = executor.schedule(() -> { future.cancel(true); vessel.cancel(true); }, timeoutMillis, TimeUnit.MILLISECONDS); vessel.whenComplete((__, ___) -> cancellation.cancel(true)); } catch (Throwable thrown) { diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java index 3fd44596d63..c136d697a0b 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java @@ -92,7 +92,8 @@ class HttpFeedClient implements FeedClient { HttpRequest request = new HttpRequest(method, getPath(documentId) + getQuery(params), requestHeaders, - operationJson == null ? null : operationJson.getBytes(UTF_8)); // TODO: make it bytes all the way? + operationJson == null ? null : operationJson.getBytes(UTF_8), // TODO: make it bytes all the way? + params.timeout().orElse(null)); CompletableFuture<Result> promise = new CompletableFuture<>(); requestStrategy.enqueue(documentId, request) diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java index 08b8ca08c61..0ad7b82347e 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.feed.client.impl; +import java.time.Duration; import java.util.Map; import java.util.function.Supplier; @@ -10,12 +11,14 @@ class HttpRequest { private final String path; private final Map<String, Supplier<String>> headers; private final byte[] body; + private final Duration timeout; - public HttpRequest(String method, String path, Map<String, Supplier<String>> headers, byte[] body) { + public HttpRequest(String method, String path, Map<String, Supplier<String>> headers, byte[] body, Duration timeout) { this.method = method; this.path = path; this.headers = headers; this.body = body; + this.timeout = timeout; } public String method() { @@ -34,6 +37,10 @@ class HttpRequest { return body; } + public Duration timeout() { + return timeout; + } + @Override public String toString() { return method + " " + path; diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java index d293abf4f3e..d7be4ead078 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java @@ -41,7 +41,7 @@ class HttpRequestStrategyTest { @Test void testConcurrency() { int documents = 1 << 16; - HttpRequest request = new HttpRequest("PUT", "/", null, null); + HttpRequest request = new HttpRequest("PUT", "/", null, null, null); HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8)); ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Cluster cluster = new BenchmarkingCluster((__, vessel) -> executor.schedule(() -> vessel.complete(response), (int) (Math.random() * 2 * 10), TimeUnit.MILLISECONDS)); @@ -99,7 +99,7 @@ class HttpRequestStrategyTest { DocumentId id1 = DocumentId.of("ns", "type", "1"); DocumentId id2 = DocumentId.of("ns", "type", "2"); - HttpRequest request = new HttpRequest("POST", "/", null, null); + HttpRequest request = new HttpRequest("POST", "/", null, null, null); // Runtime exception is not retried. cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom"))); @@ -140,8 +140,8 @@ class HttpRequestStrategyTest { else vessel.complete(success); }); CompletableFuture<HttpResponse> delayed = strategy.enqueue(id1, request); - CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null)); - assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null)).get()); + CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, null)); + assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null, null)).get()); latch.await(); assertEquals(8, strategy.stats().requests()); // 3 attempts at throttled and one at id2. now.set(4000); @@ -159,7 +159,7 @@ class HttpRequestStrategyTest { // Error responses are not retried when not of appropriate type. cluster.expect((__, vessel) -> vessel.complete(serverError)); - assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null)).get()); + assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, null)).get()); assertEquals(12, strategy.stats().requests()); // Some error responses are not retried. @@ -205,9 +205,9 @@ class HttpRequestStrategyTest { DocumentId id2 = DocumentId.of("ns", "type", "2"); DocumentId id3 = DocumentId.of("ns", "type", "3"); DocumentId id4 = DocumentId.of("ns", "type", "4"); - HttpRequest failing = new HttpRequest("POST", "/", null, null); - HttpRequest request = new HttpRequest("POST", "/", null, null); - HttpRequest blocking = new HttpRequest("POST", "/", null, null); + HttpRequest failing = new HttpRequest("POST", "/", null, null, null); + HttpRequest request = new HttpRequest("POST", "/", null, null, null); + HttpRequest blocking = new HttpRequest("POST", "/", null, null, null); // Enqueue some operations to the same id, which are serialised, and then shut down while operations are in flight. Phaser phaser = new Phaser(2); |