From 470707d0d8b2b3397c3612d9a480055383a8f834 Mon Sep 17 00:00:00 2001 From: Bjørn Christian Seime Date: Mon, 10 Jul 2023 13:44:48 +0200 Subject: Support HTTP/2 tunnel --- .../ai/vespa/feed/client/impl/JettyCluster.java | 55 +++++++++++++++++++--- 1 file changed, 48 insertions(+), 7 deletions(-) (limited to 'vespa-feed-client/src/main') diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java index 58838e5a597..a26d8d713d1 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java @@ -5,7 +5,10 @@ package ai.vespa.feed.client.impl; import ai.vespa.feed.client.FeedClientBuilder.Compression; import ai.vespa.feed.client.HttpResponse; import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.HttpProxy; import org.eclipse.jetty.client.MultiplexConnectionPool; +import org.eclipse.jetty.client.Origin; +import org.eclipse.jetty.client.api.Authentication; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; @@ -33,9 +36,13 @@ import java.net.Inet4Address; import java.net.InetSocketAddress; import java.net.URI; import java.time.Duration; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.zip.GZIPOutputStream; @@ -74,7 +81,9 @@ class JettyCluster implements Cluster { Request jettyReq = client.newRequest(URI.create(endpoint.uri + req.path())) .version(HttpVersion.HTTP_2) .method(HttpMethod.fromString(req.method())) - .headers(hs -> req.headers().forEach((k, v) -> hs.add(k, v.get()))) + .headers(hs -> req.headers().forEach((k, v) -> { + if (!k.equalsIgnoreCase(HttpHeader.PROXY_AUTHORIZATION.asString())) hs.add(k, v.get()); + })) .idleTimeout(IDLE_TIMEOUT.toMillis(), MILLISECONDS) .timeout(reqTimeoutMillis, MILLISECONDS); if (req.body() != null) { @@ -147,14 +156,42 @@ class JettyCluster implements Cluster { httpClient.setSocketAddressResolver(new Ipv4PreferringResolver(httpClient, Duration.ofSeconds(10))); httpClient.setCookieStore(new HttpCookieStore.Empty()); - try { - httpClient.start(); - } catch (Exception e) { - throw new IOException(e); - } + if (b.proxy != null) addProxyConfiguration(b, httpClient); + try { httpClient.start(); } catch (Exception e) { throw new IOException(e); } return httpClient; } + private static void addProxyConfiguration(FeedClientBuilderImpl b, HttpClient httpClient) throws IOException { + Origin.Address address = new Origin.Address(b.proxy.getHost(), b.proxy.getPort()); + if (b.proxy.getScheme().equals("https")) { + SslContextFactory.Client proxySslCtxFactory = new SslContextFactory.Client(); + if (b.hostnameVerifier != null) proxySslCtxFactory.setHostnameVerifier(b.hostnameVerifier); + // Disable built-in hostname verification in the JDK's TLS implementation + proxySslCtxFactory.setEndpointIdentificationAlgorithm(null); + try { proxySslCtxFactory.start(); } catch (Exception e) { throw new IOException(e); } + httpClient.getProxyConfiguration().addProxy( + new HttpProxy(address, proxySslCtxFactory, new Origin.Protocol(Collections.singletonList("h2"), false))); + } else { + httpClient.getProxyConfiguration().addProxy( + new HttpProxy(address, false, new Origin.Protocol(Collections.singletonList("h2c"), false))); + } + Map> proxyHeaders = new TreeMap<>(); + b.requestHeaders.forEach((k, v) -> { if (isProxyHeader(k)) proxyHeaders.put(k, v); }); + if (!proxyHeaders.isEmpty()) { + for (URI endpoint : b.endpoints) { + httpClient.getAuthenticationStore().addAuthenticationResult(new Authentication.Result() { + @Override public URI getURI() { return URI.create(endpointUri(endpoint)); } + @Override public void apply(Request r) { + r.headers(hs -> proxyHeaders.forEach((k, v) -> hs.add(k, v.get()))); + } + }); + + } + } + } + + private static boolean isProxyHeader(String h) { return h.equalsIgnoreCase(HttpHeader.PROXY_AUTHORIZATION.asString()); } + private static Endpoint findLeastBusyEndpoint(List endpoints) { Endpoint leastBusy = endpoints.get(0); int minInflight = leastBusy.inflight.get(); @@ -173,6 +210,10 @@ class JettyCluster implements Cluster { return u.getPort() == -1 ? u.getScheme().equals("http") ? 80 : 443 : u.getPort(); } + private static String endpointUri(URI uri) { + return String.format("%s://%s:%s", uri.getScheme(), uri.getHost(), portOf(uri)); + } + private static class JettyResponse implements HttpResponse { final Response response; final byte[] content; @@ -187,7 +228,7 @@ class JettyCluster implements Cluster { private static class Endpoint { final AtomicInteger inflight = new AtomicInteger(); final String uri; - Endpoint(URI uri) { this.uri = String.format("%s://%s:%s", uri.getScheme(), uri.getHost(), portOf(uri)); } + Endpoint(URI uri) { this.uri = endpointUri(uri); } } private static class Ipv4PreferringResolver extends AbstractLifeCycle implements SocketAddressResolver { -- cgit v1.2.3