diff options
author | Bjørn Christian Seime <bjorncs@yahooinc.com> | 2023-07-05 12:22:04 +0200 |
---|---|---|
committer | Bjørn Christian Seime <bjorncs@yahooinc.com> | 2023-07-05 14:32:09 +0200 |
commit | 321794ad856f2de8b71a95258db85b15f8653419 (patch) | |
tree | a56ce88ddc9f9d7192c602f28b57e338750c2e32 /vespa-feed-client | |
parent | c9a6fac660cd11b5b3c6dda6eaa4ef8046600658 (diff) |
Introduce experimental HTTP/2 client implementation based on Jetty
Diffstat (limited to 'vespa-feed-client')
-rw-r--r-- | vespa-feed-client/pom.xml | 5 | ||||
-rw-r--r-- | vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java | 245 |
2 files changed, 250 insertions, 0 deletions
diff --git a/vespa-feed-client/pom.xml b/vespa-feed-client/pom.xml index b6440653a78..19130b52268 100644 --- a/vespa-feed-client/pom.xml +++ b/vespa-feed-client/pom.xml @@ -30,6 +30,11 @@ <scope>compile</scope> </dependency> <dependency> + <groupId>org.eclipse.jetty.http2</groupId> + <artifactId>http2-http-client-transport</artifactId> + <scope>compile</scope> + </dependency> + <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <scope>compile</scope> diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java new file mode 100644 index 00000000000..48b34c02672 --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java @@ -0,0 +1,245 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +package ai.vespa.feed.client.impl; + +import ai.vespa.feed.client.FeedClientBuilder.Compression; +import ai.vespa.feed.client.HttpResponse; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.client.api.Result; +import org.eclipse.jetty.client.util.AbstractRequestContent; +import org.eclipse.jetty.client.util.BufferingResponseListener; +import org.eclipse.jetty.http.HttpField; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpMethod; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http2.client.HTTP2Client; +import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2; +import org.eclipse.jetty.io.ClientConnector; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.SocketAddressResolver; +import org.eclipse.jetty.util.component.AbstractLifeCycle; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.util.thread.QueuedThreadPool; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.Inet4Address; +import java.net.InetSocketAddress; +import java.net.URI; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.zip.GZIPOutputStream; + +import static ai.vespa.feed.client.FeedClientBuilder.Compression.auto; +import static ai.vespa.feed.client.FeedClientBuilder.Compression.gzip; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.eclipse.jetty.http.MimeTypes.Type.APPLICATION_JSON; + +/** + * Client implementation based on Jetty HTTP Client + * + * @author bjorncs + */ +class JettyCluster implements Cluster { + + // Socket timeout must be longer than the longest feasible response timeout + private static final Duration IDLE_TIMEOUT = Duration.ofMinutes(15); + + private final HttpClient client; + private final List<Endpoint> endpoints; + private final Compression compression; + + JettyCluster(FeedClientBuilderImpl b) throws IOException { + this.client = createHttpClient(b); + this.endpoints = b.endpoints.stream().map(Endpoint::new).collect(Collectors.toList()); + this.compression = b.compression; + } + + @Override + public void dispatch(HttpRequest req, CompletableFuture<HttpResponse> vessel) { + Endpoint endpoint = findLeastBusyEndpoint(endpoints); + long reqTimeoutMillis = req.timeout() != null + ? req.timeout().toMillis() * 11 / 10 + 1000 : IDLE_TIMEOUT.toMillis(); + Request jettyReq = client.newRequest(endpoint.uri.getHost(), portOf(endpoint.uri)) + .version(HttpVersion.HTTP_2) + .scheme(endpoint.uri.getScheme()) + .method(HttpMethod.fromString(req.method())) + .path(req.path()) + .headers(hs -> req.headers().forEach((k, v) -> hs.add(k, v.get()))) + .idleTimeout(IDLE_TIMEOUT.toMillis(), MILLISECONDS) + .timeout(reqTimeoutMillis, MILLISECONDS); + if (req.body() != null) { + FeedContent content = new FeedContent(compression, req.body()); + content.contentEncoding().ifPresent(ce -> jettyReq.headers(hs -> hs.add(ce))); + jettyReq.body(content); + } + jettyReq.send(new BufferingResponseListener() { + @Override + public void onComplete(Result result) { + if (result.isFailed()) vessel.completeExceptionally(result.getFailure()); + else vessel.complete(new JettyResponse(result.getResponse(), getContent())); + } + }); + } + + @Override + public void close() { + try { + client.stop(); + } catch (Exception e) { throw new RuntimeException(e); } + } + + private static HttpClient createHttpClient(FeedClientBuilderImpl b) throws IOException { + SslContextFactory.Client clientSslCtxFactory = new SslContextFactory.Client(); + clientSslCtxFactory.setSslContext(b.constructSslContext()); + if (b.hostnameVerifier != null) { + clientSslCtxFactory.setHostnameVerifier(b.hostnameVerifier); + // Disable built-in hostname verification in the JDK's TLS implementation + clientSslCtxFactory.setEndpointIdentificationAlgorithm(null); + } + ClientConnector connector = new ClientConnector(); + int threads = Math.max(Math.min(Runtime.getRuntime().availableProcessors(), 16), 4); + connector.setExecutor(new QueuedThreadPool(threads)); + connector.setSslContextFactory(clientSslCtxFactory); + HTTP2Client h2Client = new HTTP2Client(connector); + h2Client.setMaxConcurrentPushedStreams(b.maxStreamsPerConnection); + HttpClient httpClient = new HttpClient(new HttpClientTransportOverHTTP2(h2Client)); + httpClient.setMaxConnectionsPerDestination(b.connectionsPerEndpoint); + httpClient.setFollowRedirects(false); + httpClient.setUserAgentField( + new HttpField(HttpHeader.USER_AGENT, String.format("vespa-feed-client/%s (Jetty)", Vespa.VERSION))); + httpClient.setMaxRequestsQueuedPerDestination(Integer.MAX_VALUE); + httpClient.setConnectTimeout(Duration.ofSeconds(10).toMillis()); + // Stop client from trying different IP address when TLS handshake fails + httpClient.setSocketAddressResolver(new Ipv4PreferringResolver(httpClient, Duration.ofSeconds(10))); + + httpClient.setIdleTimeout(IDLE_TIMEOUT.toMillis()); + try { + httpClient.start(); + } catch (Exception e) { + throw new IOException(e); + } + return httpClient; + } + + private static Endpoint findLeastBusyEndpoint(List<Endpoint> endpoints) { + Endpoint leastBusy = endpoints.get(0); + int minInflight = leastBusy.inflight.get(); + for (int i = 1; i < endpoints.size(); i++) { + Endpoint endpoint = endpoints.get(i); + int inflight = endpoint.inflight.get(); + if (inflight < minInflight) { + leastBusy = endpoint; + minInflight = inflight; + } + } + return leastBusy; + } + + private static int portOf(URI u) { + return u.getPort() == -1 ? u.getScheme().equals("http") ? 80 : 443 : u.getPort(); + } + + private static class JettyResponse implements HttpResponse { + final Response response; + final byte[] content; + + JettyResponse(Response response, byte[] content) { this.response = response; this.content = content; } + + @Override public int code() { return response.getStatus(); } + @Override public byte[] body() { return content; } + @Override public String contentType() { return response.getHeaders().get(HttpHeader.CONTENT_TYPE); } + } + + private static class Endpoint { + final AtomicInteger inflight = new AtomicInteger(); + final URI uri; + Endpoint(URI uri) { this.uri = uri; } + } + + private static class FeedContent extends AbstractRequestContent { + final Compression compression; + final byte[] body; + + FeedContent(Compression compression, byte[] body) { + super(APPLICATION_JSON.asString()); + this.compression = compression; + this.body = body; + } + + @Override public boolean isReproducible() { return true; } + @Override public long getLength() { return shouldCompress() ? -1 : body.length; } + Optional<HttpField> contentEncoding() { + return shouldCompress() ? Optional.of(new HttpField(HttpHeader.CONTENT_ENCODING, "gzip")) : Optional.empty(); + } + + @Override + public Subscription newSubscription(Consumer consumer, boolean emitInitialContent) { + return new SubscriptionImpl(consumer, emitInitialContent); + } + + boolean shouldCompress() { return compression == gzip || compression == auto && body.length > 512; } + + class SubscriptionImpl extends AbstractSubscription { + SubscriptionImpl(Consumer consumer, boolean emitInitialContent) { super(consumer, emitInitialContent); } + + @Override + protected boolean produceContent(Producer producer) { + byte[] bytes; + if (shouldCompress()) { + ByteArrayOutputStream buffer = new ByteArrayOutputStream(1 << 10); + try (GZIPOutputStream zip = new GZIPOutputStream(buffer)) { + zip.write(body); + } catch (IOException e) { throw new UncheckedIOException(e); } + bytes = buffer.toByteArray(); + } else { + bytes = body; + } + return producer.produce(ByteBuffer.wrap(bytes), true, Callback.NOOP); + } + } + } + + private static class Ipv4PreferringResolver extends AbstractLifeCycle implements SocketAddressResolver { + + final HttpClient client; + final Duration timeout; + SocketAddressResolver.Async instance; + + Ipv4PreferringResolver(HttpClient client, Duration timeout) { this.client = client; this.timeout = timeout; } + + @Override + protected void doStart() { + this.instance = new SocketAddressResolver.Async(client.getExecutor(), client.getScheduler(), timeout.toMillis()); + } + + @Override + public void resolve(String host, int port, Promise<List<InetSocketAddress>> promise) { + instance.resolve(host, port, new Promise.Wrapper<List<InetSocketAddress>>(promise) { + @Override + public void succeeded(List<InetSocketAddress> result) { + if (result.size() <= 1) { + getPromise().succeeded(result); + return; + } + List<InetSocketAddress> ipv4Addresses = result.stream() + .filter(addr -> addr.getAddress() instanceof Inet4Address).collect(Collectors.toList()); + if (ipv4Addresses.isEmpty()) { + getPromise().succeeded(result); + return; + } + getPromise().succeeded(ipv4Addresses); + } + }); + } + } +} |