summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorArnstein Ressem <aressem@gmail.com>2021-06-14 10:11:13 +0200
committerGitHub <noreply@github.com>2021-06-14 10:11:13 +0200
commitd2c1583d12bb28df98eb8e62dd6553866c1acc3b (patch)
treeb9067fa40a21a64ead7e249cfe04b59d57dd2da0 /vespa-feed-client
parent67412c3c00fae41f1ce24b9e47a24c41128475f2 (diff)
Revert "Jonmv/vespa feed client"
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/pom.xml5
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java3
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java15
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java15
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java12
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/OkCluster.java146
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java6
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java2
8 files changed, 27 insertions, 177 deletions
diff --git a/vespa-feed-client/pom.xml b/vespa-feed-client/pom.xml
index 85377c25241..02d4a0128ea 100644
--- a/vespa-feed-client/pom.xml
+++ b/vespa-feed-client/pom.xml
@@ -42,11 +42,6 @@
<scope>compile</scope>
</dependency>
<dependency>
- <groupId>com.squareup.okhttp3</groupId>
- <artifactId>okhttp</artifactId>
- <version>4.9.1</version>
- </dependency>
- <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<scope>compile</scope>
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java
index 39d343515fe..d4cd53daecc 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java
@@ -22,9 +22,6 @@ public interface FeedClient extends Closeable {
/** Returns a snapshot of the stats for this feed client, such as requests made, and responses by status. */
OperationStats stats();
- /** Current state of the circuit breaker. */
- default CircuitBreaker.State circuitBreakerState() { return CircuitBreaker.State.CLOSED; }
-
/** Shut down, and reject new operations. Operations in flight are allowed to complete normally if graceful. */
void close(boolean graceful);
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
index b160cced4b9..256d3ae535c 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
@@ -63,11 +63,6 @@ class HttpFeedClient implements FeedClient {
}
@Override
- public CircuitBreaker.State circuitBreakerState() {
- return requestStrategy.circuitBreakerState();
- }
-
- @Override
public void close(boolean graceful) {
closed.set(true);
if (graceful)
@@ -76,7 +71,17 @@ class HttpFeedClient implements FeedClient {
requestStrategy.destroy();
}
+ private void ensureOpen() {
+ if (requestStrategy.hasFailed())
+ close();
+
+ if (closed.get())
+ throw new IllegalStateException("Client is closed, no further operations may be sent");
+ }
+
private CompletableFuture<Result> send(String method, DocumentId documentId, String operationJson, OperationParameters params) {
+ ensureOpen();
+
HttpRequest request = new HttpRequest(method,
getPath(documentId) + getQuery(params),
requestHeaders,
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
index 72fa675ecf1..5646d37cde3 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
@@ -59,7 +59,7 @@ class HttpRequestStrategy implements RequestStrategy {
});
HttpRequestStrategy(FeedClientBuilder builder) throws IOException {
- this(builder, new BenchmarkingCluster(new OkCluster(builder)));
+ this(builder, new BenchmarkingCluster(new ApacheCluster(builder)));
}
HttpRequestStrategy(FeedClientBuilder builder, Cluster cluster) {
@@ -75,22 +75,16 @@ class HttpRequestStrategy implements RequestStrategy {
dispatcher.start();
}
- @Override
public OperationStats stats() {
return cluster.stats();
}
- @Override
- public CircuitBreaker.State circuitBreakerState() {
- return breaker.state();
- }
-
private void dispatch() {
try {
while (breaker.state() != OPEN && ! destroyed.get()) {
while ( ! isInExcess() && poll() && breaker.state() == CLOSED);
// Sleep when circuit is half-open, nap when queue is empty, or we are throttled.
- Thread.sleep(breaker.state() == HALF_OPEN ? 1000 : 10); // TODO: Reduce throughput when turning half-open?
+ Thread.sleep(breaker.state() == HALF_OPEN ? 1000 : 10);
}
}
catch (InterruptedException e) {
@@ -194,6 +188,11 @@ class HttpRequestStrategy implements RequestStrategy {
inflight.decrementAndGet();
}
+ @Override
+ public boolean hasFailed() {
+ return breaker.state() == OPEN;
+ }
+
public void await() {
try {
while (inflight.get() > 0)
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java
index 56be53798b1..dc889d29d36 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java
@@ -28,8 +28,7 @@ class JettyCluster implements Cluster {
JettyCluster(FeedClientBuilder builder) {
for (URI endpoint : builder.endpoints)
- for (int i = 0; i < builder.connectionsPerEndpoint; i++)
- endpoints.add(new Endpoint(createJettyHttpClient(builder), endpoint));
+ endpoints.add(new Endpoint(createJettyHttpClient(builder), endpoint));
}
private static HttpClient createJettyHttpClient(FeedClientBuilder builder) {
@@ -39,13 +38,17 @@ class JettyCluster implements Cluster {
clientSslCtxFactory.setHostnameVerifier(builder.hostnameVerifier);
HTTP2Client wrapped = new HTTP2Client();
+ wrapped.setSelectors(8);
wrapped.setMaxConcurrentPushedStreams(builder.maxStreamsPerConnection);
HttpClientTransport transport = new HttpClientTransportOverHTTP2(wrapped);
HttpClient client = new HttpClient(transport, clientSslCtxFactory);
client.setUserAgentField(new HttpField("User-Agent", String.format("vespa-feed-client/%s", Vespa.VERSION)));
+ client.setDefaultRequestContentType("application/json");
client.setFollowRedirects(false);
- client.setMaxRequestsQueuedPerDestination(builder.maxStreamsPerConnection);
- client.setMaxConnectionsPerDestination(1);
+ client.setMaxRequestsQueuedPerDestination(builder.connectionsPerEndpoint * builder.maxStreamsPerConnection);
+ client.setIdleTimeout(10000);
+ client.setMaxConnectionsPerDestination(builder.connectionsPerEndpoint);
+ client.setRequestBufferSize(1 << 16);
client.start();
return client;
@@ -117,5 +120,4 @@ class JettyCluster implements Cluster {
}
}
-
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OkCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OkCluster.java
deleted file mode 100644
index 4b0d4b7c5aa..00000000000
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OkCluster.java
+++ /dev/null
@@ -1,146 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
-
-import okhttp3.Call;
-import okhttp3.Callback;
-import okhttp3.ConnectionPool;
-import okhttp3.ConnectionSpec;
-import okhttp3.MediaType;
-import okhttp3.OkHttp;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import okhttp3.RequestBody;
-import okhttp3.Response;
-import okhttp3.internal.concurrent.TaskRunner;
-import okhttp3.internal.connection.RealConnectionPool;
-import org.eclipse.jetty.client.HttpClient;
-import org.eclipse.jetty.client.HttpClientTransport;
-import org.eclipse.jetty.client.api.Result;
-import org.eclipse.jetty.client.util.BufferingResponseListener;
-import org.eclipse.jetty.client.util.BytesContentProvider;
-import org.eclipse.jetty.http.HttpField;
-import org.eclipse.jetty.http2.client.HTTP2Client;
-import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.jetbrains.annotations.NotNull;
-import sun.security.ssl.SSLSocketFactoryImpl;
-
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.TrustManagerFactory;
-import javax.net.ssl.X509ExtendedTrustManager;
-import java.io.IOException;
-import java.net.Socket;
-import java.net.URI;
-import java.security.GeneralSecurityException;
-import java.security.cert.CertificateException;
-import java.security.cert.X509Certificate;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * @author jonmv
- */
-public class OkCluster implements Cluster {
-
- private final List<Endpoint> endpoints = new ArrayList<>();
-
- OkCluster(FeedClientBuilder builder) {
- for (URI endpoint : builder.endpoints)
- for (int i = 0; i < builder.connectionsPerEndpoint; i++)
- endpoints.add(new Endpoint(createOkHttpClient(builder), endpoint));
- }
-
- private static OkHttpClient createOkHttpClient(FeedClientBuilder builder) {
- try {
- return new OkHttpClient.Builder().connectTimeout(15, TimeUnit.SECONDS)
- .callTimeout(5, TimeUnit.MINUTES)
- .readTimeout(30, TimeUnit.SECONDS)
- .writeTimeout(30, TimeUnit.SECONDS)
- .followRedirects(false)
- //.hostnameVerifier(builder.hostnameVerifier)
- .retryOnConnectionFailure(false)
- .sslSocketFactory(builder.constructSslContext().getSocketFactory(),
- new X509ExtendedTrustManager() {
- @Override public void checkClientTrusted(X509Certificate[] chain, String authType, Socket socket) throws CertificateException { }
- @Override public void checkServerTrusted(X509Certificate[] chain, String authType, Socket socket) throws CertificateException { }
- @Override public void checkClientTrusted(X509Certificate[] chain, String authType, SSLEngine engine) throws CertificateException { }
- @Override public void checkServerTrusted(X509Certificate[] chain, String authType, SSLEngine engine) throws CertificateException { }
- @Override public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException { }
- @Override public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException { }
- @Override public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; } })
- .build();
-
- }
- catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-
-
- @Override
- public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) {
- int index = 0;
- int min = Integer.MAX_VALUE;
- for (int i = 0; i < endpoints.size(); i++)
- if (endpoints.get(i).inflight.get() < min) {
- index = i;
- min = endpoints.get(i).inflight.get();
- }
-
- Endpoint endpoint = endpoints.get(index);
- endpoint.inflight.incrementAndGet();
- try {
- Request.Builder okRequest = new Request.Builder().method(request.method(),
- RequestBody.create(request.body(),
- MediaType.parse("application/json")))
- .url(endpoint.uri.resolve(request.path()).toString());
- request.headers().forEach((name, value) -> okRequest.header(name, value.get()));
- endpoint.client.newCall(okRequest.build()).enqueue(new Callback() {
- @Override public void onFailure(@NotNull Call call, @NotNull IOException e) {
- vessel.completeExceptionally(e);
- }
- @Override public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException {
- vessel.complete(HttpResponse.of(response.code(), response.body().bytes()));
- }
- });
- }
- catch (Throwable thrown) {
- vessel.completeExceptionally(thrown);
- }
- vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet());
- }
-
-
- @Override
- public void close() {
- Throwable thrown = null;
- for (Endpoint endpoint : endpoints)
- try {
- //endpoint.client.
- }
- catch (Throwable t) {
- if (thrown == null) thrown = t;
- else thrown.addSuppressed(t);
- }
- if (thrown != null) throw new RuntimeException(thrown);
- }
-
-
- private static class Endpoint {
-
- private final OkHttpClient client;
- private final AtomicInteger inflight = new AtomicInteger(0);
- private final URI uri;
-
- private Endpoint(OkHttpClient client, URI uri) {
- this.client = client;
- this.uri = uri;
- }
- }
-
-}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
index a1101eb0ebb..7764dce712b 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
@@ -1,8 +1,6 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.feed.client;
-import ai.vespa.feed.client.FeedClient.CircuitBreaker.State;
-
import java.util.concurrent.CompletableFuture;
/**
@@ -15,8 +13,8 @@ interface RequestStrategy {
/** Stats for operations sent through this. */
OperationStats stats();
- /** State of the circuit breaker. */
- State circuitBreakerState();
+ /** Whether this has failed fatally, and we should cease sending further operations. */
+ boolean hasFailed();
/** Forcibly terminates this, causing all inflight operations to complete immediately. */
void destroy();
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java
index d8090549420..0605ed5bf65 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java
@@ -25,7 +25,7 @@ class HttpFeedClientTest {
AtomicReference<BiFunction<DocumentId, HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>();
class MockRequestStrategy implements RequestStrategy {
@Override public OperationStats stats() { throw new UnsupportedOperationException(); }
- @Override public FeedClient.CircuitBreaker.State circuitBreakerState() { return FeedClient.CircuitBreaker.State.CLOSED; }
+ @Override public boolean hasFailed() { return false; }
@Override public void destroy() { throw new UnsupportedOperationException(); }
@Override public void await() { throw new UnsupportedOperationException(); }
@Override public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { return dispatch.get().apply(documentId, request); }