summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-06-14 09:32:29 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-06-14 09:33:09 +0200
commit165d2496f311d99846a9b3a821a2406341545c86 (patch)
treef47fdadad7f25f188878126eac53c3a8fa6b82e9 /vespa-feed-client
parentd60e4a6842add6cc04ac1782f54a909a07f33523 (diff)
Support OKHttp
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/pom.xml5
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/OkCluster.java146
2 files changed, 151 insertions, 0 deletions
diff --git a/vespa-feed-client/pom.xml b/vespa-feed-client/pom.xml
index 02d4a0128ea..85377c25241 100644
--- a/vespa-feed-client/pom.xml
+++ b/vespa-feed-client/pom.xml
@@ -42,6 +42,11 @@
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>okhttp</artifactId>
+ <version>4.9.1</version>
+ </dependency>
+ <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<scope>compile</scope>
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OkCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OkCluster.java
new file mode 100644
index 00000000000..4b0d4b7c5aa
--- /dev/null
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OkCluster.java
@@ -0,0 +1,146 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import okhttp3.Call;
+import okhttp3.Callback;
+import okhttp3.ConnectionPool;
+import okhttp3.ConnectionSpec;
+import okhttp3.MediaType;
+import okhttp3.OkHttp;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import okhttp3.internal.concurrent.TaskRunner;
+import okhttp3.internal.connection.RealConnectionPool;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.HttpClientTransport;
+import org.eclipse.jetty.client.api.Result;
+import org.eclipse.jetty.client.util.BufferingResponseListener;
+import org.eclipse.jetty.client.util.BytesContentProvider;
+import org.eclipse.jetty.http.HttpField;
+import org.eclipse.jetty.http2.client.HTTP2Client;
+import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.jetbrains.annotations.NotNull;
+import sun.security.ssl.SSLSocketFactoryImpl;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509ExtendedTrustManager;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.security.GeneralSecurityException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author jonmv
+ */
+public class OkCluster implements Cluster {
+
+ private final List<Endpoint> endpoints = new ArrayList<>();
+
+ OkCluster(FeedClientBuilder builder) {
+ for (URI endpoint : builder.endpoints)
+ for (int i = 0; i < builder.connectionsPerEndpoint; i++)
+ endpoints.add(new Endpoint(createOkHttpClient(builder), endpoint));
+ }
+
+ private static OkHttpClient createOkHttpClient(FeedClientBuilder builder) {
+ try {
+ return new OkHttpClient.Builder().connectTimeout(15, TimeUnit.SECONDS)
+ .callTimeout(5, TimeUnit.MINUTES)
+ .readTimeout(30, TimeUnit.SECONDS)
+ .writeTimeout(30, TimeUnit.SECONDS)
+ .followRedirects(false)
+ //.hostnameVerifier(builder.hostnameVerifier)
+ .retryOnConnectionFailure(false)
+ .sslSocketFactory(builder.constructSslContext().getSocketFactory(),
+ new X509ExtendedTrustManager() {
+ @Override public void checkClientTrusted(X509Certificate[] chain, String authType, Socket socket) throws CertificateException { }
+ @Override public void checkServerTrusted(X509Certificate[] chain, String authType, Socket socket) throws CertificateException { }
+ @Override public void checkClientTrusted(X509Certificate[] chain, String authType, SSLEngine engine) throws CertificateException { }
+ @Override public void checkServerTrusted(X509Certificate[] chain, String authType, SSLEngine engine) throws CertificateException { }
+ @Override public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException { }
+ @Override public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException { }
+ @Override public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; } })
+ .build();
+
+ }
+ catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+
+ @Override
+ public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) {
+ int index = 0;
+ int min = Integer.MAX_VALUE;
+ for (int i = 0; i < endpoints.size(); i++)
+ if (endpoints.get(i).inflight.get() < min) {
+ index = i;
+ min = endpoints.get(i).inflight.get();
+ }
+
+ Endpoint endpoint = endpoints.get(index);
+ endpoint.inflight.incrementAndGet();
+ try {
+ Request.Builder okRequest = new Request.Builder().method(request.method(),
+ RequestBody.create(request.body(),
+ MediaType.parse("application/json")))
+ .url(endpoint.uri.resolve(request.path()).toString());
+ request.headers().forEach((name, value) -> okRequest.header(name, value.get()));
+ endpoint.client.newCall(okRequest.build()).enqueue(new Callback() {
+ @Override public void onFailure(@NotNull Call call, @NotNull IOException e) {
+ vessel.completeExceptionally(e);
+ }
+ @Override public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException {
+ vessel.complete(HttpResponse.of(response.code(), response.body().bytes()));
+ }
+ });
+ }
+ catch (Throwable thrown) {
+ vessel.completeExceptionally(thrown);
+ }
+ vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet());
+ }
+
+
+ @Override
+ public void close() {
+ Throwable thrown = null;
+ for (Endpoint endpoint : endpoints)
+ try {
+ //endpoint.client.
+ }
+ catch (Throwable t) {
+ if (thrown == null) thrown = t;
+ else thrown.addSuppressed(t);
+ }
+ if (thrown != null) throw new RuntimeException(thrown);
+ }
+
+
+ private static class Endpoint {
+
+ private final OkHttpClient client;
+ private final AtomicInteger inflight = new AtomicInteger(0);
+ private final URI uri;
+
+ private Endpoint(OkHttpClient client, URI uri) {
+ this.client = client;
+ this.uri = uri;
+ }
+ }
+
+}