diff options
author | jonmv <venstad@gmail.com> | 2024-05-28 16:07:57 +0200 |
---|---|---|
committer | jonmv <venstad@gmail.com> | 2024-05-28 16:07:57 +0200 |
commit | 19ea3d77e8cb11a2a94718eda0e3d3890ced2eff (patch) | |
tree | d40f36f834c01221d145505007e9ed0325dd6635 /vespa-feed-client | |
parent | 8276854275b3daaf79d6774062bce6279981c4ba (diff) |
Update HTTP request timeout parameter as time passes
Diffstat (limited to 'vespa-feed-client')
5 files changed, 48 insertions, 31 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java index 424481b6ef2..d5eab8e17af 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java @@ -58,6 +58,7 @@ public class FeedClientBuilderImpl implements FeedClientBuilder { Compression compression = auto; URI proxy; Duration connectionTtl = Duration.ZERO; + LongSupplier nanoClock = System::nanoTime; public FeedClientBuilderImpl() { } @@ -251,6 +252,11 @@ public class FeedClientBuilderImpl implements FeedClientBuilder { return this; } + FeedClientBuilderImpl setNanoClock(LongSupplier nanoClock) { + this.nanoClock = requireNonNull(nanoClock); + return this; + } + /** Constructs instance of {@link ai.vespa.feed.client.FeedClient} from builder configuration */ @Override public FeedClient build() { diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java index d12d72f7a70..8ee281fb38d 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java @@ -55,6 +55,7 @@ class HttpFeedClient implements FeedClient { private final RequestStrategy requestStrategy; private final AtomicBoolean closed = new AtomicBoolean(); private final boolean speedTest; + private final LongSupplier nanoClock; HttpFeedClient(FeedClientBuilderImpl builder) throws IOException { this(builder, @@ -69,6 +70,7 @@ class HttpFeedClient implements FeedClient { this.requestHeaders = new HashMap<>(builder.requestHeaders); this.requestStrategy = requestStrategy; this.speedTest = builder.speedTest; + this.nanoClock = builder.nanoClock; verifyConnection(builder, clusterFactory); } @@ -111,11 +113,12 @@ class HttpFeedClient implements FeedClient { throw new IllegalStateException("Client is closed"); HttpRequest request = new HttpRequest(method, - getPath(documentId) + getQuery(params, speedTest), + getPath(documentId), + getQuery(params, speedTest), requestHeaders, operationJson == null ? null : operationJson.getBytes(UTF_8), // TODO: make it bytes all the way? params.timeout().orElse(maxTimeout), - System::nanoTime); + nanoClock); CompletableFuture<Result> promise = new CompletableFuture<>(); requestStrategy.enqueue(documentId, request) @@ -137,11 +140,12 @@ class HttpFeedClient implements FeedClient { Instant start = Instant.now(); try (Cluster cluster = clusterFactory.create()) { HttpRequest request = new HttpRequest("POST", - getPath(DocumentId.of("feeder", "handshake", "dummy")) + getQuery(empty(), true), + getPath(DocumentId.of("feeder", "handshake", "dummy")), + getQuery(empty(), true), requestHeaders, null, Duration.ofSeconds(15), - System::nanoTime); + nanoClock); CompletableFuture<HttpResponse> future = new CompletableFuture<>(); cluster.dispatch(request, future); HttpResponse response = future.get(20, TimeUnit.SECONDS); @@ -312,7 +316,6 @@ class HttpFeedClient implements FeedClient { StringJoiner query = new StringJoiner("&", "?", "").setEmptyValue(""); if (params.createIfNonExistent()) query.add("create=true"); params.testAndSetCondition().ifPresent(condition -> query.add("condition=" + encode(condition))); - params.timeout().ifPresent(timeout -> query.add("timeout=" + timeout.toMillis() + "ms")); params.route().ifPresent(route -> query.add("route=" + encode(route))); params.tracelevel().ifPresent(tracelevel -> query.add("tracelevel=" + tracelevel)); if (speedTest) query.add("dryRun=true"); diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java index fdd35b74c35..a9c59708031 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java @@ -10,15 +10,17 @@ class HttpRequest { private final String method; private final String path; + private final String query; private final Map<String, Supplier<String>> headers; private final byte[] body; private final Duration timeout; private final long deadlineNanos; private final LongSupplier nanoClock; - public HttpRequest(String method, String path, Map<String, Supplier<String>> headers, byte[] body, Duration timeout, LongSupplier nanoClock) { + public HttpRequest(String method, String path, String query, Map<String, Supplier<String>> headers, byte[] body, Duration timeout, LongSupplier nanoClock) { this.method = method; this.path = path; + this.query = query; this.headers = headers; this.body = body; this.deadlineNanos = nanoClock.getAsLong() + timeout.toNanos(); @@ -31,7 +33,7 @@ class HttpRequest { } public String path() { - return path; + return path + (query.isEmpty() ? "?" : query + "&") + "timeout=" + Math.max(1, timeLeft().toMillis()) + "ms"; } public Map<String, Supplier<String>> headers() { diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java index cec070c06a6..e2e96593390 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java @@ -43,7 +43,9 @@ class HttpFeedClientTest { @Override public void await() { throw new UnsupportedOperationException(); } @Override public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { return dispatch.get().apply(documentId, request); } } - FeedClient client = new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))).setDryrun(true), + FeedClient client = new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))) + .setDryrun(true) + .setNanoClock(() -> 0), () -> new DryrunCluster(), new MockRequestStrategy()); @@ -51,7 +53,7 @@ class HttpFeedClientTest { dispatch.set((documentId, request) -> { try { assertEquals(id, documentId); - assertEquals("/document/v1/ns/type/docid/0", + assertEquals("/document/v1/ns/type/docid/0?timeout=900000ms", request.path()); assertEquals("PUT", request.method()); assertEquals("json", new String(request.body(), UTF_8)); @@ -83,7 +85,7 @@ class HttpFeedClientTest { dispatch.set((documentId, request) -> { try { assertEquals(id, documentId); - assertEquals("/document/v1/ns/type/docid/0?tracelevel=1", + assertEquals("/document/v1/ns/type/docid/0?tracelevel=1&timeout=900000ms", request.path()); assertEquals("DELETE", request.method()); assertNull(request.body()); @@ -145,7 +147,7 @@ class HttpFeedClientTest { dispatch.set((documentId, request) -> { try { assertEquals(id, documentId); - assertEquals("/document/v1/ns/type/docid/0?create=true&condition=false&timeout=5000ms&route=route", + assertEquals("/document/v1/ns/type/docid/0?create=true&condition=false&route=route&timeout=5000ms", request.path()); assertEquals("json", new String(request.body(), UTF_8)); @@ -183,7 +185,7 @@ class HttpFeedClientTest { dispatch.set((documentId, request) -> { try { assertEquals(id, documentId); - assertEquals("/document/v1/ns/type/docid/0", + assertEquals("/document/v1/ns/type/docid/0?timeout=900000ms", request.path()); assertEquals("json", new String(request.body(), UTF_8)); @@ -227,7 +229,7 @@ class HttpFeedClientTest { try { assertNull(request.body()); assertEquals("POST", request.method()); - assertEquals("/document/v1/feeder/handshake/docid/dummy?dryRun=true", request.path()); + assertEquals("/document/v1/feeder/handshake/docid/dummy?dryRun=true&timeout=15000ms", request.path()); vessel.complete(response.get()); } catch (Throwable t) { @@ -238,19 +240,23 @@ class HttpFeedClientTest { // Old server, and speed-test. assertEquals("server does not support speed test; upgrade to a newer version", assertThrows(FeedException.class, - () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))).setSpeedTest(true), + () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))) + .setNanoClock(() -> 0) + .setSpeedTest(true), () -> cluster, null)) .getMessage()); // Old server. - new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))), + new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))) + .setNanoClock(() -> 0), () -> cluster, null); // New server. response.set(okResponse); - new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))), + new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))) + .setNanoClock(() -> 0), () -> cluster, null); } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java index e067a103d59..a95f859df4d 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java @@ -44,7 +44,7 @@ class HttpRequestStrategyTest { @Test void testConcurrency() throws IOException { int documents = 1 << 16; - HttpRequest request = new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), () -> 0); + HttpRequest request = new HttpRequest("PUT", "/", "", null, null, Duration.ofSeconds(1), () -> 0); HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8)); ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Cluster cluster = (__, vessel) -> executor.schedule(() -> vessel.complete(response), (int) (Math.random() * 2 * 10), TimeUnit.MILLISECONDS); @@ -102,7 +102,7 @@ class HttpRequestStrategyTest { DocumentId id1 = DocumentId.of("ns", "type", "1"); DocumentId id2 = DocumentId.of("ns", "type", "2"); - HttpRequest request = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(180), nowNanos::get); + HttpRequest request = new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(180), nowNanos::get); // Runtime exception is not retried. cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom"))); @@ -146,8 +146,8 @@ class HttpRequestStrategyTest { else vessel.complete(success); }); CompletableFuture<HttpResponse> delayed = strategy.enqueue(id1, request); - CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), nowNanos::get)); - assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null, Duration.ofSeconds(1), nowNanos::get)).get()); + CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get)); + assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get)).get()); latch.await(); assertEquals(8, strategy.stats().requests()); // 3 attempts at throttled and one at id2. nowNanos.set(4_000_000_000L); @@ -168,7 +168,7 @@ class HttpRequestStrategyTest { // Error responses are not retried when not of appropriate type. cluster.expect((__, vessel) -> vessel.complete(serverError)); - assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), nowNanos::get)).get()); + assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get)).get()); assertEquals(12, strategy.stats().requests()); // Some error responses are not retried. @@ -184,7 +184,7 @@ class HttpRequestStrategyTest { vessel.completeExceptionally(new IOException("retry me")); }); expected = assertThrows(ExecutionException.class, - () -> strategy.enqueue(id1, new HttpRequest("POST", "/", null, null, Duration.ofMillis(100), nowNanos::get)).get()); + () -> strategy.enqueue(id1, new HttpRequest("POST", "/", "", null, null, Duration.ofMillis(100), nowNanos::get)).get()); assertEquals("retry me", expected.getCause().getCause().getMessage()); assertEquals(15, strategy.stats().requests()); @@ -223,15 +223,15 @@ class HttpRequestStrategyTest { // First operation fails, second remains in flight, and third fails. clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(200, null))); - strategy.enqueue(DocumentId.of("ns", "type", "1"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get(); + strategy.enqueue(DocumentId.of("ns", "type", "1"), new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), now::get)).get(); Exchanger<CompletableFuture<HttpResponse>> exchanger = new Exchanger<>(); clusters.get(0).expect((__, vessel) -> { try { exchanger.exchange(vessel); } catch (InterruptedException e) { throw new RuntimeException(e); } }); - CompletableFuture<HttpResponse> secondResponse = strategy.enqueue(DocumentId.of("ns", "type", "2"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)); + CompletableFuture<HttpResponse> secondResponse = strategy.enqueue(DocumentId.of("ns", "type", "2"), new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), now::get)); CompletableFuture<HttpResponse> secondVessel = exchanger.exchange(null); clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null))); - strategy.enqueue(DocumentId.of("ns", "type", "3"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get(); + strategy.enqueue(DocumentId.of("ns", "type", "3"), new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), now::get)).get(); // Time advances, and the circuit breaker half-opens. assertEquals(CLOSED, breaker.state()); @@ -241,11 +241,11 @@ class HttpRequestStrategyTest { // It's indeterminate which cluster gets the next request, but the second should get the next one after that. clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null))); clusters.get(1).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null))); - assertEquals(500, strategy.enqueue(DocumentId.of("ns", "type", "4"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get().code()); + assertEquals(500, strategy.enqueue(DocumentId.of("ns", "type", "4"), new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), now::get)).get().code()); clusters.get(0).expect((__, vessel) -> vessel.completeExceptionally(new AssertionError("should not be called"))); clusters.get(1).expect((__, vessel) -> vessel.complete(HttpResponse.of(200, null))); - assertEquals(200, strategy.enqueue(DocumentId.of("ns", "type", "5"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get().code()); + assertEquals(200, strategy.enqueue(DocumentId.of("ns", "type", "5"), new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), now::get)).get().code()); assertFalse(clusters.get(0).closed.get()); assertFalse(clusters.get(1).closed.get()); @@ -276,10 +276,10 @@ class HttpRequestStrategyTest { DocumentId id3 = DocumentId.of("ns", "type", "3"); DocumentId id4 = DocumentId.of("ns", "type", "4"); DocumentId id5 = DocumentId.of("ns", "type", "5"); - HttpRequest failing = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get); - HttpRequest partial = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get); - HttpRequest request = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get); - HttpRequest blocking = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get); + HttpRequest failing = new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get); + HttpRequest partial = new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get); + HttpRequest request = new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get); + HttpRequest blocking = new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get); // Enqueue some operations to the same id, which are serialised, and then shut down while operations are in flight. Phaser phaser = new Phaser(2); |