diff options
author | Arnstein Ressem <aressem@gmail.com> | 2021-06-14 10:11:13 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-06-14 10:11:13 +0200 |
commit | d2c1583d12bb28df98eb8e62dd6553866c1acc3b (patch) | |
tree | b9067fa40a21a64ead7e249cfe04b59d57dd2da0 /vespa-feed-client | |
parent | 67412c3c00fae41f1ce24b9e47a24c41128475f2 (diff) |
Revert "Jonmv/vespa feed client"
Diffstat (limited to 'vespa-feed-client')
8 files changed, 27 insertions, 177 deletions
diff --git a/vespa-feed-client/pom.xml b/vespa-feed-client/pom.xml index 85377c25241..02d4a0128ea 100644 --- a/vespa-feed-client/pom.xml +++ b/vespa-feed-client/pom.xml @@ -42,11 +42,6 @@ <scope>compile</scope> </dependency> <dependency> - <groupId>com.squareup.okhttp3</groupId> - <artifactId>okhttp</artifactId> - <version>4.9.1</version> - </dependency> - <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <scope>compile</scope> diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java index 39d343515fe..d4cd53daecc 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java @@ -22,9 +22,6 @@ public interface FeedClient extends Closeable { /** Returns a snapshot of the stats for this feed client, such as requests made, and responses by status. */ OperationStats stats(); - /** Current state of the circuit breaker. */ - default CircuitBreaker.State circuitBreakerState() { return CircuitBreaker.State.CLOSED; } - /** Shut down, and reject new operations. Operations in flight are allowed to complete normally if graceful. */ void close(boolean graceful); diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java index b160cced4b9..256d3ae535c 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java @@ -63,11 +63,6 @@ class HttpFeedClient implements FeedClient { } @Override - public CircuitBreaker.State circuitBreakerState() { - return requestStrategy.circuitBreakerState(); - } - - @Override public void close(boolean graceful) { closed.set(true); if (graceful) @@ -76,7 +71,17 @@ class HttpFeedClient implements FeedClient { requestStrategy.destroy(); } + private void ensureOpen() { + if (requestStrategy.hasFailed()) + close(); + + if (closed.get()) + throw new IllegalStateException("Client is closed, no further operations may be sent"); + } + private CompletableFuture<Result> send(String method, DocumentId documentId, String operationJson, OperationParameters params) { + ensureOpen(); + HttpRequest request = new HttpRequest(method, getPath(documentId) + getQuery(params), requestHeaders, diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java index 72fa675ecf1..5646d37cde3 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java @@ -59,7 +59,7 @@ class HttpRequestStrategy implements RequestStrategy { }); HttpRequestStrategy(FeedClientBuilder builder) throws IOException { - this(builder, new BenchmarkingCluster(new OkCluster(builder))); + this(builder, new BenchmarkingCluster(new ApacheCluster(builder))); } HttpRequestStrategy(FeedClientBuilder builder, Cluster cluster) { @@ -75,22 +75,16 @@ class HttpRequestStrategy implements RequestStrategy { dispatcher.start(); } - @Override public OperationStats stats() { return cluster.stats(); } - @Override - public CircuitBreaker.State circuitBreakerState() { - return breaker.state(); - } - private void dispatch() { try { while (breaker.state() != OPEN && ! destroyed.get()) { while ( ! isInExcess() && poll() && breaker.state() == CLOSED); // Sleep when circuit is half-open, nap when queue is empty, or we are throttled. - Thread.sleep(breaker.state() == HALF_OPEN ? 1000 : 10); // TODO: Reduce throughput when turning half-open? + Thread.sleep(breaker.state() == HALF_OPEN ? 1000 : 10); } } catch (InterruptedException e) { @@ -194,6 +188,11 @@ class HttpRequestStrategy implements RequestStrategy { inflight.decrementAndGet(); } + @Override + public boolean hasFailed() { + return breaker.state() == OPEN; + } + public void await() { try { while (inflight.get() > 0) diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java index 56be53798b1..dc889d29d36 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java @@ -28,8 +28,7 @@ class JettyCluster implements Cluster { JettyCluster(FeedClientBuilder builder) { for (URI endpoint : builder.endpoints) - for (int i = 0; i < builder.connectionsPerEndpoint; i++) - endpoints.add(new Endpoint(createJettyHttpClient(builder), endpoint)); + endpoints.add(new Endpoint(createJettyHttpClient(builder), endpoint)); } private static HttpClient createJettyHttpClient(FeedClientBuilder builder) { @@ -39,13 +38,17 @@ class JettyCluster implements Cluster { clientSslCtxFactory.setHostnameVerifier(builder.hostnameVerifier); HTTP2Client wrapped = new HTTP2Client(); + wrapped.setSelectors(8); wrapped.setMaxConcurrentPushedStreams(builder.maxStreamsPerConnection); HttpClientTransport transport = new HttpClientTransportOverHTTP2(wrapped); HttpClient client = new HttpClient(transport, clientSslCtxFactory); client.setUserAgentField(new HttpField("User-Agent", String.format("vespa-feed-client/%s", Vespa.VERSION))); + client.setDefaultRequestContentType("application/json"); client.setFollowRedirects(false); - client.setMaxRequestsQueuedPerDestination(builder.maxStreamsPerConnection); - client.setMaxConnectionsPerDestination(1); + client.setMaxRequestsQueuedPerDestination(builder.connectionsPerEndpoint * builder.maxStreamsPerConnection); + client.setIdleTimeout(10000); + client.setMaxConnectionsPerDestination(builder.connectionsPerEndpoint); + client.setRequestBufferSize(1 << 16); client.start(); return client; @@ -117,5 +120,4 @@ class JettyCluster implements Cluster { } } - } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OkCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OkCluster.java deleted file mode 100644 index 4b0d4b7c5aa..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OkCluster.java +++ /dev/null @@ -1,146 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -import okhttp3.Call; -import okhttp3.Callback; -import okhttp3.ConnectionPool; -import okhttp3.ConnectionSpec; -import okhttp3.MediaType; -import okhttp3.OkHttp; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.RequestBody; -import okhttp3.Response; -import okhttp3.internal.concurrent.TaskRunner; -import okhttp3.internal.connection.RealConnectionPool; -import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.HttpClientTransport; -import org.eclipse.jetty.client.api.Result; -import org.eclipse.jetty.client.util.BufferingResponseListener; -import org.eclipse.jetty.client.util.BytesContentProvider; -import org.eclipse.jetty.http.HttpField; -import org.eclipse.jetty.http2.client.HTTP2Client; -import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2; -import org.eclipse.jetty.util.ssl.SslContextFactory; -import org.jetbrains.annotations.NotNull; -import sun.security.ssl.SSLSocketFactoryImpl; - -import javax.net.ssl.SSLEngine; -import javax.net.ssl.TrustManager; -import javax.net.ssl.TrustManagerFactory; -import javax.net.ssl.X509ExtendedTrustManager; -import java.io.IOException; -import java.net.Socket; -import java.net.URI; -import java.security.GeneralSecurityException; -import java.security.cert.CertificateException; -import java.security.cert.X509Certificate; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * @author jonmv - */ -public class OkCluster implements Cluster { - - private final List<Endpoint> endpoints = new ArrayList<>(); - - OkCluster(FeedClientBuilder builder) { - for (URI endpoint : builder.endpoints) - for (int i = 0; i < builder.connectionsPerEndpoint; i++) - endpoints.add(new Endpoint(createOkHttpClient(builder), endpoint)); - } - - private static OkHttpClient createOkHttpClient(FeedClientBuilder builder) { - try { - return new OkHttpClient.Builder().connectTimeout(15, TimeUnit.SECONDS) - .callTimeout(5, TimeUnit.MINUTES) - .readTimeout(30, TimeUnit.SECONDS) - .writeTimeout(30, TimeUnit.SECONDS) - .followRedirects(false) - //.hostnameVerifier(builder.hostnameVerifier) - .retryOnConnectionFailure(false) - .sslSocketFactory(builder.constructSslContext().getSocketFactory(), - new X509ExtendedTrustManager() { - @Override public void checkClientTrusted(X509Certificate[] chain, String authType, Socket socket) throws CertificateException { } - @Override public void checkServerTrusted(X509Certificate[] chain, String authType, Socket socket) throws CertificateException { } - @Override public void checkClientTrusted(X509Certificate[] chain, String authType, SSLEngine engine) throws CertificateException { } - @Override public void checkServerTrusted(X509Certificate[] chain, String authType, SSLEngine engine) throws CertificateException { } - @Override public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException { } - @Override public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException { } - @Override public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; } }) - .build(); - - } - catch (Exception e) { - throw new IllegalStateException(e); - } - } - - - @Override - public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) { - int index = 0; - int min = Integer.MAX_VALUE; - for (int i = 0; i < endpoints.size(); i++) - if (endpoints.get(i).inflight.get() < min) { - index = i; - min = endpoints.get(i).inflight.get(); - } - - Endpoint endpoint = endpoints.get(index); - endpoint.inflight.incrementAndGet(); - try { - Request.Builder okRequest = new Request.Builder().method(request.method(), - RequestBody.create(request.body(), - MediaType.parse("application/json"))) - .url(endpoint.uri.resolve(request.path()).toString()); - request.headers().forEach((name, value) -> okRequest.header(name, value.get())); - endpoint.client.newCall(okRequest.build()).enqueue(new Callback() { - @Override public void onFailure(@NotNull Call call, @NotNull IOException e) { - vessel.completeExceptionally(e); - } - @Override public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException { - vessel.complete(HttpResponse.of(response.code(), response.body().bytes())); - } - }); - } - catch (Throwable thrown) { - vessel.completeExceptionally(thrown); - } - vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet()); - } - - - @Override - public void close() { - Throwable thrown = null; - for (Endpoint endpoint : endpoints) - try { - //endpoint.client. - } - catch (Throwable t) { - if (thrown == null) thrown = t; - else thrown.addSuppressed(t); - } - if (thrown != null) throw new RuntimeException(thrown); - } - - - private static class Endpoint { - - private final OkHttpClient client; - private final AtomicInteger inflight = new AtomicInteger(0); - private final URI uri; - - private Endpoint(OkHttpClient client, URI uri) { - this.client = client; - this.uri = uri; - } - } - -} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java index a1101eb0ebb..7764dce712b 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java @@ -1,8 +1,6 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.feed.client; -import ai.vespa.feed.client.FeedClient.CircuitBreaker.State; - import java.util.concurrent.CompletableFuture; /** @@ -15,8 +13,8 @@ interface RequestStrategy { /** Stats for operations sent through this. */ OperationStats stats(); - /** State of the circuit breaker. */ - State circuitBreakerState(); + /** Whether this has failed fatally, and we should cease sending further operations. */ + boolean hasFailed(); /** Forcibly terminates this, causing all inflight operations to complete immediately. */ void destroy(); diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java index d8090549420..0605ed5bf65 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java @@ -25,7 +25,7 @@ class HttpFeedClientTest { AtomicReference<BiFunction<DocumentId, HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>(); class MockRequestStrategy implements RequestStrategy { @Override public OperationStats stats() { throw new UnsupportedOperationException(); } - @Override public FeedClient.CircuitBreaker.State circuitBreakerState() { return FeedClient.CircuitBreaker.State.CLOSED; } + @Override public boolean hasFailed() { return false; } @Override public void destroy() { throw new UnsupportedOperationException(); } @Override public void await() { throw new UnsupportedOperationException(); } @Override public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { return dispatch.get().apply(documentId, request); } |