diff options
author | Jon Marius Venstad <jonmv@users.noreply.github.com> | 2021-06-15 15:08:20 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-06-15 15:08:20 +0200 |
commit | f986f75c5bb3f587bb39b290b78f335e24641e03 (patch) | |
tree | 182692ee69e59febffd11d22e1b144f8e88ce04f | |
parent | 79ef7e0298e10f007646f19f6bcabe6cd03932aa (diff) | |
parent | e0fabae923b0343f2a30767e60b9a26458fe0c84 (diff) |
Merge pull request #18265 from vespa-engine/jonmv/vespa-feed-client
Jonmv/vespa feed client
8 files changed, 12 insertions, 283 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java index aa1bc1ce624..52b92737bb9 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java @@ -7,7 +7,7 @@ package com.yahoo.messagebus; * way of {@link #setMaxPendingCount(int)}), the total size of pending messages (by way of {@link * #setMaxPendingSize(long)}), or some combination thereof. * - * <b>NOTE:</b> By context, "pending" is refering to the number of sent messages that have not been replied to yet. + * <b>NOTE:</b> By context, "pending" refers to the number of sent messages that have not been replied to yet. * * @author Simon Thoresen Hult */ diff --git a/vespa-feed-client/abi-spec.json b/vespa-feed-client/abi-spec.json index 06da776b6c6..db9c1ff1a02 100644 --- a/vespa-feed-client/abi-spec.json +++ b/vespa-feed-client/abi-spec.json @@ -136,6 +136,7 @@ "public ai.vespa.feed.client.FeedClientBuilder setMaxStreamPerConnection(int)", "public ai.vespa.feed.client.FeedClientBuilder setSslContext(javax.net.ssl.SSLContext)", "public ai.vespa.feed.client.FeedClientBuilder setHostnameVerifier(javax.net.ssl.HostnameVerifier)", + "public ai.vespa.feed.client.FeedClientBuilder setBenchmarkOn(boolean)", "public ai.vespa.feed.client.FeedClientBuilder addRequestHeader(java.lang.String, java.lang.String)", "public ai.vespa.feed.client.FeedClientBuilder addRequestHeader(java.lang.String, java.util.function.Supplier)", "public ai.vespa.feed.client.FeedClientBuilder setRetryStrategy(ai.vespa.feed.client.FeedClient$RetryStrategy)", @@ -236,20 +237,6 @@ ], "fields": [] }, - "ai.vespa.feed.client.OkCluster": { - "superClass": "java.lang.Object", - "interfaces": [ - "ai.vespa.feed.client.Cluster" - ], - "attributes": [ - "public" - ], - "methods": [ - "public void dispatch(ai.vespa.feed.client.HttpRequest, java.util.concurrent.CompletableFuture)", - "public void close()" - ], - "fields": [] - }, "ai.vespa.feed.client.OperationParameters": { "superClass": "java.lang.Object", "interfaces": [], diff --git a/vespa-feed-client/pom.xml b/vespa-feed-client/pom.xml index b0d12431c05..7d4938c6fb0 100644 --- a/vespa-feed-client/pom.xml +++ b/vespa-feed-client/pom.xml @@ -30,28 +30,11 @@ <scope>compile</scope> </dependency> <dependency> - <groupId>org.eclipse.jetty.http2</groupId> - <artifactId>http2-http-client-transport</artifactId> - <version>${jetty.version}</version> - <scope>compile</scope> - </dependency> - <dependency> - <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-client</artifactId> - <version>${jetty.version}</version> - <scope>compile</scope> - </dependency> - <dependency> <groupId>org.apache.httpcomponents.client5</groupId> <artifactId>httpclient5</artifactId> <scope>compile</scope> </dependency> <dependency> - <groupId>com.squareup.okhttp3</groupId> - <artifactId>okhttp</artifactId> - <version>4.9.1</version> - </dependency> - <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <scope>compile</scope> diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java index e0418836c80..8b5eb9efea7 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java @@ -44,6 +44,7 @@ public class FeedClientBuilder { Collection<X509Certificate> certificate; PrivateKey privateKey; Collection<X509Certificate> caCertificates; + boolean benchmark; /** Creates a builder for a single container endpoint **/ public static FeedClientBuilder create(URI endpoint) { return new FeedClientBuilder(Collections.singletonList(endpoint)); } @@ -100,6 +101,12 @@ public class FeedClientBuilder { return this; } + /** Turns on/off benchmarking, aggregated in {@link FeedClient#stats()}. */ + public FeedClientBuilder setBenchmarkOn(boolean on) { + this.benchmark = on; + return this; + } + /** Adds HTTP request header to all client requests. */ public FeedClientBuilder addRequestHeader(String name, String value) { return addRequestHeader(name, () -> requireNonNull(value)); diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java index 3cce423735f..6b2aec5d8b3 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java @@ -59,11 +59,11 @@ class HttpRequestStrategy implements RequestStrategy { }); HttpRequestStrategy(FeedClientBuilder builder) throws IOException { - this(builder, new BenchmarkingCluster(new ApacheCluster(builder))); + this(builder, new ApacheCluster(builder)); } HttpRequestStrategy(FeedClientBuilder builder, Cluster cluster) { - this.cluster = cluster; + this.cluster = builder.benchmark ? new BenchmarkingCluster(cluster) : cluster; this.strategy = builder.retryStrategy; this.breaker = builder.circuitBreaker; this.maxInflight = builder.connectionsPerEndpoint * (long) builder.maxStreamsPerConnection; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java deleted file mode 100644 index 56be53798b1..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java +++ /dev/null @@ -1,121 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.HttpClientTransport; -import org.eclipse.jetty.client.api.Request; -import org.eclipse.jetty.client.api.Result; -import org.eclipse.jetty.client.util.BufferingResponseListener; -import org.eclipse.jetty.client.util.BytesContentProvider; -import org.eclipse.jetty.http.HttpField; -import org.eclipse.jetty.http2.client.HTTP2Client; -import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2; -import org.eclipse.jetty.util.ssl.SslContextFactory; - -import java.net.URI; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * @author jonmv - */ -class JettyCluster implements Cluster { - - private final List<Endpoint> endpoints = new ArrayList<>(); - - JettyCluster(FeedClientBuilder builder) { - for (URI endpoint : builder.endpoints) - for (int i = 0; i < builder.connectionsPerEndpoint; i++) - endpoints.add(new Endpoint(createJettyHttpClient(builder), endpoint)); - } - - private static HttpClient createJettyHttpClient(FeedClientBuilder builder) { - try { - SslContextFactory.Client clientSslCtxFactory = new SslContextFactory.Client(); - clientSslCtxFactory.setSslContext(builder.constructSslContext()); - clientSslCtxFactory.setHostnameVerifier(builder.hostnameVerifier); - - HTTP2Client wrapped = new HTTP2Client(); - wrapped.setMaxConcurrentPushedStreams(builder.maxStreamsPerConnection); - HttpClientTransport transport = new HttpClientTransportOverHTTP2(wrapped); - HttpClient client = new HttpClient(transport, clientSslCtxFactory); - client.setUserAgentField(new HttpField("User-Agent", String.format("vespa-feed-client/%s", Vespa.VERSION))); - client.setFollowRedirects(false); - client.setMaxRequestsQueuedPerDestination(builder.maxStreamsPerConnection); - client.setMaxConnectionsPerDestination(1); - - client.start(); - return client; - } - catch (Exception e) { - throw new IllegalStateException(e); - } - } - - - @Override - public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) { - int index = 0; - int min = Integer.MAX_VALUE; - for (int i = 0; i < endpoints.size(); i++) - if (endpoints.get(i).inflight.get() < min) { - index = i; - min = endpoints.get(i).inflight.get(); - } - - Endpoint endpoint = endpoints.get(index); - endpoint.inflight.incrementAndGet(); - try { - Request jettyRequest = endpoint.client.newRequest(endpoint.uri.resolve(request.path())) - .method(request.method()) - .timeout(5, TimeUnit.MINUTES) - .content(request.body() == null ? null : new BytesContentProvider("application/json", request.body())); - request.headers().forEach((name, value) -> jettyRequest.header(name, value.get())); - jettyRequest.send(new BufferingResponseListener() { - @Override public void onComplete(Result result) { - if (result.isSucceeded()) - vessel.complete(HttpResponse.of(result.getResponse().getStatus(), - getContent())); - else - vessel.completeExceptionally(result.getFailure()); - } - }); - } - catch (Throwable thrown) { - vessel.completeExceptionally(thrown); - } - vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet()); - } - - @Override - public void close() { - Throwable thrown = null; - for (Endpoint endpoint : endpoints) - try { - endpoint.client.stop(); - } - catch (Throwable t) { - if (thrown == null) thrown = t; - else thrown.addSuppressed(t); - } - if (thrown != null) throw new RuntimeException(thrown); - } - - - private static class Endpoint { - - private final HttpClient client; - private final AtomicInteger inflight = new AtomicInteger(0); - private final URI uri; - - private Endpoint(HttpClient client, URI uri) { - this.client = client; - this.uri = uri; - } - - } - -} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OkCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OkCluster.java deleted file mode 100644 index 62161bb9a33..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OkCluster.java +++ /dev/null @@ -1,127 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -import okhttp3.Call; -import okhttp3.Callback; -import okhttp3.MediaType; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.RequestBody; -import okhttp3.Response; -import org.jetbrains.annotations.NotNull; - -import javax.net.ssl.SSLEngine; -import javax.net.ssl.X509ExtendedTrustManager; -import java.io.IOException; -import java.net.Socket; -import java.net.URI; -import java.security.cert.CertificateException; -import java.security.cert.X509Certificate; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * @author jonmv - */ -public class OkCluster implements Cluster { - - private final List<Endpoint> endpoints = new ArrayList<>(); - - OkCluster(FeedClientBuilder builder) { - for (URI endpoint : builder.endpoints) - for (int i = 0; i < builder.connectionsPerEndpoint; i++) - endpoints.add(new Endpoint(createOkHttpClient(builder), endpoint)); - } - - private static OkHttpClient createOkHttpClient(FeedClientBuilder builder) { - try { - return new OkHttpClient.Builder().connectTimeout(15, TimeUnit.SECONDS) - .callTimeout(5, TimeUnit.MINUTES) - .readTimeout(30, TimeUnit.SECONDS) - .writeTimeout(30, TimeUnit.SECONDS) - .followRedirects(false) - //.hostnameVerifier(builder.hostnameVerifier) - .retryOnConnectionFailure(false) - .sslSocketFactory(builder.constructSslContext().getSocketFactory(), - new X509ExtendedTrustManager() { - @Override public void checkClientTrusted(X509Certificate[] chain, String authType, Socket socket) throws CertificateException { } - @Override public void checkServerTrusted(X509Certificate[] chain, String authType, Socket socket) throws CertificateException { } - @Override public void checkClientTrusted(X509Certificate[] chain, String authType, SSLEngine engine) throws CertificateException { } - @Override public void checkServerTrusted(X509Certificate[] chain, String authType, SSLEngine engine) throws CertificateException { } - @Override public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException { } - @Override public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException { } - @Override public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; } }) - .build(); - - } - catch (Exception e) { - throw new IllegalStateException(e); - } - } - - - @Override - public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) { - int index = 0; - int min = Integer.MAX_VALUE; - for (int i = 0; i < endpoints.size(); i++) - if (endpoints.get(i).inflight.get() < min) { - index = i; - min = endpoints.get(i).inflight.get(); - } - - Endpoint endpoint = endpoints.get(index); - endpoint.inflight.incrementAndGet(); - try { - Request.Builder okRequest = new Request.Builder().method(request.method(), - RequestBody.create(request.body(), - MediaType.parse("application/json"))) - .url(endpoint.uri.resolve(request.path()).toString()); - request.headers().forEach((name, value) -> okRequest.header(name, value.get())); - endpoint.client.newCall(okRequest.build()).enqueue(new Callback() { - @Override public void onFailure(@NotNull Call call, @NotNull IOException e) { - vessel.completeExceptionally(e); - } - @Override public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException { - vessel.complete(HttpResponse.of(response.code(), response.body().bytes())); - } - }); - } - catch (Throwable thrown) { - vessel.completeExceptionally(thrown); - } - vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet()); - } - - - @Override - public void close() { - Throwable thrown = null; - for (Endpoint endpoint : endpoints) - try { - //endpoint.client. - } - catch (Throwable t) { - if (thrown == null) thrown = t; - else thrown.addSuppressed(t); - } - if (thrown != null) throw new RuntimeException(thrown); - } - - - private static class Endpoint { - - private final OkHttpClient client; - private final AtomicInteger inflight = new AtomicInteger(0); - private final URI uri; - - private Endpoint(OkHttpClient client, URI uri) { - this.client = client; - this.uri = uri; - } - } - -} diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/GracePeriodCircuitBreakerTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/GracePeriodCircuitBreakerTest.java index 6b39d9053b4..9b30ebfd0aa 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/GracePeriodCircuitBreakerTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/GracePeriodCircuitBreakerTest.java @@ -48,7 +48,7 @@ class GracePeriodCircuitBreakerTest { breaker.failure(); now.addAndGet(60 * SECOND); - assertEquals(HALF_OPEN, breaker.state(), "State is half-open until doom period has passedd"); + assertEquals(HALF_OPEN, breaker.state(), "State is half-open until doom period has passed"); now.addAndGet(1); assertEquals(OPEN, breaker.state(), "State is open when doom period has passed"); |