diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-06-10 12:09:48 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-06-10 12:09:48 +0200 |
commit | 54da1662cb6891c88db864b02eb05dbfd7d94c14 (patch) | |
tree | 531b9957691c310c6c1aae6d5aa38ee44183ffc7 /vespa-feed-client | |
parent | becde905f311604fb3cefa53011618cd89fbad7e (diff) |
Hide client library behind HttpRequest and HttpResponse
Diffstat (limited to 'vespa-feed-client')
-rw-r--r-- | vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java (renamed from vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpCluster.java) | 36 | ||||
-rw-r--r-- | vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java | 11 | ||||
-rw-r--r-- | vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java | 5 | ||||
-rw-r--r-- | vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java | 23 | ||||
-rw-r--r-- | vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java | 41 | ||||
-rw-r--r-- | vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java | 33 | ||||
-rw-r--r-- | vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java | 16 | ||||
-rw-r--r-- | vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java | 7 | ||||
-rw-r--r-- | vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java | 53 | ||||
-rw-r--r-- | vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java | 33 |
10 files changed, 158 insertions, 100 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java index 5099458f3fe..672f5f080b5 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java @@ -8,6 +8,7 @@ import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; import org.apache.hc.client5.http.impl.async.H2AsyncClientBuilder; import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.ContentType; import org.apache.hc.core5.http.message.BasicHeader; import org.apache.hc.core5.http2.config.H2Config; import org.apache.hc.core5.net.URIAuthority; @@ -29,18 +30,23 @@ import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeWeak; /** * @author jonmv */ -class HttpCluster implements Cluster { +class ApacheCluster implements Cluster { private final List<Endpoint> endpoints = new ArrayList<>(); - public HttpCluster(FeedClientBuilder builder) throws IOException { + public ApacheCluster(FeedClientBuilder builder) throws IOException { for (URI endpoint : builder.endpoints) for (int i = 0; i < builder.connectionsPerEndpoint; i++) endpoints.add(new Endpoint(createHttpClient(builder), endpoint)); } @Override - public void dispatch(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel) { + public void dispatch(HttpRequest wrapped, CompletableFuture<HttpResponse> vessel) { + SimpleHttpRequest request = new SimpleHttpRequest(wrapped.method(), wrapped.path()); + wrapped.headers().forEach((name, value) -> request.setHeader(name, value.get())); + if (wrapped.body() != null) + request.setBody(wrapped.body(), ContentType.APPLICATION_JSON); + int index = 0; int min = Integer.MAX_VALUE; for (int i = 0; i < endpoints.size(); i++) @@ -56,7 +62,7 @@ class HttpCluster implements Cluster { request.setAuthority(new URIAuthority(endpoint.url.getHost(), endpoint.url.getPort())); endpoint.client.execute(request, new FutureCallback<SimpleHttpResponse>() { - @Override public void completed(SimpleHttpResponse response) { vessel.complete(response); } + @Override public void completed(SimpleHttpResponse response) { vessel.complete(new ApacheHttpResponse(response)); } @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); } @Override public void cancelled() { vessel.cancel(false); } }); @@ -148,4 +154,26 @@ class HttpCluster implements Cluster { return sslContextBuilder.build(); } + + private static class ApacheHttpResponse implements HttpResponse { + + private final SimpleHttpResponse wrapped; + + private ApacheHttpResponse(SimpleHttpResponse wrapped) { + this.wrapped = wrapped; + } + + + @Override + public int code() { + return wrapped.getCode(); + } + + @Override + public byte[] body() { + return wrapped.getBodyBytes(); + } + + } + } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java index 1ae8ae1d490..0e9bfe0ef46 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java @@ -1,9 +1,6 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.feed.client; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; -import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; - import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -41,7 +38,7 @@ public class BenchmarkingCluster implements Cluster { } @Override - public void dispatch(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel) { + public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) { requests.incrementAndGet(); long startMillis = System.currentTimeMillis(); delegate.dispatch(request, vessel); @@ -49,13 +46,13 @@ public class BenchmarkingCluster implements Cluster { results++; if (thrown == null) { responses++; - responsesByCode[response.getCode()]++; + responsesByCode[response.code()]++; long latency = System.currentTimeMillis() - startMillis; totalLatencyMillis += latency; minLatencyMillis = Math.min(minLatencyMillis, latency); maxLatencyMillis = Math.max(maxLatencyMillis, latency); - bytesSent += request.getBodyBytes() == null ? 0 : request.getBodyBytes().length; - bytesReceived += response.getBodyBytes() == null ? 0 : response.getBodyBytes().length; + bytesSent += request.body() == null ? 0 : request.body().length; + bytesReceived += response.body() == null ? 0 : response.body().length; } else exceptions++; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java index bcf1c4ae107..f428fb567e6 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java @@ -1,9 +1,6 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.feed.client; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; -import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; - import java.io.Closeable; import java.util.Collections; import java.util.concurrent.CompletableFuture; @@ -14,7 +11,7 @@ import java.util.concurrent.CompletableFuture; interface Cluster extends Closeable { /** Dispatch the request to the cluster, causing the response vessel to complete at a later time. May not throw. */ - void dispatch(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel); + void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel); @Override default void close() { } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java index 82f74170230..c572f84db54 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java @@ -4,8 +4,6 @@ package ai.vespa.feed.client; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; -import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; import org.apache.hc.core5.http.ContentType; import org.apache.hc.core5.net.URIBuilder; @@ -13,6 +11,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.net.URI; import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -21,6 +20,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; /** @@ -87,18 +87,15 @@ class HttpFeedClient implements FeedClient { ensureOpen(); String path = operationPath(documentId, params).toString(); - SimpleHttpRequest request = new SimpleHttpRequest(method, path); - requestHeaders.forEach((name, value) -> request.setHeader(name, value.get())); - if (operationJson != null) - request.setBody(operationJson, ContentType.APPLICATION_JSON); + HttpRequest request = new HttpRequest(method, path, requestHeaders, operationJson.getBytes(UTF_8)); // TODO: make it bytes all the way? return requestStrategy.enqueue(documentId, request) .thenApply(response -> toResult(request, response, documentId)); } - static Result toResult(SimpleHttpRequest request, SimpleHttpResponse response, DocumentId documentId) { + static Result toResult(HttpRequest request, HttpResponse response, DocumentId documentId) { Result.Type type; - switch (response.getCode()) { + switch (response.code()) { case 200: type = Result.Type.success; break; case 412: type = Result.Type.conditionNotMet; break; case 502: @@ -110,9 +107,9 @@ class HttpFeedClient implements FeedClient { String message = null; String trace = null; try { - JsonParser parser = factory.createParser(response.getBodyText()); + JsonParser parser = factory.createParser(response.body()); if (parser.nextToken() != JsonToken.START_OBJECT) - throw new IllegalArgumentException("Expected '" + JsonToken.START_OBJECT + "', but found '" + parser.currentToken() + "' in: " + response.getBodyText()); + throw new IllegalArgumentException("Expected '" + JsonToken.START_OBJECT + "', but found '" + parser.currentToken() + "' in: " + new String(response.body(), UTF_8)); String name; while ((name = parser.nextFieldName()) != null) { @@ -124,15 +121,15 @@ class HttpFeedClient implements FeedClient { } if (parser.currentToken() != JsonToken.END_OBJECT) - throw new IllegalArgumentException("Expected '" + JsonToken.END_OBJECT + "', but found '" + parser.currentToken() + "' in: " + response.getBodyText()); + throw new IllegalArgumentException("Expected '" + JsonToken.END_OBJECT + "', but found '" + parser.currentToken() + "' in: " + new String(response.body(), UTF_8)); } catch (IOException e) { throw new UncheckedIOException(e); } if (type == null) // Not a Vespa response, but a failure in the HTTP layer. - throw new FeedException("Status " + response.getCode() + " executing '" + request + - "': " + (message == null ? response.getBodyText() : message)); + throw new FeedException("Status " + response.code() + " executing '" + request + + "': " + (message == null ? new String(response.body(), UTF_8) : message)); return new Result(type, documentId, message, trace); } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java index c3fb37fe4df..8da2f46def2 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java @@ -1,3 +1,42 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client;public class HttpRequest { +package ai.vespa.feed.client; + +import java.util.Map; +import java.util.function.Supplier; + +class HttpRequest { + + private final String method; + private final String path; + private final Map<String, Supplier<String>> headers; + private final byte[] body; + + public HttpRequest(String method, String path, Map<String, Supplier<String>> headers, byte[] body) { + this.method = method; + this.path = path; + this.headers = headers; + this.body = body; + } + + public String method() { + return method; + } + + public String path() { + return path; + } + + public Map<String, Supplier<String>> headers() { + return headers; + } + + public byte[] body() { + return body; + } + + @Override + public String toString() { + return method + " " + path; + } + } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java index 408488cbaec..5646d37cde3 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java @@ -3,8 +3,6 @@ package ai.vespa.feed.client; import ai.vespa.feed.client.FeedClient.CircuitBreaker; import ai.vespa.feed.client.FeedClient.RetryStrategy; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; -import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; import java.io.IOException; import java.util.Map; @@ -23,6 +21,7 @@ import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN; import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN; import static java.lang.Math.max; import static java.lang.Math.min; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.logging.Level.FINE; import static java.util.logging.Level.WARNING; @@ -60,7 +59,7 @@ class HttpRequestStrategy implements RequestStrategy { }); HttpRequestStrategy(FeedClientBuilder builder) throws IOException { - this(builder, new BenchmarkingCluster(new HttpCluster(builder))); + this(builder, new BenchmarkingCluster(new ApacheCluster(builder))); } HttpRequestStrategy(FeedClientBuilder builder, Cluster cluster) { @@ -112,15 +111,15 @@ class HttpRequestStrategy implements RequestStrategy { return inflight.get() - delayedCount.get() > targetInflight(); } - private boolean retry(SimpleHttpRequest request, int attempt) { + private boolean retry(HttpRequest request, int attempt) { if (attempt > strategy.retries()) return false; - switch (request.getMethod().toUpperCase()) { + switch (request.method().toUpperCase()) { case "POST": return strategy.retry(FeedClient.OperationType.PUT); case "PUT": return strategy.retry(FeedClient.OperationType.UPDATE); case "DELETE": return strategy.retry(FeedClient.OperationType.REMOVE); - default: throw new IllegalStateException("Unexpected HTTP method: " + request.getMethod()); + default: throw new IllegalStateException("Unexpected HTTP method: " + request.method()); } } @@ -128,7 +127,7 @@ class HttpRequestStrategy implements RequestStrategy { * Retries all IOExceptions, unless error rate has converged to a value higher than the threshold, * or the user has turned off retries for this type of operation. */ - private boolean retry(SimpleHttpRequest request, Throwable thrown, int attempt) { + private boolean retry(HttpRequest request, Throwable thrown, int attempt) { breaker.failure(); log.log(FINE, thrown, () -> "Failed attempt " + attempt + " at " + request); @@ -151,23 +150,23 @@ class HttpRequestStrategy implements RequestStrategy { } /** Retries throttled requests (429, 503), adjusting the target inflight count, and server errors (500, 502). */ - private boolean retry(SimpleHttpRequest request, SimpleHttpResponse response, int attempt) { - if (response.getCode() / 100 == 2) { + private boolean retry(HttpRequest request, HttpResponse response, int attempt) { + if (response.code() / 100 == 2) { breaker.success(); incrementTargetInflight(); return false; } - log.log(FINE, () -> "Status code " + response.getCode() + " (" + response.getBodyText() + + log.log(FINE, () -> "Status code " + response.code() + " (" + new String(response.body(), UTF_8) + ") on attempt " + attempt + " at " + request); - if (response.getCode() == 429 || response.getCode() == 503) { // Throttling; reduce target inflight. + if (response.code() == 429 || response.code() == 503) { // Throttling; reduce target inflight. decreaseTargetInflight(); return true; } breaker.failure(); - if (response.getCode() == 500 || response.getCode() == 502 || response.getCode() == 504) // Hopefully temporary errors. + if (response.code() == 500 || response.code() == 502 || response.code() == 504) // Hopefully temporary errors. return retry(request, attempt); return false; @@ -205,9 +204,9 @@ class HttpRequestStrategy implements RequestStrategy { } @Override - public CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, SimpleHttpRequest request) { - CompletableFuture<SimpleHttpResponse> result = new CompletableFuture<>(); // Carries the aggregate result of the operation, including retries. - CompletableFuture<SimpleHttpResponse> vessel = new CompletableFuture<>(); // Holds the computation of a single dispatch to the HTTP client. + public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { + CompletableFuture<HttpResponse> result = new CompletableFuture<>(); // Carries the aggregate result of the operation, including retries. + CompletableFuture<HttpResponse> vessel = new CompletableFuture<>(); // Holds the computation of a single dispatch to the HTTP client. CompletableFuture<?> previous = inflightById.put(documentId, result); if (destroyed.get()) { result.cancel(true); @@ -232,14 +231,14 @@ class HttpRequestStrategy implements RequestStrategy { } /** Handles the result of one attempt at the given operation, retrying if necessary. */ - private void handleAttempt(CompletableFuture<SimpleHttpResponse> vessel, SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> result, int attempt) { + private void handleAttempt(CompletableFuture<HttpResponse> vessel, HttpRequest request, CompletableFuture<HttpResponse> result, int attempt) { vessel.whenCompleteAsync((response, thrown) -> { // Retry the operation if it failed with a transient error ... if (thrown != null ? retry(request, thrown, attempt) : retry(request, response, attempt)) { retries.incrementAndGet(); CircuitBreaker.State state = breaker.state(); - CompletableFuture<SimpleHttpResponse> retry = new CompletableFuture<>(); + CompletableFuture<HttpResponse> retry = new CompletableFuture<>(); offer(() -> cluster.dispatch(request, retry)); handleAttempt(retry, request, result, attempt + (state == HALF_OPEN ? 0 : 1)); } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java new file mode 100644 index 00000000000..b1dd54240eb --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java @@ -0,0 +1,16 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +interface HttpResponse { + + int code(); + byte[] body(); + + static HttpResponse of(int code, byte[] body) { + return new HttpResponse() { + @Override public int code() { return code; } + @Override public byte[] body() { return body; } + }; + } + +} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java index bc2707bc490..7764dce712b 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java @@ -1,12 +1,7 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.feed.client; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; -import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; - -import java.io.Closeable; import java.util.concurrent.CompletableFuture; -import java.util.function.BiConsumer; /** * Controls execution of feed operations. @@ -28,6 +23,6 @@ interface RequestStrategy { void await(); /** Enqueue the given operation, returning its future result. This may block if the client send queue is full. */ - CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, SimpleHttpRequest request); + CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request); } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java index 6155d8bf3b6..0605ed5bf65 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java @@ -1,21 +1,16 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.feed.client; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; -import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; -import org.apache.hc.core5.http.ContentType; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import java.io.IOException; import java.net.URI; import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; import java.util.function.BiFunction; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -27,13 +22,13 @@ class HttpFeedClientTest { @Test void testFeeding() throws ExecutionException, InterruptedException { DocumentId id = DocumentId.of("ns", "type", "0"); - AtomicReference<BiFunction<DocumentId, SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>>> dispatch = new AtomicReference<>(); + AtomicReference<BiFunction<DocumentId, HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>(); class MockRequestStrategy implements RequestStrategy { @Override public OperationStats stats() { throw new UnsupportedOperationException(); } @Override public boolean hasFailed() { return false; } @Override public void destroy() { throw new UnsupportedOperationException(); } @Override public void await() { throw new UnsupportedOperationException(); } - @Override public CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, SimpleHttpRequest request) { return dispatch.get().apply(documentId, request); } + @Override public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { return dispatch.get().apply(documentId, request); } } FeedClient client = new HttpFeedClient(FeedClientBuilder.create(URI.create("https://dummy:123")), new MockRequestStrategy()); @@ -42,21 +37,20 @@ class HttpFeedClientTest { try { assertEquals(id, documentId); assertEquals("/document/v1/ns/type/docid/0?create=true&condition=false&timeout=5000ms&route=route", - request.getUri().toString()); - assertEquals("json", request.getBodyText()); + request.path()); + assertEquals("json", new String(request.body(), UTF_8)); - SimpleHttpResponse response = new SimpleHttpResponse(502); - response.setBody("{\n" + - " \"pathId\": \"/document/v1/ns/type/docid/0\",\n" + - " \"id\": \"id:ns:type::0\",\n" + - " \"message\": \"Ooops! ... I did it again.\",\n" + - " \"trace\": \"I played with your heart. Got lost in the game.\"\n" + - "}", - ContentType.APPLICATION_JSON); + HttpResponse response = HttpResponse.of(502, + ("{\n" + + " \"pathId\": \"/document/v1/ns/type/docid/0\",\n" + + " \"id\": \"id:ns:type::0\",\n" + + " \"message\": \"Ooops! ... I did it again.\",\n" + + " \"trace\": \"I played with your heart. Got lost in the game.\"\n" + + "}").getBytes(UTF_8)); return CompletableFuture.completedFuture(response); } catch (Throwable thrown) { - CompletableFuture<SimpleHttpResponse> failed = new CompletableFuture<>(); + CompletableFuture<HttpResponse> failed = new CompletableFuture<>(); failed.completeExceptionally(thrown); return failed; } @@ -78,21 +72,20 @@ class HttpFeedClientTest { try { assertEquals(id, documentId); assertEquals("/document/v1/ns/type/docid/0", - request.getUri().toString()); - assertEquals("json", request.getBodyText()); + request.path()); + assertEquals("json", new String(request.body(), UTF_8)); - SimpleHttpResponse response = new SimpleHttpResponse(500); - response.setBody("{\n" + - " \"pathId\": \"/document/v1/ns/type/docid/0\",\n" + - " \"id\": \"id:ns:type::0\",\n" + - " \"message\": \"Alla ska i jorden.\",\n" + - " \"trace\": \"Din tid den kom, och senn så for den. \"\n" + - "}", - ContentType.APPLICATION_JSON); + HttpResponse response = HttpResponse.of(500, + ("{\n" + + " \"pathId\": \"/document/v1/ns/type/docid/0\",\n" + + " \"id\": \"id:ns:type::0\",\n" + + " \"message\": \"Alla ska i jorden.\",\n" + + " \"trace\": \"Din tid den kom, och senn så for den. \"\n" + + "}").getBytes(UTF_8)); return CompletableFuture.completedFuture(response); } catch (Throwable thrown) { - CompletableFuture<SimpleHttpResponse> failed = new CompletableFuture<>(); + CompletableFuture<HttpResponse> failed = new CompletableFuture<>(); failed.completeExceptionally(thrown); return failed; } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java index 7411f4124e5..d3005227184 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java @@ -2,8 +2,6 @@ package ai.vespa.feed.client; import ai.vespa.feed.client.FeedClient.CircuitBreaker; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; -import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; import org.apache.hc.core5.http.ContentType; import org.junit.jupiter.api.Test; @@ -35,9 +33,8 @@ class HttpRequestStrategyTest { @Test void testConcurrency() { int documents = 1 << 16; - SimpleHttpRequest request = new SimpleHttpRequest("PUT", "/"); - SimpleHttpResponse response = new SimpleHttpResponse(200); - response.setBody("{}".getBytes(UTF_8), ContentType.APPLICATION_JSON); + HttpRequest request = new HttpRequest("PUT", "/", null, null); + HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8)); ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Cluster cluster = new BenchmarkingCluster((__, vessel) -> executor.schedule(() -> vessel.complete(response), 100, TimeUnit.MILLISECONDS)); @@ -84,7 +81,7 @@ class HttpRequestStrategyTest { DocumentId id1 = DocumentId.of("ns", "type", "1"); DocumentId id2 = DocumentId.of("ns", "type", "2"); - SimpleHttpRequest request = new SimpleHttpRequest("POST", "/"); + HttpRequest request = new HttpRequest("POST", "/", null, null); // Runtime exception is not retried. cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom"))); @@ -101,17 +98,17 @@ class HttpRequestStrategyTest { assertEquals(3, strategy.stats().requests()); // Successful response is returned - SimpleHttpResponse success = new SimpleHttpResponse(200); + HttpResponse success = HttpResponse.of(200, null); cluster.expect((__, vessel) -> vessel.complete(success)); assertEquals(success, strategy.enqueue(id1, request).get()); assertEquals(4, strategy.stats().requests()); // Throttled requests are retried. Concurrent operations to same ID (only) are serialised. now.set(2000); - SimpleHttpResponse throttled = new SimpleHttpResponse(429); + HttpResponse throttled = HttpResponse.of(429, null); AtomicInteger count = new AtomicInteger(3); CountDownLatch latch = new CountDownLatch(1); - AtomicReference<CompletableFuture<SimpleHttpResponse>> completion = new AtomicReference<>(); + AtomicReference<CompletableFuture<HttpResponse>> completion = new AtomicReference<>(); cluster.expect((req, vessel) -> { if (req == request) { if (count.decrementAndGet() > 0) @@ -123,9 +120,9 @@ class HttpRequestStrategyTest { } else vessel.complete(success); }); - CompletableFuture<SimpleHttpResponse> delayed = strategy.enqueue(id1, request); - CompletableFuture<SimpleHttpResponse> serialised = strategy.enqueue(id1, new SimpleHttpRequest("PUT", "/")); - assertEquals(success, strategy.enqueue(id2, new SimpleHttpRequest("DELETE", "/")).get()); + CompletableFuture<HttpResponse> delayed = strategy.enqueue(id1, request); + CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null)); + assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null)).get()); latch.await(); assertEquals(8, strategy.stats().requests()); // 3 attempts at throttled and one at id2. now.set(4000); @@ -135,7 +132,7 @@ class HttpRequestStrategyTest { assertEquals(success, serialised.get()); // Some error responses are retried. - SimpleHttpResponse serverError = new SimpleHttpResponse(500); + HttpResponse serverError = HttpResponse.of(500, null); cluster.expect((__, vessel) -> vessel.complete(serverError)); assertEquals(serverError, strategy.enqueue(id1, request).get()); assertEquals(11, strategy.stats().requests()); @@ -143,11 +140,11 @@ class HttpRequestStrategyTest { // Error responses are not retried when not of appropriate type. cluster.expect((__, vessel) -> vessel.complete(serverError)); - assertEquals(serverError, strategy.enqueue(id1, new SimpleHttpRequest("PUT", "/")).get()); + assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null)).get()); assertEquals(12, strategy.stats().requests()); // Some error responses are not retried. - SimpleHttpResponse badRequest = new SimpleHttpResponse(400); + HttpResponse badRequest = HttpResponse.of(400, null); cluster.expect((__, vessel) -> vessel.complete(badRequest)); assertEquals(badRequest, strategy.enqueue(id1, request).get()); assertEquals(13, strategy.stats().requests()); @@ -169,14 +166,14 @@ class HttpRequestStrategyTest { static class MockCluster implements Cluster { - final AtomicReference<BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>>> dispatch = new AtomicReference<>(); + final AtomicReference<BiConsumer<HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>(); - void expect(BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>> expected) { + void expect(BiConsumer<HttpRequest, CompletableFuture<HttpResponse>> expected) { dispatch.set(expected); } @Override - public void dispatch(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel) { + public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) { dispatch.get().accept(request, vessel); } |