From 7d5a34c4fb5d9a1a4c85a82bbab47be9dc2c8125 Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Mon, 14 Jun 2021 13:10:44 +0200 Subject: Remove jetty and okhttp impementations --- vespa-feed-client/pom.xml | 17 --- .../java/ai/vespa/feed/client/JettyCluster.java | 121 -------------------- .../main/java/ai/vespa/feed/client/OkCluster.java | 127 --------------------- 3 files changed, 265 deletions(-) delete mode 100644 vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java delete mode 100644 vespa-feed-client/src/main/java/ai/vespa/feed/client/OkCluster.java diff --git a/vespa-feed-client/pom.xml b/vespa-feed-client/pom.xml index b0d12431c05..7d4938c6fb0 100644 --- a/vespa-feed-client/pom.xml +++ b/vespa-feed-client/pom.xml @@ -29,28 +29,11 @@ bcpkix-jdk15on compile - - org.eclipse.jetty.http2 - http2-http-client-transport - ${jetty.version} - compile - - - org.eclipse.jetty - jetty-client - ${jetty.version} - compile - org.apache.httpcomponents.client5 httpclient5 compile - - com.squareup.okhttp3 - okhttp - 4.9.1 - com.fasterxml.jackson.core jackson-core diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java deleted file mode 100644 index 56be53798b1..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java +++ /dev/null @@ -1,121 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.HttpClientTransport; -import org.eclipse.jetty.client.api.Request; -import org.eclipse.jetty.client.api.Result; -import org.eclipse.jetty.client.util.BufferingResponseListener; -import org.eclipse.jetty.client.util.BytesContentProvider; -import org.eclipse.jetty.http.HttpField; -import org.eclipse.jetty.http2.client.HTTP2Client; -import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2; -import org.eclipse.jetty.util.ssl.SslContextFactory; - -import java.net.URI; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * @author jonmv - */ -class JettyCluster implements Cluster { - - private final List endpoints = new ArrayList<>(); - - JettyCluster(FeedClientBuilder builder) { - for (URI endpoint : builder.endpoints) - for (int i = 0; i < builder.connectionsPerEndpoint; i++) - endpoints.add(new Endpoint(createJettyHttpClient(builder), endpoint)); - } - - private static HttpClient createJettyHttpClient(FeedClientBuilder builder) { - try { - SslContextFactory.Client clientSslCtxFactory = new SslContextFactory.Client(); - clientSslCtxFactory.setSslContext(builder.constructSslContext()); - clientSslCtxFactory.setHostnameVerifier(builder.hostnameVerifier); - - HTTP2Client wrapped = new HTTP2Client(); - wrapped.setMaxConcurrentPushedStreams(builder.maxStreamsPerConnection); - HttpClientTransport transport = new HttpClientTransportOverHTTP2(wrapped); - HttpClient client = new HttpClient(transport, clientSslCtxFactory); - client.setUserAgentField(new HttpField("User-Agent", String.format("vespa-feed-client/%s", Vespa.VERSION))); - client.setFollowRedirects(false); - client.setMaxRequestsQueuedPerDestination(builder.maxStreamsPerConnection); - client.setMaxConnectionsPerDestination(1); - - client.start(); - return client; - } - catch (Exception e) { - throw new IllegalStateException(e); - } - } - - - @Override - public void dispatch(HttpRequest request, CompletableFuture vessel) { - int index = 0; - int min = Integer.MAX_VALUE; - for (int i = 0; i < endpoints.size(); i++) - if (endpoints.get(i).inflight.get() < min) { - index = i; - min = endpoints.get(i).inflight.get(); - } - - Endpoint endpoint = endpoints.get(index); - endpoint.inflight.incrementAndGet(); - try { - Request jettyRequest = endpoint.client.newRequest(endpoint.uri.resolve(request.path())) - .method(request.method()) - .timeout(5, TimeUnit.MINUTES) - .content(request.body() == null ? null : new BytesContentProvider("application/json", request.body())); - request.headers().forEach((name, value) -> jettyRequest.header(name, value.get())); - jettyRequest.send(new BufferingResponseListener() { - @Override public void onComplete(Result result) { - if (result.isSucceeded()) - vessel.complete(HttpResponse.of(result.getResponse().getStatus(), - getContent())); - else - vessel.completeExceptionally(result.getFailure()); - } - }); - } - catch (Throwable thrown) { - vessel.completeExceptionally(thrown); - } - vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet()); - } - - @Override - public void close() { - Throwable thrown = null; - for (Endpoint endpoint : endpoints) - try { - endpoint.client.stop(); - } - catch (Throwable t) { - if (thrown == null) thrown = t; - else thrown.addSuppressed(t); - } - if (thrown != null) throw new RuntimeException(thrown); - } - - - private static class Endpoint { - - private final HttpClient client; - private final AtomicInteger inflight = new AtomicInteger(0); - private final URI uri; - - private Endpoint(HttpClient client, URI uri) { - this.client = client; - this.uri = uri; - } - - } - -} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OkCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OkCluster.java deleted file mode 100644 index 62161bb9a33..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OkCluster.java +++ /dev/null @@ -1,127 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -import okhttp3.Call; -import okhttp3.Callback; -import okhttp3.MediaType; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.RequestBody; -import okhttp3.Response; -import org.jetbrains.annotations.NotNull; - -import javax.net.ssl.SSLEngine; -import javax.net.ssl.X509ExtendedTrustManager; -import java.io.IOException; -import java.net.Socket; -import java.net.URI; -import java.security.cert.CertificateException; -import java.security.cert.X509Certificate; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * @author jonmv - */ -public class OkCluster implements Cluster { - - private final List endpoints = new ArrayList<>(); - - OkCluster(FeedClientBuilder builder) { - for (URI endpoint : builder.endpoints) - for (int i = 0; i < builder.connectionsPerEndpoint; i++) - endpoints.add(new Endpoint(createOkHttpClient(builder), endpoint)); - } - - private static OkHttpClient createOkHttpClient(FeedClientBuilder builder) { - try { - return new OkHttpClient.Builder().connectTimeout(15, TimeUnit.SECONDS) - .callTimeout(5, TimeUnit.MINUTES) - .readTimeout(30, TimeUnit.SECONDS) - .writeTimeout(30, TimeUnit.SECONDS) - .followRedirects(false) - //.hostnameVerifier(builder.hostnameVerifier) - .retryOnConnectionFailure(false) - .sslSocketFactory(builder.constructSslContext().getSocketFactory(), - new X509ExtendedTrustManager() { - @Override public void checkClientTrusted(X509Certificate[] chain, String authType, Socket socket) throws CertificateException { } - @Override public void checkServerTrusted(X509Certificate[] chain, String authType, Socket socket) throws CertificateException { } - @Override public void checkClientTrusted(X509Certificate[] chain, String authType, SSLEngine engine) throws CertificateException { } - @Override public void checkServerTrusted(X509Certificate[] chain, String authType, SSLEngine engine) throws CertificateException { } - @Override public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException { } - @Override public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException { } - @Override public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; } }) - .build(); - - } - catch (Exception e) { - throw new IllegalStateException(e); - } - } - - - @Override - public void dispatch(HttpRequest request, CompletableFuture vessel) { - int index = 0; - int min = Integer.MAX_VALUE; - for (int i = 0; i < endpoints.size(); i++) - if (endpoints.get(i).inflight.get() < min) { - index = i; - min = endpoints.get(i).inflight.get(); - } - - Endpoint endpoint = endpoints.get(index); - endpoint.inflight.incrementAndGet(); - try { - Request.Builder okRequest = new Request.Builder().method(request.method(), - RequestBody.create(request.body(), - MediaType.parse("application/json"))) - .url(endpoint.uri.resolve(request.path()).toString()); - request.headers().forEach((name, value) -> okRequest.header(name, value.get())); - endpoint.client.newCall(okRequest.build()).enqueue(new Callback() { - @Override public void onFailure(@NotNull Call call, @NotNull IOException e) { - vessel.completeExceptionally(e); - } - @Override public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException { - vessel.complete(HttpResponse.of(response.code(), response.body().bytes())); - } - }); - } - catch (Throwable thrown) { - vessel.completeExceptionally(thrown); - } - vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet()); - } - - - @Override - public void close() { - Throwable thrown = null; - for (Endpoint endpoint : endpoints) - try { - //endpoint.client. - } - catch (Throwable t) { - if (thrown == null) thrown = t; - else thrown.addSuppressed(t); - } - if (thrown != null) throw new RuntimeException(thrown); - } - - - private static class Endpoint { - - private final OkHttpClient client; - private final AtomicInteger inflight = new AtomicInteger(0); - private final URI uri; - - private Endpoint(OkHttpClient client, URI uri) { - this.client = client; - this.uri = uri; - } - } - -} -- cgit v1.2.3 From ac66e1dc39e30938a91b26eda3d9b715dd3925a5 Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Mon, 14 Jun 2021 13:11:01 +0200 Subject: Make it configurable wheethere to benchmark client --- .../src/main/java/ai/vespa/feed/client/FeedClientBuilder.java | 7 +++++++ .../src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java | 4 ++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java index e0418836c80..8b5eb9efea7 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java @@ -44,6 +44,7 @@ public class FeedClientBuilder { Collection certificate; PrivateKey privateKey; Collection caCertificates; + boolean benchmark; /** Creates a builder for a single container endpoint **/ public static FeedClientBuilder create(URI endpoint) { return new FeedClientBuilder(Collections.singletonList(endpoint)); } @@ -100,6 +101,12 @@ public class FeedClientBuilder { return this; } + /** Turns on/off benchmarking, aggregated in {@link FeedClient#stats()}. */ + public FeedClientBuilder setBenchmarkOn(boolean on) { + this.benchmark = on; + return this; + } + /** Adds HTTP request header to all client requests. */ public FeedClientBuilder addRequestHeader(String name, String value) { return addRequestHeader(name, () -> requireNonNull(value)); diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java index 3cce423735f..6b2aec5d8b3 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java @@ -59,11 +59,11 @@ class HttpRequestStrategy implements RequestStrategy { }); HttpRequestStrategy(FeedClientBuilder builder) throws IOException { - this(builder, new BenchmarkingCluster(new ApacheCluster(builder))); + this(builder, new ApacheCluster(builder)); } HttpRequestStrategy(FeedClientBuilder builder, Cluster cluster) { - this.cluster = cluster; + this.cluster = builder.benchmark ? new BenchmarkingCluster(cluster) : cluster; this.strategy = builder.retryStrategy; this.breaker = builder.circuitBreaker; this.maxInflight = builder.connectionsPerEndpoint * (long) builder.maxStreamsPerConnection; -- cgit v1.2.3 From e0fabae923b0343f2a30767e60b9a26458fe0c84 Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Tue, 15 Jun 2021 14:47:06 +0200 Subject: Update abi spec --- .../java/com/yahoo/messagebus/StaticThrottlePolicy.java | 2 +- vespa-feed-client/abi-spec.json | 15 +-------------- .../vespa/feed/client/GracePeriodCircuitBreakerTest.java | 2 +- 3 files changed, 3 insertions(+), 16 deletions(-) diff --git a/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java index aa1bc1ce624..52b92737bb9 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/StaticThrottlePolicy.java @@ -7,7 +7,7 @@ package com.yahoo.messagebus; * way of {@link #setMaxPendingCount(int)}), the total size of pending messages (by way of {@link * #setMaxPendingSize(long)}), or some combination thereof. * - * NOTE: By context, "pending" is refering to the number of sent messages that have not been replied to yet. + * NOTE: By context, "pending" refers to the number of sent messages that have not been replied to yet. * * @author Simon Thoresen Hult */ diff --git a/vespa-feed-client/abi-spec.json b/vespa-feed-client/abi-spec.json index 06da776b6c6..db9c1ff1a02 100644 --- a/vespa-feed-client/abi-spec.json +++ b/vespa-feed-client/abi-spec.json @@ -136,6 +136,7 @@ "public ai.vespa.feed.client.FeedClientBuilder setMaxStreamPerConnection(int)", "public ai.vespa.feed.client.FeedClientBuilder setSslContext(javax.net.ssl.SSLContext)", "public ai.vespa.feed.client.FeedClientBuilder setHostnameVerifier(javax.net.ssl.HostnameVerifier)", + "public ai.vespa.feed.client.FeedClientBuilder setBenchmarkOn(boolean)", "public ai.vespa.feed.client.FeedClientBuilder addRequestHeader(java.lang.String, java.lang.String)", "public ai.vespa.feed.client.FeedClientBuilder addRequestHeader(java.lang.String, java.util.function.Supplier)", "public ai.vespa.feed.client.FeedClientBuilder setRetryStrategy(ai.vespa.feed.client.FeedClient$RetryStrategy)", @@ -236,20 +237,6 @@ ], "fields": [] }, - "ai.vespa.feed.client.OkCluster": { - "superClass": "java.lang.Object", - "interfaces": [ - "ai.vespa.feed.client.Cluster" - ], - "attributes": [ - "public" - ], - "methods": [ - "public void dispatch(ai.vespa.feed.client.HttpRequest, java.util.concurrent.CompletableFuture)", - "public void close()" - ], - "fields": [] - }, "ai.vespa.feed.client.OperationParameters": { "superClass": "java.lang.Object", "interfaces": [], diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/GracePeriodCircuitBreakerTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/GracePeriodCircuitBreakerTest.java index 6b39d9053b4..9b30ebfd0aa 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/GracePeriodCircuitBreakerTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/GracePeriodCircuitBreakerTest.java @@ -48,7 +48,7 @@ class GracePeriodCircuitBreakerTest { breaker.failure(); now.addAndGet(60 * SECOND); - assertEquals(HALF_OPEN, breaker.state(), "State is half-open until doom period has passedd"); + assertEquals(HALF_OPEN, breaker.state(), "State is half-open until doom period has passed"); now.addAndGet(1); assertEquals(OPEN, breaker.state(), "State is open when doom period has passed"); -- cgit v1.2.3