From 54da1662cb6891c88db864b02eb05dbfd7d94c14 Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Thu, 10 Jun 2021 12:09:48 +0200 Subject: Hide client library behind HttpRequest and HttpResponse --- .../java/ai/vespa/feed/client/ApacheCluster.java | 179 +++++++++++++++++++++ .../ai/vespa/feed/client/BenchmarkingCluster.java | 11 +- .../main/java/ai/vespa/feed/client/Cluster.java | 5 +- .../java/ai/vespa/feed/client/HttpCluster.java | 151 ----------------- .../java/ai/vespa/feed/client/HttpFeedClient.java | 23 ++- .../java/ai/vespa/feed/client/HttpRequest.java | 41 ++++- .../ai/vespa/feed/client/HttpRequestStrategy.java | 33 ++-- .../java/ai/vespa/feed/client/HttpResponse.java | 16 ++ .../java/ai/vespa/feed/client/RequestStrategy.java | 7 +- .../ai/vespa/feed/client/HttpFeedClientTest.java | 53 +++--- .../vespa/feed/client/HttpRequestStrategyTest.java | 33 ++-- 11 files changed, 305 insertions(+), 247 deletions(-) create mode 100644 vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java delete mode 100644 vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpCluster.java create mode 100644 vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java (limited to 'vespa-feed-client') diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java new file mode 100644 index 00000000000..672f5f080b5 --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java @@ -0,0 +1,179 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.H2AsyncClientBuilder; +import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.message.BasicHeader; +import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.net.URIAuthority; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.util.Timeout; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeH2Blacklisted; +import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeWeak; + +/** + * @author jonmv + */ +class ApacheCluster implements Cluster { + + private final List endpoints = new ArrayList<>(); + + public ApacheCluster(FeedClientBuilder builder) throws IOException { + for (URI endpoint : builder.endpoints) + for (int i = 0; i < builder.connectionsPerEndpoint; i++) + endpoints.add(new Endpoint(createHttpClient(builder), endpoint)); + } + + @Override + public void dispatch(HttpRequest wrapped, CompletableFuture vessel) { + SimpleHttpRequest request = new SimpleHttpRequest(wrapped.method(), wrapped.path()); + wrapped.headers().forEach((name, value) -> request.setHeader(name, value.get())); + if (wrapped.body() != null) + request.setBody(wrapped.body(), ContentType.APPLICATION_JSON); + + int index = 0; + int min = Integer.MAX_VALUE; + for (int i = 0; i < endpoints.size(); i++) + if (endpoints.get(i).inflight.get() < min) { + index = i; + min = endpoints.get(i).inflight.get(); + } + + Endpoint endpoint = endpoints.get(index); + endpoint.inflight.incrementAndGet(); + try { + request.setScheme(endpoint.url.getScheme()); + request.setAuthority(new URIAuthority(endpoint.url.getHost(), endpoint.url.getPort())); + endpoint.client.execute(request, + new FutureCallback() { + @Override public void completed(SimpleHttpResponse response) { vessel.complete(new ApacheHttpResponse(response)); } + @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); } + @Override public void cancelled() { vessel.cancel(false); } + }); + } + catch (Throwable thrown) { + vessel.completeExceptionally(thrown); + } + vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet()); + } + + @Override + public void close() { + Throwable thrown = null; + for (Endpoint endpoint : endpoints) + try { + endpoint.client.close(); + } + catch (Throwable t) { + if (thrown == null) thrown = t; + else thrown.addSuppressed(t); + } + if (thrown != null) throw new RuntimeException(thrown); + } + + + private static class Endpoint { + + private final CloseableHttpAsyncClient client; + private final AtomicInteger inflight = new AtomicInteger(0); + private final URI url; + + private Endpoint(CloseableHttpAsyncClient client, URI url) { + this.client = client; + this.url = url; + + this.client.start(); + } + + } + + private static CloseableHttpAsyncClient createHttpClient(FeedClientBuilder builder) throws IOException { + H2AsyncClientBuilder httpClientBuilder = H2AsyncClientBuilder.create() + .setUserAgent(String.format("vespa-feed-client/%s", Vespa.VERSION)) + .setDefaultHeaders(Collections.singletonList(new BasicHeader("Vespa-Client-Version", Vespa.VERSION))) + .disableCookieManagement() + .disableRedirectHandling() + .disableAutomaticRetries() + .setIOReactorConfig(IOReactorConfig.custom() + .setIoThreadCount(2) + .setTcpNoDelay(true) + .setSoTimeout(Timeout.ofSeconds(10)) + .build()) + .setDefaultRequestConfig(RequestConfig.custom() + .setConnectTimeout(Timeout.ofSeconds(10)) + .setConnectionRequestTimeout(Timeout.DISABLED) + .setResponseTimeout(Timeout.ofMinutes(5)) + .build()) + .setH2Config(H2Config.custom() + .setMaxConcurrentStreams(builder.maxStreamsPerConnection) + .setCompressionEnabled(true) + .setPushEnabled(false) + .setInitialWindowSize(Integer.MAX_VALUE) + .build()); + + SSLContext sslContext = constructSslContext(builder); + String[] allowedCiphers = excludeH2Blacklisted(excludeWeak(sslContext.getSupportedSSLParameters().getCipherSuites())); + if (allowedCiphers.length == 0) + throw new IllegalStateException("No adequate SSL cipher suites supported by the JVM"); + + ClientTlsStrategyBuilder tlsStrategyBuilder = ClientTlsStrategyBuilder.create() + .setCiphers(allowedCiphers) + .setSslContext(sslContext); + if (builder.hostnameVerifier != null) { + tlsStrategyBuilder.setHostnameVerifier(builder.hostnameVerifier); + } + return httpClientBuilder.setTlsStrategy(tlsStrategyBuilder.build()) + .build(); + } + + private static SSLContext constructSslContext(FeedClientBuilder builder) throws IOException { + if (builder.sslContext != null) return builder.sslContext; + SslContextBuilder sslContextBuilder = new SslContextBuilder(); + if (builder.certificate != null && builder.privateKey != null) { + sslContextBuilder.withCertificateAndKey(builder.certificate, builder.privateKey); + } + if (builder.caCertificates != null) { + sslContextBuilder.withCaCertificates(builder.caCertificates); + } + return sslContextBuilder.build(); + } + + + private static class ApacheHttpResponse implements HttpResponse { + + private final SimpleHttpResponse wrapped; + + private ApacheHttpResponse(SimpleHttpResponse wrapped) { + this.wrapped = wrapped; + } + + + @Override + public int code() { + return wrapped.getCode(); + } + + @Override + public byte[] body() { + return wrapped.getBodyBytes(); + } + + } + +} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java index 1ae8ae1d490..0e9bfe0ef46 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java @@ -1,9 +1,6 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.feed.client; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; -import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; - import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -41,7 +38,7 @@ public class BenchmarkingCluster implements Cluster { } @Override - public void dispatch(SimpleHttpRequest request, CompletableFuture vessel) { + public void dispatch(HttpRequest request, CompletableFuture vessel) { requests.incrementAndGet(); long startMillis = System.currentTimeMillis(); delegate.dispatch(request, vessel); @@ -49,13 +46,13 @@ public class BenchmarkingCluster implements Cluster { results++; if (thrown == null) { responses++; - responsesByCode[response.getCode()]++; + responsesByCode[response.code()]++; long latency = System.currentTimeMillis() - startMillis; totalLatencyMillis += latency; minLatencyMillis = Math.min(minLatencyMillis, latency); maxLatencyMillis = Math.max(maxLatencyMillis, latency); - bytesSent += request.getBodyBytes() == null ? 0 : request.getBodyBytes().length; - bytesReceived += response.getBodyBytes() == null ? 0 : response.getBodyBytes().length; + bytesSent += request.body() == null ? 0 : request.body().length; + bytesReceived += response.body() == null ? 0 : response.body().length; } else exceptions++; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java index bcf1c4ae107..f428fb567e6 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java @@ -1,9 +1,6 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.feed.client; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; -import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; - import java.io.Closeable; import java.util.Collections; import java.util.concurrent.CompletableFuture; @@ -14,7 +11,7 @@ import java.util.concurrent.CompletableFuture; interface Cluster extends Closeable { /** Dispatch the request to the cluster, causing the response vessel to complete at a later time. May not throw. */ - void dispatch(SimpleHttpRequest request, CompletableFuture vessel); + void dispatch(HttpRequest request, CompletableFuture vessel); @Override default void close() { } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpCluster.java deleted file mode 100644 index 5099458f3fe..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpCluster.java +++ /dev/null @@ -1,151 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; -import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; -import org.apache.hc.client5.http.config.RequestConfig; -import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; -import org.apache.hc.client5.http.impl.async.H2AsyncClientBuilder; -import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; -import org.apache.hc.core5.concurrent.FutureCallback; -import org.apache.hc.core5.http.message.BasicHeader; -import org.apache.hc.core5.http2.config.H2Config; -import org.apache.hc.core5.net.URIAuthority; -import org.apache.hc.core5.reactor.IOReactorConfig; -import org.apache.hc.core5.util.Timeout; - -import javax.net.ssl.SSLContext; -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeH2Blacklisted; -import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeWeak; - -/** - * @author jonmv - */ -class HttpCluster implements Cluster { - - private final List endpoints = new ArrayList<>(); - - public HttpCluster(FeedClientBuilder builder) throws IOException { - for (URI endpoint : builder.endpoints) - for (int i = 0; i < builder.connectionsPerEndpoint; i++) - endpoints.add(new Endpoint(createHttpClient(builder), endpoint)); - } - - @Override - public void dispatch(SimpleHttpRequest request, CompletableFuture vessel) { - int index = 0; - int min = Integer.MAX_VALUE; - for (int i = 0; i < endpoints.size(); i++) - if (endpoints.get(i).inflight.get() < min) { - index = i; - min = endpoints.get(i).inflight.get(); - } - - Endpoint endpoint = endpoints.get(index); - endpoint.inflight.incrementAndGet(); - try { - request.setScheme(endpoint.url.getScheme()); - request.setAuthority(new URIAuthority(endpoint.url.getHost(), endpoint.url.getPort())); - endpoint.client.execute(request, - new FutureCallback() { - @Override public void completed(SimpleHttpResponse response) { vessel.complete(response); } - @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); } - @Override public void cancelled() { vessel.cancel(false); } - }); - } - catch (Throwable thrown) { - vessel.completeExceptionally(thrown); - } - vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet()); - } - - @Override - public void close() { - Throwable thrown = null; - for (Endpoint endpoint : endpoints) - try { - endpoint.client.close(); - } - catch (Throwable t) { - if (thrown == null) thrown = t; - else thrown.addSuppressed(t); - } - if (thrown != null) throw new RuntimeException(thrown); - } - - - private static class Endpoint { - - private final CloseableHttpAsyncClient client; - private final AtomicInteger inflight = new AtomicInteger(0); - private final URI url; - - private Endpoint(CloseableHttpAsyncClient client, URI url) { - this.client = client; - this.url = url; - - this.client.start(); - } - - } - - private static CloseableHttpAsyncClient createHttpClient(FeedClientBuilder builder) throws IOException { - H2AsyncClientBuilder httpClientBuilder = H2AsyncClientBuilder.create() - .setUserAgent(String.format("vespa-feed-client/%s", Vespa.VERSION)) - .setDefaultHeaders(Collections.singletonList(new BasicHeader("Vespa-Client-Version", Vespa.VERSION))) - .disableCookieManagement() - .disableRedirectHandling() - .disableAutomaticRetries() - .setIOReactorConfig(IOReactorConfig.custom() - .setIoThreadCount(2) - .setTcpNoDelay(true) - .setSoTimeout(Timeout.ofSeconds(10)) - .build()) - .setDefaultRequestConfig(RequestConfig.custom() - .setConnectTimeout(Timeout.ofSeconds(10)) - .setConnectionRequestTimeout(Timeout.DISABLED) - .setResponseTimeout(Timeout.ofMinutes(5)) - .build()) - .setH2Config(H2Config.custom() - .setMaxConcurrentStreams(builder.maxStreamsPerConnection) - .setCompressionEnabled(true) - .setPushEnabled(false) - .setInitialWindowSize(Integer.MAX_VALUE) - .build()); - - SSLContext sslContext = constructSslContext(builder); - String[] allowedCiphers = excludeH2Blacklisted(excludeWeak(sslContext.getSupportedSSLParameters().getCipherSuites())); - if (allowedCiphers.length == 0) - throw new IllegalStateException("No adequate SSL cipher suites supported by the JVM"); - - ClientTlsStrategyBuilder tlsStrategyBuilder = ClientTlsStrategyBuilder.create() - .setCiphers(allowedCiphers) - .setSslContext(sslContext); - if (builder.hostnameVerifier != null) { - tlsStrategyBuilder.setHostnameVerifier(builder.hostnameVerifier); - } - return httpClientBuilder.setTlsStrategy(tlsStrategyBuilder.build()) - .build(); - } - - private static SSLContext constructSslContext(FeedClientBuilder builder) throws IOException { - if (builder.sslContext != null) return builder.sslContext; - SslContextBuilder sslContextBuilder = new SslContextBuilder(); - if (builder.certificate != null && builder.privateKey != null) { - sslContextBuilder.withCertificateAndKey(builder.certificate, builder.privateKey); - } - if (builder.caCertificates != null) { - sslContextBuilder.withCaCertificates(builder.caCertificates); - } - return sslContextBuilder.build(); - } - -} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java index 82f74170230..c572f84db54 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java @@ -4,8 +4,6 @@ package ai.vespa.feed.client; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; -import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; import org.apache.hc.core5.http.ContentType; import org.apache.hc.core5.net.URIBuilder; @@ -13,6 +11,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.net.URI; import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -21,6 +20,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; /** @@ -87,18 +87,15 @@ class HttpFeedClient implements FeedClient { ensureOpen(); String path = operationPath(documentId, params).toString(); - SimpleHttpRequest request = new SimpleHttpRequest(method, path); - requestHeaders.forEach((name, value) -> request.setHeader(name, value.get())); - if (operationJson != null) - request.setBody(operationJson, ContentType.APPLICATION_JSON); + HttpRequest request = new HttpRequest(method, path, requestHeaders, operationJson.getBytes(UTF_8)); // TODO: make it bytes all the way? return requestStrategy.enqueue(documentId, request) .thenApply(response -> toResult(request, response, documentId)); } - static Result toResult(SimpleHttpRequest request, SimpleHttpResponse response, DocumentId documentId) { + static Result toResult(HttpRequest request, HttpResponse response, DocumentId documentId) { Result.Type type; - switch (response.getCode()) { + switch (response.code()) { case 200: type = Result.Type.success; break; case 412: type = Result.Type.conditionNotMet; break; case 502: @@ -110,9 +107,9 @@ class HttpFeedClient implements FeedClient { String message = null; String trace = null; try { - JsonParser parser = factory.createParser(response.getBodyText()); + JsonParser parser = factory.createParser(response.body()); if (parser.nextToken() != JsonToken.START_OBJECT) - throw new IllegalArgumentException("Expected '" + JsonToken.START_OBJECT + "', but found '" + parser.currentToken() + "' in: " + response.getBodyText()); + throw new IllegalArgumentException("Expected '" + JsonToken.START_OBJECT + "', but found '" + parser.currentToken() + "' in: " + new String(response.body(), UTF_8)); String name; while ((name = parser.nextFieldName()) != null) { @@ -124,15 +121,15 @@ class HttpFeedClient implements FeedClient { } if (parser.currentToken() != JsonToken.END_OBJECT) - throw new IllegalArgumentException("Expected '" + JsonToken.END_OBJECT + "', but found '" + parser.currentToken() + "' in: " + response.getBodyText()); + throw new IllegalArgumentException("Expected '" + JsonToken.END_OBJECT + "', but found '" + parser.currentToken() + "' in: " + new String(response.body(), UTF_8)); } catch (IOException e) { throw new UncheckedIOException(e); } if (type == null) // Not a Vespa response, but a failure in the HTTP layer. - throw new FeedException("Status " + response.getCode() + " executing '" + request + - "': " + (message == null ? response.getBodyText() : message)); + throw new FeedException("Status " + response.code() + " executing '" + request + + "': " + (message == null ? new String(response.body(), UTF_8) : message)); return new Result(type, documentId, message, trace); } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java index c3fb37fe4df..8da2f46def2 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java @@ -1,3 +1,42 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client;public class HttpRequest { +package ai.vespa.feed.client; + +import java.util.Map; +import java.util.function.Supplier; + +class HttpRequest { + + private final String method; + private final String path; + private final Map> headers; + private final byte[] body; + + public HttpRequest(String method, String path, Map> headers, byte[] body) { + this.method = method; + this.path = path; + this.headers = headers; + this.body = body; + } + + public String method() { + return method; + } + + public String path() { + return path; + } + + public Map> headers() { + return headers; + } + + public byte[] body() { + return body; + } + + @Override + public String toString() { + return method + " " + path; + } + } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java index 408488cbaec..5646d37cde3 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java @@ -3,8 +3,6 @@ package ai.vespa.feed.client; import ai.vespa.feed.client.FeedClient.CircuitBreaker; import ai.vespa.feed.client.FeedClient.RetryStrategy; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; -import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; import java.io.IOException; import java.util.Map; @@ -23,6 +21,7 @@ import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN; import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN; import static java.lang.Math.max; import static java.lang.Math.min; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.logging.Level.FINE; import static java.util.logging.Level.WARNING; @@ -60,7 +59,7 @@ class HttpRequestStrategy implements RequestStrategy { }); HttpRequestStrategy(FeedClientBuilder builder) throws IOException { - this(builder, new BenchmarkingCluster(new HttpCluster(builder))); + this(builder, new BenchmarkingCluster(new ApacheCluster(builder))); } HttpRequestStrategy(FeedClientBuilder builder, Cluster cluster) { @@ -112,15 +111,15 @@ class HttpRequestStrategy implements RequestStrategy { return inflight.get() - delayedCount.get() > targetInflight(); } - private boolean retry(SimpleHttpRequest request, int attempt) { + private boolean retry(HttpRequest request, int attempt) { if (attempt > strategy.retries()) return false; - switch (request.getMethod().toUpperCase()) { + switch (request.method().toUpperCase()) { case "POST": return strategy.retry(FeedClient.OperationType.PUT); case "PUT": return strategy.retry(FeedClient.OperationType.UPDATE); case "DELETE": return strategy.retry(FeedClient.OperationType.REMOVE); - default: throw new IllegalStateException("Unexpected HTTP method: " + request.getMethod()); + default: throw new IllegalStateException("Unexpected HTTP method: " + request.method()); } } @@ -128,7 +127,7 @@ class HttpRequestStrategy implements RequestStrategy { * Retries all IOExceptions, unless error rate has converged to a value higher than the threshold, * or the user has turned off retries for this type of operation. */ - private boolean retry(SimpleHttpRequest request, Throwable thrown, int attempt) { + private boolean retry(HttpRequest request, Throwable thrown, int attempt) { breaker.failure(); log.log(FINE, thrown, () -> "Failed attempt " + attempt + " at " + request); @@ -151,23 +150,23 @@ class HttpRequestStrategy implements RequestStrategy { } /** Retries throttled requests (429, 503), adjusting the target inflight count, and server errors (500, 502). */ - private boolean retry(SimpleHttpRequest request, SimpleHttpResponse response, int attempt) { - if (response.getCode() / 100 == 2) { + private boolean retry(HttpRequest request, HttpResponse response, int attempt) { + if (response.code() / 100 == 2) { breaker.success(); incrementTargetInflight(); return false; } - log.log(FINE, () -> "Status code " + response.getCode() + " (" + response.getBodyText() + + log.log(FINE, () -> "Status code " + response.code() + " (" + new String(response.body(), UTF_8) + ") on attempt " + attempt + " at " + request); - if (response.getCode() == 429 || response.getCode() == 503) { // Throttling; reduce target inflight. + if (response.code() == 429 || response.code() == 503) { // Throttling; reduce target inflight. decreaseTargetInflight(); return true; } breaker.failure(); - if (response.getCode() == 500 || response.getCode() == 502 || response.getCode() == 504) // Hopefully temporary errors. + if (response.code() == 500 || response.code() == 502 || response.code() == 504) // Hopefully temporary errors. return retry(request, attempt); return false; @@ -205,9 +204,9 @@ class HttpRequestStrategy implements RequestStrategy { } @Override - public CompletableFuture enqueue(DocumentId documentId, SimpleHttpRequest request) { - CompletableFuture result = new CompletableFuture<>(); // Carries the aggregate result of the operation, including retries. - CompletableFuture vessel = new CompletableFuture<>(); // Holds the computation of a single dispatch to the HTTP client. + public CompletableFuture enqueue(DocumentId documentId, HttpRequest request) { + CompletableFuture result = new CompletableFuture<>(); // Carries the aggregate result of the operation, including retries. + CompletableFuture vessel = new CompletableFuture<>(); // Holds the computation of a single dispatch to the HTTP client. CompletableFuture previous = inflightById.put(documentId, result); if (destroyed.get()) { result.cancel(true); @@ -232,14 +231,14 @@ class HttpRequestStrategy implements RequestStrategy { } /** Handles the result of one attempt at the given operation, retrying if necessary. */ - private void handleAttempt(CompletableFuture vessel, SimpleHttpRequest request, CompletableFuture result, int attempt) { + private void handleAttempt(CompletableFuture vessel, HttpRequest request, CompletableFuture result, int attempt) { vessel.whenCompleteAsync((response, thrown) -> { // Retry the operation if it failed with a transient error ... if (thrown != null ? retry(request, thrown, attempt) : retry(request, response, attempt)) { retries.incrementAndGet(); CircuitBreaker.State state = breaker.state(); - CompletableFuture retry = new CompletableFuture<>(); + CompletableFuture retry = new CompletableFuture<>(); offer(() -> cluster.dispatch(request, retry)); handleAttempt(retry, request, result, attempt + (state == HALF_OPEN ? 0 : 1)); } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java new file mode 100644 index 00000000000..b1dd54240eb --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java @@ -0,0 +1,16 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +interface HttpResponse { + + int code(); + byte[] body(); + + static HttpResponse of(int code, byte[] body) { + return new HttpResponse() { + @Override public int code() { return code; } + @Override public byte[] body() { return body; } + }; + } + +} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java index bc2707bc490..7764dce712b 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java @@ -1,12 +1,7 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.feed.client; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; -import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; - -import java.io.Closeable; import java.util.concurrent.CompletableFuture; -import java.util.function.BiConsumer; /** * Controls execution of feed operations. @@ -28,6 +23,6 @@ interface RequestStrategy { void await(); /** Enqueue the given operation, returning its future result. This may block if the client send queue is full. */ - CompletableFuture enqueue(DocumentId documentId, SimpleHttpRequest request); + CompletableFuture enqueue(DocumentId documentId, HttpRequest request); } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java index 6155d8bf3b6..0605ed5bf65 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java @@ -1,21 +1,16 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.feed.client; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; -import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; -import org.apache.hc.core5.http.ContentType; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import java.io.IOException; import java.net.URI; import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; import java.util.function.BiFunction; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -27,13 +22,13 @@ class HttpFeedClientTest { @Test void testFeeding() throws ExecutionException, InterruptedException { DocumentId id = DocumentId.of("ns", "type", "0"); - AtomicReference>> dispatch = new AtomicReference<>(); + AtomicReference>> dispatch = new AtomicReference<>(); class MockRequestStrategy implements RequestStrategy { @Override public OperationStats stats() { throw new UnsupportedOperationException(); } @Override public boolean hasFailed() { return false; } @Override public void destroy() { throw new UnsupportedOperationException(); } @Override public void await() { throw new UnsupportedOperationException(); } - @Override public CompletableFuture enqueue(DocumentId documentId, SimpleHttpRequest request) { return dispatch.get().apply(documentId, request); } + @Override public CompletableFuture enqueue(DocumentId documentId, HttpRequest request) { return dispatch.get().apply(documentId, request); } } FeedClient client = new HttpFeedClient(FeedClientBuilder.create(URI.create("https://dummy:123")), new MockRequestStrategy()); @@ -42,21 +37,20 @@ class HttpFeedClientTest { try { assertEquals(id, documentId); assertEquals("/document/v1/ns/type/docid/0?create=true&condition=false&timeout=5000ms&route=route", - request.getUri().toString()); - assertEquals("json", request.getBodyText()); + request.path()); + assertEquals("json", new String(request.body(), UTF_8)); - SimpleHttpResponse response = new SimpleHttpResponse(502); - response.setBody("{\n" + - " \"pathId\": \"/document/v1/ns/type/docid/0\",\n" + - " \"id\": \"id:ns:type::0\",\n" + - " \"message\": \"Ooops! ... I did it again.\",\n" + - " \"trace\": \"I played with your heart. Got lost in the game.\"\n" + - "}", - ContentType.APPLICATION_JSON); + HttpResponse response = HttpResponse.of(502, + ("{\n" + + " \"pathId\": \"/document/v1/ns/type/docid/0\",\n" + + " \"id\": \"id:ns:type::0\",\n" + + " \"message\": \"Ooops! ... I did it again.\",\n" + + " \"trace\": \"I played with your heart. Got lost in the game.\"\n" + + "}").getBytes(UTF_8)); return CompletableFuture.completedFuture(response); } catch (Throwable thrown) { - CompletableFuture failed = new CompletableFuture<>(); + CompletableFuture failed = new CompletableFuture<>(); failed.completeExceptionally(thrown); return failed; } @@ -78,21 +72,20 @@ class HttpFeedClientTest { try { assertEquals(id, documentId); assertEquals("/document/v1/ns/type/docid/0", - request.getUri().toString()); - assertEquals("json", request.getBodyText()); + request.path()); + assertEquals("json", new String(request.body(), UTF_8)); - SimpleHttpResponse response = new SimpleHttpResponse(500); - response.setBody("{\n" + - " \"pathId\": \"/document/v1/ns/type/docid/0\",\n" + - " \"id\": \"id:ns:type::0\",\n" + - " \"message\": \"Alla ska i jorden.\",\n" + - " \"trace\": \"Din tid den kom, och senn så for den. \"\n" + - "}", - ContentType.APPLICATION_JSON); + HttpResponse response = HttpResponse.of(500, + ("{\n" + + " \"pathId\": \"/document/v1/ns/type/docid/0\",\n" + + " \"id\": \"id:ns:type::0\",\n" + + " \"message\": \"Alla ska i jorden.\",\n" + + " \"trace\": \"Din tid den kom, och senn så for den. \"\n" + + "}").getBytes(UTF_8)); return CompletableFuture.completedFuture(response); } catch (Throwable thrown) { - CompletableFuture failed = new CompletableFuture<>(); + CompletableFuture failed = new CompletableFuture<>(); failed.completeExceptionally(thrown); return failed; } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java index 7411f4124e5..d3005227184 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java @@ -2,8 +2,6 @@ package ai.vespa.feed.client; import ai.vespa.feed.client.FeedClient.CircuitBreaker; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; -import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; import org.apache.hc.core5.http.ContentType; import org.junit.jupiter.api.Test; @@ -35,9 +33,8 @@ class HttpRequestStrategyTest { @Test void testConcurrency() { int documents = 1 << 16; - SimpleHttpRequest request = new SimpleHttpRequest("PUT", "/"); - SimpleHttpResponse response = new SimpleHttpResponse(200); - response.setBody("{}".getBytes(UTF_8), ContentType.APPLICATION_JSON); + HttpRequest request = new HttpRequest("PUT", "/", null, null); + HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8)); ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Cluster cluster = new BenchmarkingCluster((__, vessel) -> executor.schedule(() -> vessel.complete(response), 100, TimeUnit.MILLISECONDS)); @@ -84,7 +81,7 @@ class HttpRequestStrategyTest { DocumentId id1 = DocumentId.of("ns", "type", "1"); DocumentId id2 = DocumentId.of("ns", "type", "2"); - SimpleHttpRequest request = new SimpleHttpRequest("POST", "/"); + HttpRequest request = new HttpRequest("POST", "/", null, null); // Runtime exception is not retried. cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom"))); @@ -101,17 +98,17 @@ class HttpRequestStrategyTest { assertEquals(3, strategy.stats().requests()); // Successful response is returned - SimpleHttpResponse success = new SimpleHttpResponse(200); + HttpResponse success = HttpResponse.of(200, null); cluster.expect((__, vessel) -> vessel.complete(success)); assertEquals(success, strategy.enqueue(id1, request).get()); assertEquals(4, strategy.stats().requests()); // Throttled requests are retried. Concurrent operations to same ID (only) are serialised. now.set(2000); - SimpleHttpResponse throttled = new SimpleHttpResponse(429); + HttpResponse throttled = HttpResponse.of(429, null); AtomicInteger count = new AtomicInteger(3); CountDownLatch latch = new CountDownLatch(1); - AtomicReference> completion = new AtomicReference<>(); + AtomicReference> completion = new AtomicReference<>(); cluster.expect((req, vessel) -> { if (req == request) { if (count.decrementAndGet() > 0) @@ -123,9 +120,9 @@ class HttpRequestStrategyTest { } else vessel.complete(success); }); - CompletableFuture delayed = strategy.enqueue(id1, request); - CompletableFuture serialised = strategy.enqueue(id1, new SimpleHttpRequest("PUT", "/")); - assertEquals(success, strategy.enqueue(id2, new SimpleHttpRequest("DELETE", "/")).get()); + CompletableFuture delayed = strategy.enqueue(id1, request); + CompletableFuture serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null)); + assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null)).get()); latch.await(); assertEquals(8, strategy.stats().requests()); // 3 attempts at throttled and one at id2. now.set(4000); @@ -135,7 +132,7 @@ class HttpRequestStrategyTest { assertEquals(success, serialised.get()); // Some error responses are retried. - SimpleHttpResponse serverError = new SimpleHttpResponse(500); + HttpResponse serverError = HttpResponse.of(500, null); cluster.expect((__, vessel) -> vessel.complete(serverError)); assertEquals(serverError, strategy.enqueue(id1, request).get()); assertEquals(11, strategy.stats().requests()); @@ -143,11 +140,11 @@ class HttpRequestStrategyTest { // Error responses are not retried when not of appropriate type. cluster.expect((__, vessel) -> vessel.complete(serverError)); - assertEquals(serverError, strategy.enqueue(id1, new SimpleHttpRequest("PUT", "/")).get()); + assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null)).get()); assertEquals(12, strategy.stats().requests()); // Some error responses are not retried. - SimpleHttpResponse badRequest = new SimpleHttpResponse(400); + HttpResponse badRequest = HttpResponse.of(400, null); cluster.expect((__, vessel) -> vessel.complete(badRequest)); assertEquals(badRequest, strategy.enqueue(id1, request).get()); assertEquals(13, strategy.stats().requests()); @@ -169,14 +166,14 @@ class HttpRequestStrategyTest { static class MockCluster implements Cluster { - final AtomicReference>> dispatch = new AtomicReference<>(); + final AtomicReference>> dispatch = new AtomicReference<>(); - void expect(BiConsumer> expected) { + void expect(BiConsumer> expected) { dispatch.set(expected); } @Override - public void dispatch(SimpleHttpRequest request, CompletableFuture vessel) { + public void dispatch(HttpRequest request, CompletableFuture vessel) { dispatch.get().accept(request, vessel); } -- cgit v1.2.3