summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Marius Venstad <jonmv@users.noreply.github.com>2021-06-15 15:08:20 +0200
committerGitHub <noreply@github.com>2021-06-15 15:08:20 +0200
commitf986f75c5bb3f587bb39b290b78f335e24641e03 (patch)
tree182692ee69e59febffd11d22e1b144f8e88ce04f
parent79ef7e0298e10f007646f19f6bcabe6cd03932aa (diff)
parente0fabae923b0343f2a30767e60b9a26458fe0c84 (diff)
Merge pull request #18265 from vespa-engine/jonmv/vespa-feed-client
Jonmv/vespa feed client
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java2
-rw-r--r--vespa-feed-client/abi-spec.json15
-rw-r--r--vespa-feed-client/pom.xml17
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java7
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java4
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java121
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/OkCluster.java127
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/GracePeriodCircuitBreakerTest.java2
8 files changed, 12 insertions, 283 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java
index aa1bc1ce624..52b92737bb9 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java
@@ -7,7 +7,7 @@ package com.yahoo.messagebus;
* way of {@link #setMaxPendingCount(int)}), the total size of pending messages (by way of {@link
* #setMaxPendingSize(long)}), or some combination thereof.
*
- * <b>NOTE:</b> By context, "pending" is refering to the number of sent messages that have not been replied to yet.
+ * <b>NOTE:</b> By context, "pending" refers to the number of sent messages that have not been replied to yet.
*
* @author Simon Thoresen Hult
*/
diff --git a/vespa-feed-client/abi-spec.json b/vespa-feed-client/abi-spec.json
index 06da776b6c6..db9c1ff1a02 100644
--- a/vespa-feed-client/abi-spec.json
+++ b/vespa-feed-client/abi-spec.json
@@ -136,6 +136,7 @@
"public ai.vespa.feed.client.FeedClientBuilder setMaxStreamPerConnection(int)",
"public ai.vespa.feed.client.FeedClientBuilder setSslContext(javax.net.ssl.SSLContext)",
"public ai.vespa.feed.client.FeedClientBuilder setHostnameVerifier(javax.net.ssl.HostnameVerifier)",
+ "public ai.vespa.feed.client.FeedClientBuilder setBenchmarkOn(boolean)",
"public ai.vespa.feed.client.FeedClientBuilder addRequestHeader(java.lang.String, java.lang.String)",
"public ai.vespa.feed.client.FeedClientBuilder addRequestHeader(java.lang.String, java.util.function.Supplier)",
"public ai.vespa.feed.client.FeedClientBuilder setRetryStrategy(ai.vespa.feed.client.FeedClient$RetryStrategy)",
@@ -236,20 +237,6 @@
],
"fields": []
},
- "ai.vespa.feed.client.OkCluster": {
- "superClass": "java.lang.Object",
- "interfaces": [
- "ai.vespa.feed.client.Cluster"
- ],
- "attributes": [
- "public"
- ],
- "methods": [
- "public void dispatch(ai.vespa.feed.client.HttpRequest, java.util.concurrent.CompletableFuture)",
- "public void close()"
- ],
- "fields": []
- },
"ai.vespa.feed.client.OperationParameters": {
"superClass": "java.lang.Object",
"interfaces": [],
diff --git a/vespa-feed-client/pom.xml b/vespa-feed-client/pom.xml
index b0d12431c05..7d4938c6fb0 100644
--- a/vespa-feed-client/pom.xml
+++ b/vespa-feed-client/pom.xml
@@ -30,28 +30,11 @@
<scope>compile</scope>
</dependency>
<dependency>
- <groupId>org.eclipse.jetty.http2</groupId>
- <artifactId>http2-http-client-transport</artifactId>
- <version>${jetty.version}</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-client</artifactId>
- <version>${jetty.version}</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
- <groupId>com.squareup.okhttp3</groupId>
- <artifactId>okhttp</artifactId>
- <version>4.9.1</version>
- </dependency>
- <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<scope>compile</scope>
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java
index e0418836c80..8b5eb9efea7 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java
@@ -44,6 +44,7 @@ public class FeedClientBuilder {
Collection<X509Certificate> certificate;
PrivateKey privateKey;
Collection<X509Certificate> caCertificates;
+ boolean benchmark;
/** Creates a builder for a single container endpoint **/
public static FeedClientBuilder create(URI endpoint) { return new FeedClientBuilder(Collections.singletonList(endpoint)); }
@@ -100,6 +101,12 @@ public class FeedClientBuilder {
return this;
}
+ /** Turns on/off benchmarking, aggregated in {@link FeedClient#stats()}. */
+ public FeedClientBuilder setBenchmarkOn(boolean on) {
+ this.benchmark = on;
+ return this;
+ }
+
/** Adds HTTP request header to all client requests. */
public FeedClientBuilder addRequestHeader(String name, String value) {
return addRequestHeader(name, () -> requireNonNull(value));
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
index 3cce423735f..6b2aec5d8b3 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
@@ -59,11 +59,11 @@ class HttpRequestStrategy implements RequestStrategy {
});
HttpRequestStrategy(FeedClientBuilder builder) throws IOException {
- this(builder, new BenchmarkingCluster(new ApacheCluster(builder)));
+ this(builder, new ApacheCluster(builder));
}
HttpRequestStrategy(FeedClientBuilder builder, Cluster cluster) {
- this.cluster = cluster;
+ this.cluster = builder.benchmark ? new BenchmarkingCluster(cluster) : cluster;
this.strategy = builder.retryStrategy;
this.breaker = builder.circuitBreaker;
this.maxInflight = builder.connectionsPerEndpoint * (long) builder.maxStreamsPerConnection;
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java
deleted file mode 100644
index 56be53798b1..00000000000
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java
+++ /dev/null
@@ -1,121 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
-
-import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.client.HttpClientTransport;
-import org.eclipse.jetty.client.api.Request;
-import org.eclipse.jetty.client.api.Result;
-import org.eclipse.jetty.client.util.BufferingResponseListener;
-import org.eclipse.jetty.client.util.BytesContentProvider;
-import org.eclipse.jetty.http.HttpField;
-import org.eclipse.jetty.http2.client.HTTP2Client;
-import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * @author jonmv
- */
-class JettyCluster implements Cluster {
-
- private final List<Endpoint> endpoints = new ArrayList<>();
-
- JettyCluster(FeedClientBuilder builder) {
- for (URI endpoint : builder.endpoints)
- for (int i = 0; i < builder.connectionsPerEndpoint; i++)
- endpoints.add(new Endpoint(createJettyHttpClient(builder), endpoint));
- }
-
- private static HttpClient createJettyHttpClient(FeedClientBuilder builder) {
- try {
- SslContextFactory.Client clientSslCtxFactory = new SslContextFactory.Client();
- clientSslCtxFactory.setSslContext(builder.constructSslContext());
- clientSslCtxFactory.setHostnameVerifier(builder.hostnameVerifier);
-
- HTTP2Client wrapped = new HTTP2Client();
- wrapped.setMaxConcurrentPushedStreams(builder.maxStreamsPerConnection);
- HttpClientTransport transport = new HttpClientTransportOverHTTP2(wrapped);
- HttpClient client = new HttpClient(transport, clientSslCtxFactory);
- client.setUserAgentField(new HttpField("User-Agent", String.format("vespa-feed-client/%s", Vespa.VERSION)));
- client.setFollowRedirects(false);
- client.setMaxRequestsQueuedPerDestination(builder.maxStreamsPerConnection);
- client.setMaxConnectionsPerDestination(1);
-
- client.start();
- return client;
- }
- catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-
-
- @Override
- public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) {
- int index = 0;
- int min = Integer.MAX_VALUE;
- for (int i = 0; i < endpoints.size(); i++)
- if (endpoints.get(i).inflight.get() < min) {
- index = i;
- min = endpoints.get(i).inflight.get();
- }
-
- Endpoint endpoint = endpoints.get(index);
- endpoint.inflight.incrementAndGet();
- try {
- Request jettyRequest = endpoint.client.newRequest(endpoint.uri.resolve(request.path()))
- .method(request.method())
- .timeout(5, TimeUnit.MINUTES)
- .content(request.body() == null ? null : new BytesContentProvider("application/json", request.body()));
- request.headers().forEach((name, value) -> jettyRequest.header(name, value.get()));
- jettyRequest.send(new BufferingResponseListener() {
- @Override public void onComplete(Result result) {
- if (result.isSucceeded())
- vessel.complete(HttpResponse.of(result.getResponse().getStatus(),
- getContent()));
- else
- vessel.completeExceptionally(result.getFailure());
- }
- });
- }
- catch (Throwable thrown) {
- vessel.completeExceptionally(thrown);
- }
- vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet());
- }
-
- @Override
- public void close() {
- Throwable thrown = null;
- for (Endpoint endpoint : endpoints)
- try {
- endpoint.client.stop();
- }
- catch (Throwable t) {
- if (thrown == null) thrown = t;
- else thrown.addSuppressed(t);
- }
- if (thrown != null) throw new RuntimeException(thrown);
- }
-
-
- private static class Endpoint {
-
- private final HttpClient client;
- private final AtomicInteger inflight = new AtomicInteger(0);
- private final URI uri;
-
- private Endpoint(HttpClient client, URI uri) {
- this.client = client;
- this.uri = uri;
- }
-
- }
-
-}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OkCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OkCluster.java
deleted file mode 100644
index 62161bb9a33..00000000000
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OkCluster.java
+++ /dev/null
@@ -1,127 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
-
-import okhttp3.Call;
-import okhttp3.Callback;
-import okhttp3.MediaType;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import okhttp3.RequestBody;
-import okhttp3.Response;
-import org.jetbrains.annotations.NotNull;
-
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.X509ExtendedTrustManager;
-import java.io.IOException;
-import java.net.Socket;
-import java.net.URI;
-import java.security.cert.CertificateException;
-import java.security.cert.X509Certificate;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * @author jonmv
- */
-public class OkCluster implements Cluster {
-
- private final List<Endpoint> endpoints = new ArrayList<>();
-
- OkCluster(FeedClientBuilder builder) {
- for (URI endpoint : builder.endpoints)
- for (int i = 0; i < builder.connectionsPerEndpoint; i++)
- endpoints.add(new Endpoint(createOkHttpClient(builder), endpoint));
- }
-
- private static OkHttpClient createOkHttpClient(FeedClientBuilder builder) {
- try {
- return new OkHttpClient.Builder().connectTimeout(15, TimeUnit.SECONDS)
- .callTimeout(5, TimeUnit.MINUTES)
- .readTimeout(30, TimeUnit.SECONDS)
- .writeTimeout(30, TimeUnit.SECONDS)
- .followRedirects(false)
- //.hostnameVerifier(builder.hostnameVerifier)
- .retryOnConnectionFailure(false)
- .sslSocketFactory(builder.constructSslContext().getSocketFactory(),
- new X509ExtendedTrustManager() {
- @Override public void checkClientTrusted(X509Certificate[] chain, String authType, Socket socket) throws CertificateException { }
- @Override public void checkServerTrusted(X509Certificate[] chain, String authType, Socket socket) throws CertificateException { }
- @Override public void checkClientTrusted(X509Certificate[] chain, String authType, SSLEngine engine) throws CertificateException { }
- @Override public void checkServerTrusted(X509Certificate[] chain, String authType, SSLEngine engine) throws CertificateException { }
- @Override public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException { }
- @Override public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException { }
- @Override public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; } })
- .build();
-
- }
- catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-
-
- @Override
- public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) {
- int index = 0;
- int min = Integer.MAX_VALUE;
- for (int i = 0; i < endpoints.size(); i++)
- if (endpoints.get(i).inflight.get() < min) {
- index = i;
- min = endpoints.get(i).inflight.get();
- }
-
- Endpoint endpoint = endpoints.get(index);
- endpoint.inflight.incrementAndGet();
- try {
- Request.Builder okRequest = new Request.Builder().method(request.method(),
- RequestBody.create(request.body(),
- MediaType.parse("application/json")))
- .url(endpoint.uri.resolve(request.path()).toString());
- request.headers().forEach((name, value) -> okRequest.header(name, value.get()));
- endpoint.client.newCall(okRequest.build()).enqueue(new Callback() {
- @Override public void onFailure(@NotNull Call call, @NotNull IOException e) {
- vessel.completeExceptionally(e);
- }
- @Override public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException {
- vessel.complete(HttpResponse.of(response.code(), response.body().bytes()));
- }
- });
- }
- catch (Throwable thrown) {
- vessel.completeExceptionally(thrown);
- }
- vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet());
- }
-
-
- @Override
- public void close() {
- Throwable thrown = null;
- for (Endpoint endpoint : endpoints)
- try {
- //endpoint.client.
- }
- catch (Throwable t) {
- if (thrown == null) thrown = t;
- else thrown.addSuppressed(t);
- }
- if (thrown != null) throw new RuntimeException(thrown);
- }
-
-
- private static class Endpoint {
-
- private final OkHttpClient client;
- private final AtomicInteger inflight = new AtomicInteger(0);
- private final URI uri;
-
- private Endpoint(OkHttpClient client, URI uri) {
- this.client = client;
- this.uri = uri;
- }
- }
-
-}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/GracePeriodCircuitBreakerTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/GracePeriodCircuitBreakerTest.java
index 6b39d9053b4..9b30ebfd0aa 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/GracePeriodCircuitBreakerTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/GracePeriodCircuitBreakerTest.java
@@ -48,7 +48,7 @@ class GracePeriodCircuitBreakerTest {
breaker.failure();
now.addAndGet(60 * SECOND);
- assertEquals(HALF_OPEN, breaker.state(), "State is half-open until doom period has passedd");
+ assertEquals(HALF_OPEN, breaker.state(), "State is half-open until doom period has passed");
now.addAndGet(1);
assertEquals(OPEN, breaker.state(), "State is open when doom period has passed");