From 321794ad856f2de8b71a95258db85b15f8653419 Mon Sep 17 00:00:00 2001 From: Bjørn Christian Seime Date: Wed, 5 Jul 2023 12:22:04 +0200 Subject: Introduce experimental HTTP/2 client implementation based on Jetty --- cloud-tenant-base-dependencies-enforcer/pom.xml | 3 + parent/pom.xml | 5 + .../allowed-maven-dependencies.txt | 3 + vespa-feed-client/pom.xml | 5 + .../ai/vespa/feed/client/impl/JettyCluster.java | 245 +++++++++++++++++++++ 5 files changed, 261 insertions(+) create mode 100644 vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java diff --git a/cloud-tenant-base-dependencies-enforcer/pom.xml b/cloud-tenant-base-dependencies-enforcer/pom.xml index cc4fcecd755..85805da76a5 100644 --- a/cloud-tenant-base-dependencies-enforcer/pom.xml +++ b/cloud-tenant-base-dependencies-enforcer/pom.xml @@ -204,11 +204,14 @@ org.bouncycastle:bcpkix-jdk18on:${bouncycastle.version}:test org.bouncycastle:bcprov-jdk18on:${bouncycastle.version}:test org.bouncycastle:bcutil-jdk18on:${bouncycastle.version}:test + org.eclipse.jetty.http2:http2-client:${jetty.version}:test org.eclipse.jetty.http2:http2-common:${jetty.version}:test org.eclipse.jetty.http2:http2-hpack:${jetty.version}:test + org.eclipse.jetty.http2:http2-http-client-transport:${jetty.version}:test org.eclipse.jetty.http2:http2-server:${jetty.version}:test org.eclipse.jetty.toolchain:jetty-jakarta-servlet-api:5.0.2:test org.eclipse.jetty:jetty-alpn-client:${jetty.version}:test + org.eclipse.jetty:jetty-alpn-java-client:${jetty.version}:test org.eclipse.jetty:jetty-alpn-java-server:${jetty.version}:test org.eclipse.jetty:jetty-alpn-server:${jetty.version}:test org.eclipse.jetty:jetty-client:${jetty.version}:test diff --git a/parent/pom.xml b/parent/pom.xml index b58f129ccd1..297b0f78cf6 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -912,6 +912,11 @@ http2-common ${jetty.version} + + org.eclipse.jetty.http2 + http2-http-client-transport + ${jetty.version} + org.eclipse.jetty.http2 http2-server diff --git a/vespa-dependencies-enforcer/allowed-maven-dependencies.txt b/vespa-dependencies-enforcer/allowed-maven-dependencies.txt index 4f78e81bf8a..a66aec154d9 100644 --- a/vespa-dependencies-enforcer/allowed-maven-dependencies.txt +++ b/vespa-dependencies-enforcer/allowed-maven-dependencies.txt @@ -146,6 +146,7 @@ org.codehaus.plexus:plexus-utils:3.3.1 org.eclipse.collections:eclipse-collections:11.0.0 org.eclipse.collections:eclipse-collections-api:11.0.0 org.eclipse.jetty:jetty-alpn-client:11.0.15 +org.eclipse.jetty:jetty-alpn-java-client:11.0.15 org.eclipse.jetty:jetty-alpn-java-server:11.0.15 org.eclipse.jetty:jetty-alpn-server:11.0.15 org.eclipse.jetty:jetty-client:11.0.15 @@ -156,8 +157,10 @@ org.eclipse.jetty:jetty-security:11.0.15 org.eclipse.jetty:jetty-server:11.0.15 org.eclipse.jetty:jetty-servlet:11.0.15 org.eclipse.jetty:jetty-util:11.0.15 +org.eclipse.jetty.http2:http2-client:11.0.15 org.eclipse.jetty.http2:http2-common:11.0.15 org.eclipse.jetty.http2:http2-hpack:11.0.15 +org.eclipse.jetty.http2:http2-http-client-transport:11.0.15 org.eclipse.jetty.http2:http2-server:11.0.15 org.eclipse.jetty.toolchain:jetty-jakarta-servlet-api:5.0.2 org.eclipse.sisu:org.eclipse.sisu.inject:0.3.5 diff --git a/vespa-feed-client/pom.xml b/vespa-feed-client/pom.xml index b6440653a78..19130b52268 100644 --- a/vespa-feed-client/pom.xml +++ b/vespa-feed-client/pom.xml @@ -29,6 +29,11 @@ httpclient5 compile + + org.eclipse.jetty.http2 + http2-http-client-transport + compile + com.fasterxml.jackson.core jackson-core diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java new file mode 100644 index 00000000000..48b34c02672 --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java @@ -0,0 +1,245 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +package ai.vespa.feed.client.impl; + +import ai.vespa.feed.client.FeedClientBuilder.Compression; +import ai.vespa.feed.client.HttpResponse; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.client.util.AbstractRequestContent; +import org.eclipse.jetty.client.util.BufferingResponseListener; +import org.eclipse.jetty.http.HttpField; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpMethod; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http2.client.HTTP2Client; +import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2; +import org.eclipse.jetty.io.ClientConnector; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.SocketAddressResolver; +import org.eclipse.jetty.util.component.AbstractLifeCycle; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.util.thread.QueuedThreadPool; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.Inet4Address; +import java.net.InetSocketAddress; +import java.net.URI; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.zip.GZIPOutputStream; + +import static ai.vespa.feed.client.FeedClientBuilder.Compression.auto; +import static ai.vespa.feed.client.FeedClientBuilder.Compression.gzip; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.eclipse.jetty.http.MimeTypes.Type.APPLICATION_JSON; + +/** + * Client implementation based on Jetty HTTP Client + * + * @author bjorncs + */ +class JettyCluster implements Cluster { + + // Socket timeout must be longer than the longest feasible response timeout + private static final Duration IDLE_TIMEOUT = Duration.ofMinutes(15); + + private final HttpClient client; + private final List endpoints; + private final Compression compression; + + JettyCluster(FeedClientBuilderImpl b) throws IOException { + this.client = createHttpClient(b); + this.endpoints = b.endpoints.stream().map(Endpoint::new).collect(Collectors.toList()); + this.compression = b.compression; + } + + @Override + public void dispatch(HttpRequest req, CompletableFuture vessel) { + Endpoint endpoint = findLeastBusyEndpoint(endpoints); + long reqTimeoutMillis = req.timeout() != null + ? req.timeout().toMillis() * 11 / 10 + 1000 : IDLE_TIMEOUT.toMillis(); + Request jettyReq = client.newRequest(endpoint.uri.getHost(), portOf(endpoint.uri)) + .version(HttpVersion.HTTP_2) + .scheme(endpoint.uri.getScheme()) + .method(HttpMethod.fromString(req.method())) + .path(req.path()) + .headers(hs -> req.headers().forEach((k, v) -> hs.add(k, v.get()))) + .idleTimeout(IDLE_TIMEOUT.toMillis(), MILLISECONDS) + .timeout(reqTimeoutMillis, MILLISECONDS); + if (req.body() != null) { + FeedContent content = new FeedContent(compression, req.body()); + content.contentEncoding().ifPresent(ce -> jettyReq.headers(hs -> hs.add(ce))); + jettyReq.body(content); + } + jettyReq.send(new BufferingResponseListener() { + @Override + public void onComplete(Result result) { + if (result.isFailed()) vessel.completeExceptionally(result.getFailure()); + else vessel.complete(new JettyResponse(result.getResponse(), getContent())); + } + }); + } + + @Override + public void close() { + try { + client.stop(); + } catch (Exception e) { throw new RuntimeException(e); } + } + + private static HttpClient createHttpClient(FeedClientBuilderImpl b) throws IOException { + SslContextFactory.Client clientSslCtxFactory = new SslContextFactory.Client(); + clientSslCtxFactory.setSslContext(b.constructSslContext()); + if (b.hostnameVerifier != null) { + clientSslCtxFactory.setHostnameVerifier(b.hostnameVerifier); + // Disable built-in hostname verification in the JDK's TLS implementation + clientSslCtxFactory.setEndpointIdentificationAlgorithm(null); + } + ClientConnector connector = new ClientConnector(); + int threads = Math.max(Math.min(Runtime.getRuntime().availableProcessors(), 16), 4); + connector.setExecutor(new QueuedThreadPool(threads)); + connector.setSslContextFactory(clientSslCtxFactory); + HTTP2Client h2Client = new HTTP2Client(connector); + h2Client.setMaxConcurrentPushedStreams(b.maxStreamsPerConnection); + HttpClient httpClient = new HttpClient(new HttpClientTransportOverHTTP2(h2Client)); + httpClient.setMaxConnectionsPerDestination(b.connectionsPerEndpoint); + httpClient.setFollowRedirects(false); + httpClient.setUserAgentField( + new HttpField(HttpHeader.USER_AGENT, String.format("vespa-feed-client/%s (Jetty)", Vespa.VERSION))); + httpClient.setMaxRequestsQueuedPerDestination(Integer.MAX_VALUE); + httpClient.setConnectTimeout(Duration.ofSeconds(10).toMillis()); + // Stop client from trying different IP address when TLS handshake fails + httpClient.setSocketAddressResolver(new Ipv4PreferringResolver(httpClient, Duration.ofSeconds(10))); + + httpClient.setIdleTimeout(IDLE_TIMEOUT.toMillis()); + try { + httpClient.start(); + } catch (Exception e) { + throw new IOException(e); + } + return httpClient; + } + + private static Endpoint findLeastBusyEndpoint(List endpoints) { + Endpoint leastBusy = endpoints.get(0); + int minInflight = leastBusy.inflight.get(); + for (int i = 1; i < endpoints.size(); i++) { + Endpoint endpoint = endpoints.get(i); + int inflight = endpoint.inflight.get(); + if (inflight < minInflight) { + leastBusy = endpoint; + minInflight = inflight; + } + } + return leastBusy; + } + + private static int portOf(URI u) { + return u.getPort() == -1 ? u.getScheme().equals("http") ? 80 : 443 : u.getPort(); + } + + private static class JettyResponse implements HttpResponse { + final Response response; + final byte[] content; + + JettyResponse(Response response, byte[] content) { this.response = response; this.content = content; } + + @Override public int code() { return response.getStatus(); } + @Override public byte[] body() { return content; } + @Override public String contentType() { return response.getHeaders().get(HttpHeader.CONTENT_TYPE); } + } + + private static class Endpoint { + final AtomicInteger inflight = new AtomicInteger(); + final URI uri; + Endpoint(URI uri) { this.uri = uri; } + } + + private static class FeedContent extends AbstractRequestContent { + final Compression compression; + final byte[] body; + + FeedContent(Compression compression, byte[] body) { + super(APPLICATION_JSON.asString()); + this.compression = compression; + this.body = body; + } + + @Override public boolean isReproducible() { return true; } + @Override public long getLength() { return shouldCompress() ? -1 : body.length; } + Optional contentEncoding() { + return shouldCompress() ? Optional.of(new HttpField(HttpHeader.CONTENT_ENCODING, "gzip")) : Optional.empty(); + } + + @Override + public Subscription newSubscription(Consumer consumer, boolean emitInitialContent) { + return new SubscriptionImpl(consumer, emitInitialContent); + } + + boolean shouldCompress() { return compression == gzip || compression == auto && body.length > 512; } + + class SubscriptionImpl extends AbstractSubscription { + SubscriptionImpl(Consumer consumer, boolean emitInitialContent) { super(consumer, emitInitialContent); } + + @Override + protected boolean produceContent(Producer producer) { + byte[] bytes; + if (shouldCompress()) { + ByteArrayOutputStream buffer = new ByteArrayOutputStream(1 << 10); + try (GZIPOutputStream zip = new GZIPOutputStream(buffer)) { + zip.write(body); + } catch (IOException e) { throw new UncheckedIOException(e); } + bytes = buffer.toByteArray(); + } else { + bytes = body; + } + return producer.produce(ByteBuffer.wrap(bytes), true, Callback.NOOP); + } + } + } + + private static class Ipv4PreferringResolver extends AbstractLifeCycle implements SocketAddressResolver { + + final HttpClient client; + final Duration timeout; + SocketAddressResolver.Async instance; + + Ipv4PreferringResolver(HttpClient client, Duration timeout) { this.client = client; this.timeout = timeout; } + + @Override + protected void doStart() { + this.instance = new SocketAddressResolver.Async(client.getExecutor(), client.getScheduler(), timeout.toMillis()); + } + + @Override + public void resolve(String host, int port, Promise> promise) { + instance.resolve(host, port, new Promise.Wrapper>(promise) { + @Override + public void succeeded(List result) { + if (result.size() <= 1) { + getPromise().succeeded(result); + return; + } + List ipv4Addresses = result.stream() + .filter(addr -> addr.getAddress() instanceof Inet4Address).collect(Collectors.toList()); + if (ipv4Addresses.isEmpty()) { + getPromise().succeeded(result); + return; + } + getPromise().succeeded(ipv4Addresses); + } + }); + } + } +} -- cgit v1.2.3