aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorjonmv <venstad@gmail.com>2024-05-28 16:07:57 +0200
committerjonmv <venstad@gmail.com>2024-05-28 16:07:57 +0200
commit19ea3d77e8cb11a2a94718eda0e3d3890ced2eff (patch)
treed40f36f834c01221d145505007e9ed0325dd6635 /vespa-feed-client
parent8276854275b3daaf79d6774062bce6279981c4ba (diff)
Update HTTP request timeout parameter as time passes
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java6
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java13
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java6
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java24
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java30
5 files changed, 48 insertions, 31 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java
index 424481b6ef2..d5eab8e17af 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java
@@ -58,6 +58,7 @@ public class FeedClientBuilderImpl implements FeedClientBuilder {
Compression compression = auto;
URI proxy;
Duration connectionTtl = Duration.ZERO;
+ LongSupplier nanoClock = System::nanoTime;
public FeedClientBuilderImpl() { }
@@ -251,6 +252,11 @@ public class FeedClientBuilderImpl implements FeedClientBuilder {
return this;
}
+ FeedClientBuilderImpl setNanoClock(LongSupplier nanoClock) {
+ this.nanoClock = requireNonNull(nanoClock);
+ return this;
+ }
+
/** Constructs instance of {@link ai.vespa.feed.client.FeedClient} from builder configuration */
@Override
public FeedClient build() {
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
index d12d72f7a70..8ee281fb38d 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
@@ -55,6 +55,7 @@ class HttpFeedClient implements FeedClient {
private final RequestStrategy requestStrategy;
private final AtomicBoolean closed = new AtomicBoolean();
private final boolean speedTest;
+ private final LongSupplier nanoClock;
HttpFeedClient(FeedClientBuilderImpl builder) throws IOException {
this(builder,
@@ -69,6 +70,7 @@ class HttpFeedClient implements FeedClient {
this.requestHeaders = new HashMap<>(builder.requestHeaders);
this.requestStrategy = requestStrategy;
this.speedTest = builder.speedTest;
+ this.nanoClock = builder.nanoClock;
verifyConnection(builder, clusterFactory);
}
@@ -111,11 +113,12 @@ class HttpFeedClient implements FeedClient {
throw new IllegalStateException("Client is closed");
HttpRequest request = new HttpRequest(method,
- getPath(documentId) + getQuery(params, speedTest),
+ getPath(documentId),
+ getQuery(params, speedTest),
requestHeaders,
operationJson == null ? null : operationJson.getBytes(UTF_8), // TODO: make it bytes all the way?
params.timeout().orElse(maxTimeout),
- System::nanoTime);
+ nanoClock);
CompletableFuture<Result> promise = new CompletableFuture<>();
requestStrategy.enqueue(documentId, request)
@@ -137,11 +140,12 @@ class HttpFeedClient implements FeedClient {
Instant start = Instant.now();
try (Cluster cluster = clusterFactory.create()) {
HttpRequest request = new HttpRequest("POST",
- getPath(DocumentId.of("feeder", "handshake", "dummy")) + getQuery(empty(), true),
+ getPath(DocumentId.of("feeder", "handshake", "dummy")),
+ getQuery(empty(), true),
requestHeaders,
null,
Duration.ofSeconds(15),
- System::nanoTime);
+ nanoClock);
CompletableFuture<HttpResponse> future = new CompletableFuture<>();
cluster.dispatch(request, future);
HttpResponse response = future.get(20, TimeUnit.SECONDS);
@@ -312,7 +316,6 @@ class HttpFeedClient implements FeedClient {
StringJoiner query = new StringJoiner("&", "?", "").setEmptyValue("");
if (params.createIfNonExistent()) query.add("create=true");
params.testAndSetCondition().ifPresent(condition -> query.add("condition=" + encode(condition)));
- params.timeout().ifPresent(timeout -> query.add("timeout=" + timeout.toMillis() + "ms"));
params.route().ifPresent(route -> query.add("route=" + encode(route)));
params.tracelevel().ifPresent(tracelevel -> query.add("tracelevel=" + tracelevel));
if (speedTest) query.add("dryRun=true");
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java
index fdd35b74c35..a9c59708031 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java
@@ -10,15 +10,17 @@ class HttpRequest {
private final String method;
private final String path;
+ private final String query;
private final Map<String, Supplier<String>> headers;
private final byte[] body;
private final Duration timeout;
private final long deadlineNanos;
private final LongSupplier nanoClock;
- public HttpRequest(String method, String path, Map<String, Supplier<String>> headers, byte[] body, Duration timeout, LongSupplier nanoClock) {
+ public HttpRequest(String method, String path, String query, Map<String, Supplier<String>> headers, byte[] body, Duration timeout, LongSupplier nanoClock) {
this.method = method;
this.path = path;
+ this.query = query;
this.headers = headers;
this.body = body;
this.deadlineNanos = nanoClock.getAsLong() + timeout.toNanos();
@@ -31,7 +33,7 @@ class HttpRequest {
}
public String path() {
- return path;
+ return path + (query.isEmpty() ? "?" : query + "&") + "timeout=" + Math.max(1, timeLeft().toMillis()) + "ms";
}
public Map<String, Supplier<String>> headers() {
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java
index cec070c06a6..e2e96593390 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java
@@ -43,7 +43,9 @@ class HttpFeedClientTest {
@Override public void await() { throw new UnsupportedOperationException(); }
@Override public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { return dispatch.get().apply(documentId, request); }
}
- FeedClient client = new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))).setDryrun(true),
+ FeedClient client = new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123")))
+ .setDryrun(true)
+ .setNanoClock(() -> 0),
() -> new DryrunCluster(),
new MockRequestStrategy());
@@ -51,7 +53,7 @@ class HttpFeedClientTest {
dispatch.set((documentId, request) -> {
try {
assertEquals(id, documentId);
- assertEquals("/document/v1/ns/type/docid/0",
+ assertEquals("/document/v1/ns/type/docid/0?timeout=900000ms",
request.path());
assertEquals("PUT", request.method());
assertEquals("json", new String(request.body(), UTF_8));
@@ -83,7 +85,7 @@ class HttpFeedClientTest {
dispatch.set((documentId, request) -> {
try {
assertEquals(id, documentId);
- assertEquals("/document/v1/ns/type/docid/0?tracelevel=1",
+ assertEquals("/document/v1/ns/type/docid/0?tracelevel=1&timeout=900000ms",
request.path());
assertEquals("DELETE", request.method());
assertNull(request.body());
@@ -145,7 +147,7 @@ class HttpFeedClientTest {
dispatch.set((documentId, request) -> {
try {
assertEquals(id, documentId);
- assertEquals("/document/v1/ns/type/docid/0?create=true&condition=false&timeout=5000ms&route=route",
+ assertEquals("/document/v1/ns/type/docid/0?create=true&condition=false&route=route&timeout=5000ms",
request.path());
assertEquals("json", new String(request.body(), UTF_8));
@@ -183,7 +185,7 @@ class HttpFeedClientTest {
dispatch.set((documentId, request) -> {
try {
assertEquals(id, documentId);
- assertEquals("/document/v1/ns/type/docid/0",
+ assertEquals("/document/v1/ns/type/docid/0?timeout=900000ms",
request.path());
assertEquals("json", new String(request.body(), UTF_8));
@@ -227,7 +229,7 @@ class HttpFeedClientTest {
try {
assertNull(request.body());
assertEquals("POST", request.method());
- assertEquals("/document/v1/feeder/handshake/docid/dummy?dryRun=true", request.path());
+ assertEquals("/document/v1/feeder/handshake/docid/dummy?dryRun=true&timeout=15000ms", request.path());
vessel.complete(response.get());
}
catch (Throwable t) {
@@ -238,19 +240,23 @@ class HttpFeedClientTest {
// Old server, and speed-test.
assertEquals("server does not support speed test; upgrade to a newer version",
assertThrows(FeedException.class,
- () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))).setSpeedTest(true),
+ () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123")))
+ .setNanoClock(() -> 0)
+ .setSpeedTest(true),
() -> cluster,
null))
.getMessage());
// Old server.
- new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))),
+ new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123")))
+ .setNanoClock(() -> 0),
() -> cluster,
null);
// New server.
response.set(okResponse);
- new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))),
+ new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123")))
+ .setNanoClock(() -> 0),
() -> cluster,
null);
}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
index e067a103d59..a95f859df4d 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
@@ -44,7 +44,7 @@ class HttpRequestStrategyTest {
@Test
void testConcurrency() throws IOException {
int documents = 1 << 16;
- HttpRequest request = new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), () -> 0);
+ HttpRequest request = new HttpRequest("PUT", "/", "", null, null, Duration.ofSeconds(1), () -> 0);
HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8));
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Cluster cluster = (__, vessel) -> executor.schedule(() -> vessel.complete(response), (int) (Math.random() * 2 * 10), TimeUnit.MILLISECONDS);
@@ -102,7 +102,7 @@ class HttpRequestStrategyTest {
DocumentId id1 = DocumentId.of("ns", "type", "1");
DocumentId id2 = DocumentId.of("ns", "type", "2");
- HttpRequest request = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(180), nowNanos::get);
+ HttpRequest request = new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(180), nowNanos::get);
// Runtime exception is not retried.
cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom")));
@@ -146,8 +146,8 @@ class HttpRequestStrategyTest {
else vessel.complete(success);
});
CompletableFuture<HttpResponse> delayed = strategy.enqueue(id1, request);
- CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), nowNanos::get));
- assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null, Duration.ofSeconds(1), nowNanos::get)).get());
+ CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get));
+ assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get)).get());
latch.await();
assertEquals(8, strategy.stats().requests()); // 3 attempts at throttled and one at id2.
nowNanos.set(4_000_000_000L);
@@ -168,7 +168,7 @@ class HttpRequestStrategyTest {
// Error responses are not retried when not of appropriate type.
cluster.expect((__, vessel) -> vessel.complete(serverError));
- assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), nowNanos::get)).get());
+ assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get)).get());
assertEquals(12, strategy.stats().requests());
// Some error responses are not retried.
@@ -184,7 +184,7 @@ class HttpRequestStrategyTest {
vessel.completeExceptionally(new IOException("retry me"));
});
expected = assertThrows(ExecutionException.class,
- () -> strategy.enqueue(id1, new HttpRequest("POST", "/", null, null, Duration.ofMillis(100), nowNanos::get)).get());
+ () -> strategy.enqueue(id1, new HttpRequest("POST", "/", "", null, null, Duration.ofMillis(100), nowNanos::get)).get());
assertEquals("retry me", expected.getCause().getCause().getMessage());
assertEquals(15, strategy.stats().requests());
@@ -223,15 +223,15 @@ class HttpRequestStrategyTest {
// First operation fails, second remains in flight, and third fails.
clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(200, null)));
- strategy.enqueue(DocumentId.of("ns", "type", "1"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get();
+ strategy.enqueue(DocumentId.of("ns", "type", "1"), new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), now::get)).get();
Exchanger<CompletableFuture<HttpResponse>> exchanger = new Exchanger<>();
clusters.get(0).expect((__, vessel) -> {
try { exchanger.exchange(vessel); } catch (InterruptedException e) { throw new RuntimeException(e); }
});
- CompletableFuture<HttpResponse> secondResponse = strategy.enqueue(DocumentId.of("ns", "type", "2"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get));
+ CompletableFuture<HttpResponse> secondResponse = strategy.enqueue(DocumentId.of("ns", "type", "2"), new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), now::get));
CompletableFuture<HttpResponse> secondVessel = exchanger.exchange(null);
clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null)));
- strategy.enqueue(DocumentId.of("ns", "type", "3"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get();
+ strategy.enqueue(DocumentId.of("ns", "type", "3"), new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), now::get)).get();
// Time advances, and the circuit breaker half-opens.
assertEquals(CLOSED, breaker.state());
@@ -241,11 +241,11 @@ class HttpRequestStrategyTest {
// It's indeterminate which cluster gets the next request, but the second should get the next one after that.
clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null)));
clusters.get(1).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null)));
- assertEquals(500, strategy.enqueue(DocumentId.of("ns", "type", "4"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get().code());
+ assertEquals(500, strategy.enqueue(DocumentId.of("ns", "type", "4"), new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), now::get)).get().code());
clusters.get(0).expect((__, vessel) -> vessel.completeExceptionally(new AssertionError("should not be called")));
clusters.get(1).expect((__, vessel) -> vessel.complete(HttpResponse.of(200, null)));
- assertEquals(200, strategy.enqueue(DocumentId.of("ns", "type", "5"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get().code());
+ assertEquals(200, strategy.enqueue(DocumentId.of("ns", "type", "5"), new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), now::get)).get().code());
assertFalse(clusters.get(0).closed.get());
assertFalse(clusters.get(1).closed.get());
@@ -276,10 +276,10 @@ class HttpRequestStrategyTest {
DocumentId id3 = DocumentId.of("ns", "type", "3");
DocumentId id4 = DocumentId.of("ns", "type", "4");
DocumentId id5 = DocumentId.of("ns", "type", "5");
- HttpRequest failing = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get);
- HttpRequest partial = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get);
- HttpRequest request = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get);
- HttpRequest blocking = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get);
+ HttpRequest failing = new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get);
+ HttpRequest partial = new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get);
+ HttpRequest request = new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get);
+ HttpRequest blocking = new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get);
// Enqueue some operations to the same id, which are serialised, and then shut down while operations are in flight.
Phaser phaser = new Phaser(2);