diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-06-14 10:27:45 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-06-14 10:27:45 +0200 |
commit | a53d59c04cb34535dac02bd47a14c16c731d7494 (patch) | |
tree | 01b03a667dbf616bba7b817fe6ce56c21dd38a2a /vespa-feed-client | |
parent | 7024887443e9ffb113c63bdbe6ff284be779e293 (diff) |
Revert "Merge pull request #18232 from vespa-engine/revert-18231-jonmv/vespa-feed-client"
This reverts commit eb8b2a1d15596df8487ff855934297152fee5e92, reversing
changes made to 67412c3c00fae41f1ce24b9e47a24c41128475f2.
Diffstat (limited to 'vespa-feed-client')
8 files changed, 177 insertions, 27 deletions
diff --git a/vespa-feed-client/pom.xml b/vespa-feed-client/pom.xml index 02d4a0128ea..85377c25241 100644 --- a/vespa-feed-client/pom.xml +++ b/vespa-feed-client/pom.xml @@ -42,6 +42,11 @@ <scope>compile</scope> </dependency> <dependency> + <groupId>com.squareup.okhttp3</groupId> + <artifactId>okhttp</artifactId> + <version>4.9.1</version> + </dependency> + <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <scope>compile</scope> diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java index d4cd53daecc..39d343515fe 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java @@ -22,6 +22,9 @@ public interface FeedClient extends Closeable { /** Returns a snapshot of the stats for this feed client, such as requests made, and responses by status. */ OperationStats stats(); + /** Current state of the circuit breaker. */ + default CircuitBreaker.State circuitBreakerState() { return CircuitBreaker.State.CLOSED; } + /** Shut down, and reject new operations. Operations in flight are allowed to complete normally if graceful. */ void close(boolean graceful); diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java index 256d3ae535c..b160cced4b9 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java @@ -63,6 +63,11 @@ class HttpFeedClient implements FeedClient { } @Override + public CircuitBreaker.State circuitBreakerState() { + return requestStrategy.circuitBreakerState(); + } + + @Override public void close(boolean graceful) { closed.set(true); if (graceful) @@ -71,17 +76,7 @@ class HttpFeedClient implements FeedClient { requestStrategy.destroy(); } - private void ensureOpen() { - if (requestStrategy.hasFailed()) - close(); - - if (closed.get()) - throw new IllegalStateException("Client is closed, no further operations may be sent"); - } - private CompletableFuture<Result> send(String method, DocumentId documentId, String operationJson, OperationParameters params) { - ensureOpen(); - HttpRequest request = new HttpRequest(method, getPath(documentId) + getQuery(params), requestHeaders, diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java index 5646d37cde3..72fa675ecf1 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java @@ -59,7 +59,7 @@ class HttpRequestStrategy implements RequestStrategy { }); HttpRequestStrategy(FeedClientBuilder builder) throws IOException { - this(builder, new BenchmarkingCluster(new ApacheCluster(builder))); + this(builder, new BenchmarkingCluster(new OkCluster(builder))); } HttpRequestStrategy(FeedClientBuilder builder, Cluster cluster) { @@ -75,16 +75,22 @@ class HttpRequestStrategy implements RequestStrategy { dispatcher.start(); } + @Override public OperationStats stats() { return cluster.stats(); } + @Override + public CircuitBreaker.State circuitBreakerState() { + return breaker.state(); + } + private void dispatch() { try { while (breaker.state() != OPEN && ! destroyed.get()) { while ( ! isInExcess() && poll() && breaker.state() == CLOSED); // Sleep when circuit is half-open, nap when queue is empty, or we are throttled. - Thread.sleep(breaker.state() == HALF_OPEN ? 1000 : 10); + Thread.sleep(breaker.state() == HALF_OPEN ? 1000 : 10); // TODO: Reduce throughput when turning half-open? } } catch (InterruptedException e) { @@ -188,11 +194,6 @@ class HttpRequestStrategy implements RequestStrategy { inflight.decrementAndGet(); } - @Override - public boolean hasFailed() { - return breaker.state() == OPEN; - } - public void await() { try { while (inflight.get() > 0) diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java index dc889d29d36..56be53798b1 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java @@ -28,7 +28,8 @@ class JettyCluster implements Cluster { JettyCluster(FeedClientBuilder builder) { for (URI endpoint : builder.endpoints) - endpoints.add(new Endpoint(createJettyHttpClient(builder), endpoint)); + for (int i = 0; i < builder.connectionsPerEndpoint; i++) + endpoints.add(new Endpoint(createJettyHttpClient(builder), endpoint)); } private static HttpClient createJettyHttpClient(FeedClientBuilder builder) { @@ -38,17 +39,13 @@ class JettyCluster implements Cluster { clientSslCtxFactory.setHostnameVerifier(builder.hostnameVerifier); HTTP2Client wrapped = new HTTP2Client(); - wrapped.setSelectors(8); wrapped.setMaxConcurrentPushedStreams(builder.maxStreamsPerConnection); HttpClientTransport transport = new HttpClientTransportOverHTTP2(wrapped); HttpClient client = new HttpClient(transport, clientSslCtxFactory); client.setUserAgentField(new HttpField("User-Agent", String.format("vespa-feed-client/%s", Vespa.VERSION))); - client.setDefaultRequestContentType("application/json"); client.setFollowRedirects(false); - client.setMaxRequestsQueuedPerDestination(builder.connectionsPerEndpoint * builder.maxStreamsPerConnection); - client.setIdleTimeout(10000); - client.setMaxConnectionsPerDestination(builder.connectionsPerEndpoint); - client.setRequestBufferSize(1 << 16); + client.setMaxRequestsQueuedPerDestination(builder.maxStreamsPerConnection); + client.setMaxConnectionsPerDestination(1); client.start(); return client; @@ -120,4 +117,5 @@ class JettyCluster implements Cluster { } } + } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OkCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OkCluster.java new file mode 100644 index 00000000000..4b0d4b7c5aa --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OkCluster.java @@ -0,0 +1,146 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import okhttp3.Call; +import okhttp3.Callback; +import okhttp3.ConnectionPool; +import okhttp3.ConnectionSpec; +import okhttp3.MediaType; +import okhttp3.OkHttp; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; +import okhttp3.internal.concurrent.TaskRunner; +import okhttp3.internal.connection.RealConnectionPool; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.HttpClientTransport; +import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.client.util.BufferingResponseListener; +import org.eclipse.jetty.client.util.BytesContentProvider; +import org.eclipse.jetty.http.HttpField; +import org.eclipse.jetty.http2.client.HTTP2Client; +import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.jetbrains.annotations.NotNull; +import sun.security.ssl.SSLSocketFactoryImpl; + +import javax.net.ssl.SSLEngine; +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactory; +import javax.net.ssl.X509ExtendedTrustManager; +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.security.GeneralSecurityException; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author jonmv + */ +public class OkCluster implements Cluster { + + private final List<Endpoint> endpoints = new ArrayList<>(); + + OkCluster(FeedClientBuilder builder) { + for (URI endpoint : builder.endpoints) + for (int i = 0; i < builder.connectionsPerEndpoint; i++) + endpoints.add(new Endpoint(createOkHttpClient(builder), endpoint)); + } + + private static OkHttpClient createOkHttpClient(FeedClientBuilder builder) { + try { + return new OkHttpClient.Builder().connectTimeout(15, TimeUnit.SECONDS) + .callTimeout(5, TimeUnit.MINUTES) + .readTimeout(30, TimeUnit.SECONDS) + .writeTimeout(30, TimeUnit.SECONDS) + .followRedirects(false) + //.hostnameVerifier(builder.hostnameVerifier) + .retryOnConnectionFailure(false) + .sslSocketFactory(builder.constructSslContext().getSocketFactory(), + new X509ExtendedTrustManager() { + @Override public void checkClientTrusted(X509Certificate[] chain, String authType, Socket socket) throws CertificateException { } + @Override public void checkServerTrusted(X509Certificate[] chain, String authType, Socket socket) throws CertificateException { } + @Override public void checkClientTrusted(X509Certificate[] chain, String authType, SSLEngine engine) throws CertificateException { } + @Override public void checkServerTrusted(X509Certificate[] chain, String authType, SSLEngine engine) throws CertificateException { } + @Override public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException { } + @Override public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException { } + @Override public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; } }) + .build(); + + } + catch (Exception e) { + throw new IllegalStateException(e); + } + } + + + @Override + public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) { + int index = 0; + int min = Integer.MAX_VALUE; + for (int i = 0; i < endpoints.size(); i++) + if (endpoints.get(i).inflight.get() < min) { + index = i; + min = endpoints.get(i).inflight.get(); + } + + Endpoint endpoint = endpoints.get(index); + endpoint.inflight.incrementAndGet(); + try { + Request.Builder okRequest = new Request.Builder().method(request.method(), + RequestBody.create(request.body(), + MediaType.parse("application/json"))) + .url(endpoint.uri.resolve(request.path()).toString()); + request.headers().forEach((name, value) -> okRequest.header(name, value.get())); + endpoint.client.newCall(okRequest.build()).enqueue(new Callback() { + @Override public void onFailure(@NotNull Call call, @NotNull IOException e) { + vessel.completeExceptionally(e); + } + @Override public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException { + vessel.complete(HttpResponse.of(response.code(), response.body().bytes())); + } + }); + } + catch (Throwable thrown) { + vessel.completeExceptionally(thrown); + } + vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet()); + } + + + @Override + public void close() { + Throwable thrown = null; + for (Endpoint endpoint : endpoints) + try { + //endpoint.client. + } + catch (Throwable t) { + if (thrown == null) thrown = t; + else thrown.addSuppressed(t); + } + if (thrown != null) throw new RuntimeException(thrown); + } + + + private static class Endpoint { + + private final OkHttpClient client; + private final AtomicInteger inflight = new AtomicInteger(0); + private final URI uri; + + private Endpoint(OkHttpClient client, URI uri) { + this.client = client; + this.uri = uri; + } + } + +} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java index 7764dce712b..a1101eb0ebb 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java @@ -1,6 +1,8 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.feed.client; +import ai.vespa.feed.client.FeedClient.CircuitBreaker.State; + import java.util.concurrent.CompletableFuture; /** @@ -13,8 +15,8 @@ interface RequestStrategy { /** Stats for operations sent through this. */ OperationStats stats(); - /** Whether this has failed fatally, and we should cease sending further operations. */ - boolean hasFailed(); + /** State of the circuit breaker. */ + State circuitBreakerState(); /** Forcibly terminates this, causing all inflight operations to complete immediately. */ void destroy(); diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java index 0605ed5bf65..d8090549420 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java @@ -25,7 +25,7 @@ class HttpFeedClientTest { AtomicReference<BiFunction<DocumentId, HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>(); class MockRequestStrategy implements RequestStrategy { @Override public OperationStats stats() { throw new UnsupportedOperationException(); } - @Override public boolean hasFailed() { return false; } + @Override public FeedClient.CircuitBreaker.State circuitBreakerState() { return FeedClient.CircuitBreaker.State.CLOSED; } @Override public void destroy() { throw new UnsupportedOperationException(); } @Override public void await() { throw new UnsupportedOperationException(); } @Override public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { return dispatch.get().apply(documentId, request); } |