summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-06-14 10:27:45 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-06-14 10:27:45 +0200
commita53d59c04cb34535dac02bd47a14c16c731d7494 (patch)
tree01b03a667dbf616bba7b817fe6ce56c21dd38a2a /vespa-feed-client
parent7024887443e9ffb113c63bdbe6ff284be779e293 (diff)
Revert "Merge pull request #18232 from vespa-engine/revert-18231-jonmv/vespa-feed-client"
This reverts commit eb8b2a1d15596df8487ff855934297152fee5e92, reversing changes made to 67412c3c00fae41f1ce24b9e47a24c41128475f2.
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/pom.xml5
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java3
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java15
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java15
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java12
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/OkCluster.java146
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java6
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java2
8 files changed, 177 insertions, 27 deletions
diff --git a/vespa-feed-client/pom.xml b/vespa-feed-client/pom.xml
index 02d4a0128ea..85377c25241 100644
--- a/vespa-feed-client/pom.xml
+++ b/vespa-feed-client/pom.xml
@@ -42,6 +42,11 @@
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>okhttp</artifactId>
+ <version>4.9.1</version>
+ </dependency>
+ <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<scope>compile</scope>
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java
index d4cd53daecc..39d343515fe 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java
@@ -22,6 +22,9 @@ public interface FeedClient extends Closeable {
/** Returns a snapshot of the stats for this feed client, such as requests made, and responses by status. */
OperationStats stats();
+ /** Current state of the circuit breaker. */
+ default CircuitBreaker.State circuitBreakerState() { return CircuitBreaker.State.CLOSED; }
+
/** Shut down, and reject new operations. Operations in flight are allowed to complete normally if graceful. */
void close(boolean graceful);
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
index 256d3ae535c..b160cced4b9 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
@@ -63,6 +63,11 @@ class HttpFeedClient implements FeedClient {
}
@Override
+ public CircuitBreaker.State circuitBreakerState() {
+ return requestStrategy.circuitBreakerState();
+ }
+
+ @Override
public void close(boolean graceful) {
closed.set(true);
if (graceful)
@@ -71,17 +76,7 @@ class HttpFeedClient implements FeedClient {
requestStrategy.destroy();
}
- private void ensureOpen() {
- if (requestStrategy.hasFailed())
- close();
-
- if (closed.get())
- throw new IllegalStateException("Client is closed, no further operations may be sent");
- }
-
private CompletableFuture<Result> send(String method, DocumentId documentId, String operationJson, OperationParameters params) {
- ensureOpen();
-
HttpRequest request = new HttpRequest(method,
getPath(documentId) + getQuery(params),
requestHeaders,
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
index 5646d37cde3..72fa675ecf1 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
@@ -59,7 +59,7 @@ class HttpRequestStrategy implements RequestStrategy {
});
HttpRequestStrategy(FeedClientBuilder builder) throws IOException {
- this(builder, new BenchmarkingCluster(new ApacheCluster(builder)));
+ this(builder, new BenchmarkingCluster(new OkCluster(builder)));
}
HttpRequestStrategy(FeedClientBuilder builder, Cluster cluster) {
@@ -75,16 +75,22 @@ class HttpRequestStrategy implements RequestStrategy {
dispatcher.start();
}
+ @Override
public OperationStats stats() {
return cluster.stats();
}
+ @Override
+ public CircuitBreaker.State circuitBreakerState() {
+ return breaker.state();
+ }
+
private void dispatch() {
try {
while (breaker.state() != OPEN && ! destroyed.get()) {
while ( ! isInExcess() && poll() && breaker.state() == CLOSED);
// Sleep when circuit is half-open, nap when queue is empty, or we are throttled.
- Thread.sleep(breaker.state() == HALF_OPEN ? 1000 : 10);
+ Thread.sleep(breaker.state() == HALF_OPEN ? 1000 : 10); // TODO: Reduce throughput when turning half-open?
}
}
catch (InterruptedException e) {
@@ -188,11 +194,6 @@ class HttpRequestStrategy implements RequestStrategy {
inflight.decrementAndGet();
}
- @Override
- public boolean hasFailed() {
- return breaker.state() == OPEN;
- }
-
public void await() {
try {
while (inflight.get() > 0)
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java
index dc889d29d36..56be53798b1 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java
@@ -28,7 +28,8 @@ class JettyCluster implements Cluster {
JettyCluster(FeedClientBuilder builder) {
for (URI endpoint : builder.endpoints)
- endpoints.add(new Endpoint(createJettyHttpClient(builder), endpoint));
+ for (int i = 0; i < builder.connectionsPerEndpoint; i++)
+ endpoints.add(new Endpoint(createJettyHttpClient(builder), endpoint));
}
private static HttpClient createJettyHttpClient(FeedClientBuilder builder) {
@@ -38,17 +39,13 @@ class JettyCluster implements Cluster {
clientSslCtxFactory.setHostnameVerifier(builder.hostnameVerifier);
HTTP2Client wrapped = new HTTP2Client();
- wrapped.setSelectors(8);
wrapped.setMaxConcurrentPushedStreams(builder.maxStreamsPerConnection);
HttpClientTransport transport = new HttpClientTransportOverHTTP2(wrapped);
HttpClient client = new HttpClient(transport, clientSslCtxFactory);
client.setUserAgentField(new HttpField("User-Agent", String.format("vespa-feed-client/%s", Vespa.VERSION)));
- client.setDefaultRequestContentType("application/json");
client.setFollowRedirects(false);
- client.setMaxRequestsQueuedPerDestination(builder.connectionsPerEndpoint * builder.maxStreamsPerConnection);
- client.setIdleTimeout(10000);
- client.setMaxConnectionsPerDestination(builder.connectionsPerEndpoint);
- client.setRequestBufferSize(1 << 16);
+ client.setMaxRequestsQueuedPerDestination(builder.maxStreamsPerConnection);
+ client.setMaxConnectionsPerDestination(1);
client.start();
return client;
@@ -120,4 +117,5 @@ class JettyCluster implements Cluster {
}
}
+
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OkCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OkCluster.java
new file mode 100644
index 00000000000..4b0d4b7c5aa
--- /dev/null
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OkCluster.java
@@ -0,0 +1,146 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import okhttp3.Call;
+import okhttp3.Callback;
+import okhttp3.ConnectionPool;
+import okhttp3.ConnectionSpec;
+import okhttp3.MediaType;
+import okhttp3.OkHttp;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import okhttp3.internal.concurrent.TaskRunner;
+import okhttp3.internal.connection.RealConnectionPool;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.HttpClientTransport;
+import org.eclipse.jetty.client.api.Result;
+import org.eclipse.jetty.client.util.BufferingResponseListener;
+import org.eclipse.jetty.client.util.BytesContentProvider;
+import org.eclipse.jetty.http.HttpField;
+import org.eclipse.jetty.http2.client.HTTP2Client;
+import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.jetbrains.annotations.NotNull;
+import sun.security.ssl.SSLSocketFactoryImpl;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509ExtendedTrustManager;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.security.GeneralSecurityException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author jonmv
+ */
+public class OkCluster implements Cluster {
+
+ private final List<Endpoint> endpoints = new ArrayList<>();
+
+ OkCluster(FeedClientBuilder builder) {
+ for (URI endpoint : builder.endpoints)
+ for (int i = 0; i < builder.connectionsPerEndpoint; i++)
+ endpoints.add(new Endpoint(createOkHttpClient(builder), endpoint));
+ }
+
+ private static OkHttpClient createOkHttpClient(FeedClientBuilder builder) {
+ try {
+ return new OkHttpClient.Builder().connectTimeout(15, TimeUnit.SECONDS)
+ .callTimeout(5, TimeUnit.MINUTES)
+ .readTimeout(30, TimeUnit.SECONDS)
+ .writeTimeout(30, TimeUnit.SECONDS)
+ .followRedirects(false)
+ //.hostnameVerifier(builder.hostnameVerifier)
+ .retryOnConnectionFailure(false)
+ .sslSocketFactory(builder.constructSslContext().getSocketFactory(),
+ new X509ExtendedTrustManager() {
+ @Override public void checkClientTrusted(X509Certificate[] chain, String authType, Socket socket) throws CertificateException { }
+ @Override public void checkServerTrusted(X509Certificate[] chain, String authType, Socket socket) throws CertificateException { }
+ @Override public void checkClientTrusted(X509Certificate[] chain, String authType, SSLEngine engine) throws CertificateException { }
+ @Override public void checkServerTrusted(X509Certificate[] chain, String authType, SSLEngine engine) throws CertificateException { }
+ @Override public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException { }
+ @Override public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException { }
+ @Override public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; } })
+ .build();
+
+ }
+ catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+
+ @Override
+ public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) {
+ int index = 0;
+ int min = Integer.MAX_VALUE;
+ for (int i = 0; i < endpoints.size(); i++)
+ if (endpoints.get(i).inflight.get() < min) {
+ index = i;
+ min = endpoints.get(i).inflight.get();
+ }
+
+ Endpoint endpoint = endpoints.get(index);
+ endpoint.inflight.incrementAndGet();
+ try {
+ Request.Builder okRequest = new Request.Builder().method(request.method(),
+ RequestBody.create(request.body(),
+ MediaType.parse("application/json")))
+ .url(endpoint.uri.resolve(request.path()).toString());
+ request.headers().forEach((name, value) -> okRequest.header(name, value.get()));
+ endpoint.client.newCall(okRequest.build()).enqueue(new Callback() {
+ @Override public void onFailure(@NotNull Call call, @NotNull IOException e) {
+ vessel.completeExceptionally(e);
+ }
+ @Override public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException {
+ vessel.complete(HttpResponse.of(response.code(), response.body().bytes()));
+ }
+ });
+ }
+ catch (Throwable thrown) {
+ vessel.completeExceptionally(thrown);
+ }
+ vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet());
+ }
+
+
+ @Override
+ public void close() {
+ Throwable thrown = null;
+ for (Endpoint endpoint : endpoints)
+ try {
+ //endpoint.client.
+ }
+ catch (Throwable t) {
+ if (thrown == null) thrown = t;
+ else thrown.addSuppressed(t);
+ }
+ if (thrown != null) throw new RuntimeException(thrown);
+ }
+
+
+ private static class Endpoint {
+
+ private final OkHttpClient client;
+ private final AtomicInteger inflight = new AtomicInteger(0);
+ private final URI uri;
+
+ private Endpoint(OkHttpClient client, URI uri) {
+ this.client = client;
+ this.uri = uri;
+ }
+ }
+
+}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
index 7764dce712b..a1101eb0ebb 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
@@ -1,6 +1,8 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.feed.client;
+import ai.vespa.feed.client.FeedClient.CircuitBreaker.State;
+
import java.util.concurrent.CompletableFuture;
/**
@@ -13,8 +15,8 @@ interface RequestStrategy {
/** Stats for operations sent through this. */
OperationStats stats();
- /** Whether this has failed fatally, and we should cease sending further operations. */
- boolean hasFailed();
+ /** State of the circuit breaker. */
+ State circuitBreakerState();
/** Forcibly terminates this, causing all inflight operations to complete immediately. */
void destroy();
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java
index 0605ed5bf65..d8090549420 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java
@@ -25,7 +25,7 @@ class HttpFeedClientTest {
AtomicReference<BiFunction<DocumentId, HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>();
class MockRequestStrategy implements RequestStrategy {
@Override public OperationStats stats() { throw new UnsupportedOperationException(); }
- @Override public boolean hasFailed() { return false; }
+ @Override public FeedClient.CircuitBreaker.State circuitBreakerState() { return FeedClient.CircuitBreaker.State.CLOSED; }
@Override public void destroy() { throw new UnsupportedOperationException(); }
@Override public void await() { throw new UnsupportedOperationException(); }
@Override public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { return dispatch.get().apply(documentId, request); }