diff options
author | Bjørn Christian Seime <bjorn.christian@seime.no> | 2023-07-10 15:09:31 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-07-10 15:09:31 +0200 |
commit | 169e10dc76c9424abd79f1cea1a1fa9066701864 (patch) | |
tree | 6ce16f90f87d253a74080ff2660a7c1d4c7f9420 | |
parent | 59be3ee9c6ea727848c4fa8ed67f551d83a07608 (diff) | |
parent | 804fe2aa1f3df9890805a823208a48b36e055af0 (diff) |
Merge pull request #27728 from vespa-engine/bjorncs/vespa-feed-client
Bjorncs/vespa feed client
3 files changed, 55 insertions, 12 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java index d9eec3dce77..197b7721eca 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java @@ -201,7 +201,6 @@ public class FeedClientBuilderImpl implements FeedClientBuilder { @Override public FeedClientBuilderImpl setProxy(URI uri) { - log.warning("Proxy configuration ignored - not supported yet"); this.proxy = uri; return this; } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java index c2181821de6..f228717eba5 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java @@ -321,7 +321,7 @@ class HttpFeedClient implements FeedClient { .map(Boolean::parseBoolean) .orElse(Optional.ofNullable(System.getProperty(name)) .map(Boolean::parseBoolean) - .orElse(false)); + .orElse(true)); } } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java index 79fe8dd630f..30dc1ab0d07 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java @@ -5,7 +5,10 @@ package ai.vespa.feed.client.impl; import ai.vespa.feed.client.FeedClientBuilder.Compression; import ai.vespa.feed.client.HttpResponse; import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.HttpProxy; import org.eclipse.jetty.client.MultiplexConnectionPool; +import org.eclipse.jetty.client.Origin; +import org.eclipse.jetty.client.api.Authentication; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; @@ -33,9 +36,13 @@ import java.net.Inet4Address; import java.net.InetSocketAddress; import java.net.URI; import java.time.Duration; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.zip.GZIPOutputStream; @@ -67,14 +74,17 @@ class JettyCluster implements Cluster { @Override public void dispatch(HttpRequest req, CompletableFuture<HttpResponse> vessel) { client.getExecutor().execute(() -> { + Endpoint endpoint = findLeastBusyEndpoint(endpoints); try { - Endpoint endpoint = findLeastBusyEndpoint(endpoints); + endpoint.inflight.incrementAndGet(); long reqTimeoutMillis = req.timeout() != null ? req.timeout().toMillis() * 11 / 10 + 1000 : IDLE_TIMEOUT.toMillis(); Request jettyReq = client.newRequest(URI.create(endpoint.uri + req.path())) .version(HttpVersion.HTTP_2) .method(HttpMethod.fromString(req.method())) - .headers(hs -> req.headers().forEach((k, v) -> hs.add(k, v.get()))) + .headers(hs -> req.headers().forEach((k, v) -> { + if (!isProxyHeader(k)) hs.add(k, v.get()); + })) .idleTimeout(IDLE_TIMEOUT.toMillis(), MILLISECONDS) .timeout(reqTimeoutMillis, MILLISECONDS); if (req.body() != null) { @@ -95,11 +105,13 @@ class JettyCluster implements Cluster { jettyReq.send(new BufferingResponseListener() { @Override public void onComplete(Result result) { + endpoint.inflight.decrementAndGet(); if (result.isFailed()) vessel.completeExceptionally(result.getFailure()); else vessel.complete(new JettyResponse(result.getResponse(), getContent())); } }); } catch (Exception e) { + endpoint.inflight.decrementAndGet(); vessel.completeExceptionally(e); } }); @@ -124,6 +136,8 @@ class JettyCluster implements Cluster { int threads = Math.max(Math.min(Runtime.getRuntime().availableProcessors(), 32), 8); connector.setExecutor(new QueuedThreadPool(threads)); connector.setSslContextFactory(clientSslCtxFactory); + connector.setIdleTimeout(IDLE_TIMEOUT); + connector.setConnectTimeout(Duration.ofSeconds(10)); HTTP2Client h2Client = new HTTP2Client(connector); h2Client.setMaxConcurrentPushedStreams(b.maxStreamsPerConnection); // Set the HTTP/2 flow control windows very large to cause TCP congestion instead of HTTP/2 flow control congestion. @@ -141,20 +155,46 @@ class JettyCluster implements Cluster { httpClient.setFollowRedirects(false); httpClient.setUserAgentField( new HttpField(HttpHeader.USER_AGENT, String.format("vespa-feed-client/%s (Jetty)", Vespa.VERSION))); - httpClient.setConnectTimeout(Duration.ofSeconds(10).toMillis()); // Stop client from trying different IP address when TLS handshake fails httpClient.setSocketAddressResolver(new Ipv4PreferringResolver(httpClient, Duration.ofSeconds(10))); httpClient.setCookieStore(new HttpCookieStore.Empty()); - httpClient.setIdleTimeout(IDLE_TIMEOUT.toMillis()); - try { - httpClient.start(); - } catch (Exception e) { - throw new IOException(e); - } + if (b.proxy != null) addProxyConfiguration(b, httpClient); + try { httpClient.start(); } catch (Exception e) { throw new IOException(e); } return httpClient; } + private static void addProxyConfiguration(FeedClientBuilderImpl b, HttpClient httpClient) throws IOException { + Origin.Address address = new Origin.Address(b.proxy.getHost(), b.proxy.getPort()); + if (b.proxy.getScheme().equals("https")) { + SslContextFactory.Client proxySslCtxFactory = new SslContextFactory.Client(); + if (b.hostnameVerifier != null) proxySslCtxFactory.setHostnameVerifier(b.hostnameVerifier); + // Disable built-in hostname verification in the JDK's TLS implementation + proxySslCtxFactory.setEndpointIdentificationAlgorithm(null); + try { proxySslCtxFactory.start(); } catch (Exception e) { throw new IOException(e); } + httpClient.getProxyConfiguration().addProxy( + new HttpProxy(address, proxySslCtxFactory, new Origin.Protocol(Collections.singletonList("h2"), false))); + } else { + httpClient.getProxyConfiguration().addProxy( + new HttpProxy(address, false, new Origin.Protocol(Collections.singletonList("h2c"), false))); + } + Map<String, Supplier<String>> proxyHeaders = new TreeMap<>(); + b.requestHeaders.forEach((k, v) -> { if (isProxyHeader(k)) proxyHeaders.put(k, v); }); + if (!proxyHeaders.isEmpty()) { + for (URI endpoint : b.endpoints) { + httpClient.getAuthenticationStore().addAuthenticationResult(new Authentication.Result() { + @Override public URI getURI() { return URI.create(endpointUri(endpoint)); } + @Override public void apply(Request r) { + r.headers(hs -> proxyHeaders.forEach((k, v) -> hs.add(k, v.get()))); + } + }); + + } + } + } + + private static boolean isProxyHeader(String h) { return h.equalsIgnoreCase(HttpHeader.PROXY_AUTHORIZATION.asString()); } + private static Endpoint findLeastBusyEndpoint(List<Endpoint> endpoints) { Endpoint leastBusy = endpoints.get(0); int minInflight = leastBusy.inflight.get(); @@ -173,6 +213,10 @@ class JettyCluster implements Cluster { return u.getPort() == -1 ? u.getScheme().equals("http") ? 80 : 443 : u.getPort(); } + private static String endpointUri(URI uri) { + return String.format("%s://%s:%s", uri.getScheme(), uri.getHost(), portOf(uri)); + } + private static class JettyResponse implements HttpResponse { final Response response; final byte[] content; @@ -187,7 +231,7 @@ class JettyCluster implements Cluster { private static class Endpoint { final AtomicInteger inflight = new AtomicInteger(); final String uri; - Endpoint(URI uri) { this.uri = String.format("%s://%s:%s", uri.getScheme(), uri.getHost(), portOf(uri)); } + Endpoint(URI uri) { this.uri = endpointUri(uri); } } private static class Ipv4PreferringResolver extends AbstractLifeCycle implements SocketAddressResolver { |