aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-12-14 10:17:50 +0100
committerJon Marius Venstad <venstad@gmail.com>2021-12-14 10:17:50 +0100
commitab9c36d5eba9fe469d7e1f6e0c075ace0cb3c1fe (patch)
tree9fb7569e8670278efdb0dc8bd3425b712dedd019 /vespa-feed-client
parentb216e35423ec9784e493b89b9b85bab9f6468e23 (diff)
Use user specified timeout for client timeout as well
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java3
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java3
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java9
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java16
4 files changed, 20 insertions, 11 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java
index 8233a1773f5..1874bd42e16 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java
@@ -83,7 +83,8 @@ class ApacheCluster implements Cluster {
@Override public void failed(Exception ex) { vessel.completeExceptionally(ex); }
@Override public void cancelled() { vessel.cancel(false); }
});
- Future<?> cancellation = executor.schedule(() -> { future.cancel(true); vessel.cancel(true); }, 200, TimeUnit.SECONDS);
+ long timeoutMillis = wrapped.timeout() == null ? 200_000 : wrapped.timeout().toMillis() * 11 / 10 + 1_000;
+ Future<?> cancellation = executor.schedule(() -> { future.cancel(true); vessel.cancel(true); }, timeoutMillis, TimeUnit.MILLISECONDS);
vessel.whenComplete((__, ___) -> cancellation.cancel(true));
}
catch (Throwable thrown) {
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
index 3fd44596d63..c136d697a0b 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
@@ -92,7 +92,8 @@ class HttpFeedClient implements FeedClient {
HttpRequest request = new HttpRequest(method,
getPath(documentId) + getQuery(params),
requestHeaders,
- operationJson == null ? null : operationJson.getBytes(UTF_8)); // TODO: make it bytes all the way?
+ operationJson == null ? null : operationJson.getBytes(UTF_8), // TODO: make it bytes all the way?
+ params.timeout().orElse(null));
CompletableFuture<Result> promise = new CompletableFuture<>();
requestStrategy.enqueue(documentId, request)
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java
index 08b8ca08c61..0ad7b82347e 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.feed.client.impl;
+import java.time.Duration;
import java.util.Map;
import java.util.function.Supplier;
@@ -10,12 +11,14 @@ class HttpRequest {
private final String path;
private final Map<String, Supplier<String>> headers;
private final byte[] body;
+ private final Duration timeout;
- public HttpRequest(String method, String path, Map<String, Supplier<String>> headers, byte[] body) {
+ public HttpRequest(String method, String path, Map<String, Supplier<String>> headers, byte[] body, Duration timeout) {
this.method = method;
this.path = path;
this.headers = headers;
this.body = body;
+ this.timeout = timeout;
}
public String method() {
@@ -34,6 +37,10 @@ class HttpRequest {
return body;
}
+ public Duration timeout() {
+ return timeout;
+ }
+
@Override
public String toString() {
return method + " " + path;
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
index d293abf4f3e..d7be4ead078 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
@@ -41,7 +41,7 @@ class HttpRequestStrategyTest {
@Test
void testConcurrency() {
int documents = 1 << 16;
- HttpRequest request = new HttpRequest("PUT", "/", null, null);
+ HttpRequest request = new HttpRequest("PUT", "/", null, null, null);
HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8));
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Cluster cluster = new BenchmarkingCluster((__, vessel) -> executor.schedule(() -> vessel.complete(response), (int) (Math.random() * 2 * 10), TimeUnit.MILLISECONDS));
@@ -99,7 +99,7 @@ class HttpRequestStrategyTest {
DocumentId id1 = DocumentId.of("ns", "type", "1");
DocumentId id2 = DocumentId.of("ns", "type", "2");
- HttpRequest request = new HttpRequest("POST", "/", null, null);
+ HttpRequest request = new HttpRequest("POST", "/", null, null, null);
// Runtime exception is not retried.
cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom")));
@@ -140,8 +140,8 @@ class HttpRequestStrategyTest {
else vessel.complete(success);
});
CompletableFuture<HttpResponse> delayed = strategy.enqueue(id1, request);
- CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null));
- assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null)).get());
+ CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, null));
+ assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null, null)).get());
latch.await();
assertEquals(8, strategy.stats().requests()); // 3 attempts at throttled and one at id2.
now.set(4000);
@@ -159,7 +159,7 @@ class HttpRequestStrategyTest {
// Error responses are not retried when not of appropriate type.
cluster.expect((__, vessel) -> vessel.complete(serverError));
- assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null)).get());
+ assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, null)).get());
assertEquals(12, strategy.stats().requests());
// Some error responses are not retried.
@@ -205,9 +205,9 @@ class HttpRequestStrategyTest {
DocumentId id2 = DocumentId.of("ns", "type", "2");
DocumentId id3 = DocumentId.of("ns", "type", "3");
DocumentId id4 = DocumentId.of("ns", "type", "4");
- HttpRequest failing = new HttpRequest("POST", "/", null, null);
- HttpRequest request = new HttpRequest("POST", "/", null, null);
- HttpRequest blocking = new HttpRequest("POST", "/", null, null);
+ HttpRequest failing = new HttpRequest("POST", "/", null, null, null);
+ HttpRequest request = new HttpRequest("POST", "/", null, null, null);
+ HttpRequest blocking = new HttpRequest("POST", "/", null, null, null);
// Enqueue some operations to the same id, which are serialised, and then shut down while operations are in flight.
Phaser phaser = new Phaser(2);