summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-06-11 12:07:17 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-06-11 12:09:48 +0200
commitf24446fc13c290b33b1c596f87f5b8fa82ea3171 (patch)
tree60bd922277508df8d7a72e23e2410810d20e0611 /vespa-feed-client
parent18cbeabfee69af646125f32eae382233c8d244a2 (diff)
Use Jetty for vespa-feed-client
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/pom.xml12
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java1
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java2
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java121
4 files changed, 134 insertions, 2 deletions
diff --git a/vespa-feed-client/pom.xml b/vespa-feed-client/pom.xml
index 7759e9d2308..02d4a0128ea 100644
--- a/vespa-feed-client/pom.xml
+++ b/vespa-feed-client/pom.xml
@@ -25,6 +25,18 @@
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>org.eclipse.jetty.http2</groupId>
+ <artifactId>http2-http-client-transport</artifactId>
+ <version>${jetty.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-client</artifactId>
+ <version>${jetty.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<scope>compile</scope>
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java
index 3a3e07b0b32..e5d45a2f211 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java
@@ -150,7 +150,6 @@ class ApacheCluster implements Cluster {
this.wrapped = wrapped;
}
-
@Override
public int code() {
return wrapped.getCode();
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
index 5646d37cde3..f9fc7544501 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
@@ -59,7 +59,7 @@ class HttpRequestStrategy implements RequestStrategy {
});
HttpRequestStrategy(FeedClientBuilder builder) throws IOException {
- this(builder, new BenchmarkingCluster(new ApacheCluster(builder)));
+ this(builder, new BenchmarkingCluster(new JettyCluster(builder)));
}
HttpRequestStrategy(FeedClientBuilder builder, Cluster cluster) {
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java
new file mode 100644
index 00000000000..ec1b599f1ae
--- /dev/null
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java
@@ -0,0 +1,121 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.HttpClientTransport;
+import org.eclipse.jetty.client.api.Result;
+import org.eclipse.jetty.client.util.BufferingResponseListener;
+import org.eclipse.jetty.client.util.BytesContentProvider;
+import org.eclipse.jetty.http.HttpField;
+import org.eclipse.jetty.http2.client.HTTP2Client;
+import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author jonmv
+ */
+class JettyCluster implements Cluster {
+
+ private final List<Endpoint> endpoints = new ArrayList<>();
+
+ JettyCluster(FeedClientBuilder builder) {
+ for (URI endpoint : builder.endpoints)
+ endpoints.add(new Endpoint(createJettyHttpClient(builder), endpoint));
+ }
+
+ private static HttpClient createJettyHttpClient(FeedClientBuilder builder) {
+ try {
+ SslContextFactory.Client clientSslCtxFactory = new SslContextFactory.Client();
+ clientSslCtxFactory.setSslContext(new SslContextBuilder().withCaCertificates(builder.caCertificates).build());
+ clientSslCtxFactory.setHostnameVerifier(builder.hostnameVerifier);
+
+ HTTP2Client wrapped = new HTTP2Client();
+ wrapped.setSelectors(8);
+ wrapped.setMaxConcurrentPushedStreams(builder.maxStreamsPerConnection);
+ HttpClientTransport transport = new HttpClientTransportOverHTTP2(wrapped);
+ HttpClient client = new HttpClient(transport, clientSslCtxFactory);
+ client.setUserAgentField(new HttpField("User-Agent", String.format("vespa-feed-client/%s", Vespa.VERSION)));
+ client.setDefaultRequestContentType("application/json");
+ client.setFollowRedirects(false);
+ client.setMaxRequestsQueuedPerDestination(builder.connectionsPerEndpoint * builder.maxStreamsPerConnection);
+ client.setIdleTimeout(10000);
+ client.setMaxConnectionsPerDestination(builder.connectionsPerEndpoint);
+ client.setRequestBufferSize(1 << 16);
+
+ client.start();
+ return client;
+ }
+ catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+
+ @Override
+ public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) {
+ int index = 0;
+ int min = Integer.MAX_VALUE;
+ for (int i = 0; i < endpoints.size(); i++)
+ if (endpoints.get(i).inflight.get() < min) {
+ index = i;
+ min = endpoints.get(i).inflight.get();
+ }
+
+ Endpoint endpoint = endpoints.get(index);
+ endpoint.inflight.incrementAndGet();
+ try {
+ endpoint.client.newRequest(endpoint.uri.resolve(request.path()))
+ .method(request.method())
+ .timeout(5, TimeUnit.MINUTES)
+ .content(request.body() == null ? null : new BytesContentProvider(request.body()))
+ .send(new BufferingResponseListener() {
+ @Override public void onComplete(Result result) {
+ if (result.isSucceeded())
+ vessel.complete(HttpResponse.of(result.getResponse().getStatus(),
+ getContent()));
+ else
+ vessel.completeExceptionally(result.getFailure());
+ }
+ });
+ }
+ catch (Throwable thrown) {
+ vessel.completeExceptionally(thrown);
+ }
+ vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet());
+ }
+
+ @Override
+ public void close() {
+ Throwable thrown = null;
+ for (Endpoint endpoint : endpoints)
+ try {
+ endpoint.client.stop();
+ }
+ catch (Throwable t) {
+ if (thrown == null) thrown = t;
+ else thrown.addSuppressed(t);
+ }
+ if (thrown != null) throw new RuntimeException(thrown);
+ }
+
+
+ private static class Endpoint {
+
+ private final HttpClient client;
+ private final AtomicInteger inflight = new AtomicInteger(0);
+ private final URI uri;
+
+ private Endpoint(HttpClient client, URI uri) {
+ this.client = client;
+ this.uri = uri;
+ }
+
+ }
+}