diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-06-11 12:07:17 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-06-11 12:09:48 +0200 |
commit | f24446fc13c290b33b1c596f87f5b8fa82ea3171 (patch) | |
tree | 60bd922277508df8d7a72e23e2410810d20e0611 /vespa-feed-client | |
parent | 18cbeabfee69af646125f32eae382233c8d244a2 (diff) |
Use Jetty for vespa-feed-client
Diffstat (limited to 'vespa-feed-client')
4 files changed, 134 insertions, 2 deletions
diff --git a/vespa-feed-client/pom.xml b/vespa-feed-client/pom.xml index 7759e9d2308..02d4a0128ea 100644 --- a/vespa-feed-client/pom.xml +++ b/vespa-feed-client/pom.xml @@ -25,6 +25,18 @@ <scope>compile</scope> </dependency> <dependency> + <groupId>org.eclipse.jetty.http2</groupId> + <artifactId>http2-http-client-transport</artifactId> + <version>${jetty.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-client</artifactId> + <version>${jetty.version}</version> + <scope>compile</scope> + </dependency> + <dependency> <groupId>org.apache.httpcomponents.client5</groupId> <artifactId>httpclient5</artifactId> <scope>compile</scope> diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java index 3a3e07b0b32..e5d45a2f211 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java @@ -150,7 +150,6 @@ class ApacheCluster implements Cluster { this.wrapped = wrapped; } - @Override public int code() { return wrapped.getCode(); diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java index 5646d37cde3..f9fc7544501 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java @@ -59,7 +59,7 @@ class HttpRequestStrategy implements RequestStrategy { }); HttpRequestStrategy(FeedClientBuilder builder) throws IOException { - this(builder, new BenchmarkingCluster(new ApacheCluster(builder))); + this(builder, new BenchmarkingCluster(new JettyCluster(builder))); } HttpRequestStrategy(FeedClientBuilder builder, Cluster cluster) { diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java new file mode 100644 index 00000000000..ec1b599f1ae --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java @@ -0,0 +1,121 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.HttpClientTransport; +import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.client.util.BufferingResponseListener; +import org.eclipse.jetty.client.util.BytesContentProvider; +import org.eclipse.jetty.http.HttpField; +import org.eclipse.jetty.http2.client.HTTP2Client; +import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2; +import org.eclipse.jetty.util.ssl.SslContextFactory; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author jonmv + */ +class JettyCluster implements Cluster { + + private final List<Endpoint> endpoints = new ArrayList<>(); + + JettyCluster(FeedClientBuilder builder) { + for (URI endpoint : builder.endpoints) + endpoints.add(new Endpoint(createJettyHttpClient(builder), endpoint)); + } + + private static HttpClient createJettyHttpClient(FeedClientBuilder builder) { + try { + SslContextFactory.Client clientSslCtxFactory = new SslContextFactory.Client(); + clientSslCtxFactory.setSslContext(new SslContextBuilder().withCaCertificates(builder.caCertificates).build()); + clientSslCtxFactory.setHostnameVerifier(builder.hostnameVerifier); + + HTTP2Client wrapped = new HTTP2Client(); + wrapped.setSelectors(8); + wrapped.setMaxConcurrentPushedStreams(builder.maxStreamsPerConnection); + HttpClientTransport transport = new HttpClientTransportOverHTTP2(wrapped); + HttpClient client = new HttpClient(transport, clientSslCtxFactory); + client.setUserAgentField(new HttpField("User-Agent", String.format("vespa-feed-client/%s", Vespa.VERSION))); + client.setDefaultRequestContentType("application/json"); + client.setFollowRedirects(false); + client.setMaxRequestsQueuedPerDestination(builder.connectionsPerEndpoint * builder.maxStreamsPerConnection); + client.setIdleTimeout(10000); + client.setMaxConnectionsPerDestination(builder.connectionsPerEndpoint); + client.setRequestBufferSize(1 << 16); + + client.start(); + return client; + } + catch (Exception e) { + throw new IllegalStateException(e); + } + } + + + @Override + public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) { + int index = 0; + int min = Integer.MAX_VALUE; + for (int i = 0; i < endpoints.size(); i++) + if (endpoints.get(i).inflight.get() < min) { + index = i; + min = endpoints.get(i).inflight.get(); + } + + Endpoint endpoint = endpoints.get(index); + endpoint.inflight.incrementAndGet(); + try { + endpoint.client.newRequest(endpoint.uri.resolve(request.path())) + .method(request.method()) + .timeout(5, TimeUnit.MINUTES) + .content(request.body() == null ? null : new BytesContentProvider(request.body())) + .send(new BufferingResponseListener() { + @Override public void onComplete(Result result) { + if (result.isSucceeded()) + vessel.complete(HttpResponse.of(result.getResponse().getStatus(), + getContent())); + else + vessel.completeExceptionally(result.getFailure()); + } + }); + } + catch (Throwable thrown) { + vessel.completeExceptionally(thrown); + } + vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet()); + } + + @Override + public void close() { + Throwable thrown = null; + for (Endpoint endpoint : endpoints) + try { + endpoint.client.stop(); + } + catch (Throwable t) { + if (thrown == null) thrown = t; + else thrown.addSuppressed(t); + } + if (thrown != null) throw new RuntimeException(thrown); + } + + + private static class Endpoint { + + private final HttpClient client; + private final AtomicInteger inflight = new AtomicInteger(0); + private final URI uri; + + private Endpoint(HttpClient client, URI uri) { + this.client = client; + this.uri = uri; + } + + } +} |