aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorn.christian@seime.no>2023-07-10 15:09:31 +0200
committerGitHub <noreply@github.com>2023-07-10 15:09:31 +0200
commit169e10dc76c9424abd79f1cea1a1fa9066701864 (patch)
tree6ce16f90f87d253a74080ff2660a7c1d4c7f9420
parent59be3ee9c6ea727848c4fa8ed67f551d83a07608 (diff)
parent804fe2aa1f3df9890805a823208a48b36e055af0 (diff)
Merge pull request #27728 from vespa-engine/bjorncs/vespa-feed-client
Bjorncs/vespa feed client
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java1
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java2
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java64
3 files changed, 55 insertions, 12 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java
index d9eec3dce77..197b7721eca 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java
@@ -201,7 +201,6 @@ public class FeedClientBuilderImpl implements FeedClientBuilder {
@Override
public FeedClientBuilderImpl setProxy(URI uri) {
- log.warning("Proxy configuration ignored - not supported yet");
this.proxy = uri;
return this;
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
index c2181821de6..f228717eba5 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
@@ -321,7 +321,7 @@ class HttpFeedClient implements FeedClient {
.map(Boolean::parseBoolean)
.orElse(Optional.ofNullable(System.getProperty(name))
.map(Boolean::parseBoolean)
- .orElse(false));
+ .orElse(true));
}
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java
index 79fe8dd630f..30dc1ab0d07 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java
@@ -5,7 +5,10 @@ package ai.vespa.feed.client.impl;
import ai.vespa.feed.client.FeedClientBuilder.Compression;
import ai.vespa.feed.client.HttpResponse;
import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.HttpProxy;
import org.eclipse.jetty.client.MultiplexConnectionPool;
+import org.eclipse.jetty.client.Origin;
+import org.eclipse.jetty.client.api.Authentication;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
@@ -33,9 +36,13 @@ import java.net.Inet4Address;
import java.net.InetSocketAddress;
import java.net.URI;
import java.time.Duration;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.zip.GZIPOutputStream;
@@ -67,14 +74,17 @@ class JettyCluster implements Cluster {
@Override
public void dispatch(HttpRequest req, CompletableFuture<HttpResponse> vessel) {
client.getExecutor().execute(() -> {
+ Endpoint endpoint = findLeastBusyEndpoint(endpoints);
try {
- Endpoint endpoint = findLeastBusyEndpoint(endpoints);
+ endpoint.inflight.incrementAndGet();
long reqTimeoutMillis = req.timeout() != null
? req.timeout().toMillis() * 11 / 10 + 1000 : IDLE_TIMEOUT.toMillis();
Request jettyReq = client.newRequest(URI.create(endpoint.uri + req.path()))
.version(HttpVersion.HTTP_2)
.method(HttpMethod.fromString(req.method()))
- .headers(hs -> req.headers().forEach((k, v) -> hs.add(k, v.get())))
+ .headers(hs -> req.headers().forEach((k, v) -> {
+ if (!isProxyHeader(k)) hs.add(k, v.get());
+ }))
.idleTimeout(IDLE_TIMEOUT.toMillis(), MILLISECONDS)
.timeout(reqTimeoutMillis, MILLISECONDS);
if (req.body() != null) {
@@ -95,11 +105,13 @@ class JettyCluster implements Cluster {
jettyReq.send(new BufferingResponseListener() {
@Override
public void onComplete(Result result) {
+ endpoint.inflight.decrementAndGet();
if (result.isFailed()) vessel.completeExceptionally(result.getFailure());
else vessel.complete(new JettyResponse(result.getResponse(), getContent()));
}
});
} catch (Exception e) {
+ endpoint.inflight.decrementAndGet();
vessel.completeExceptionally(e);
}
});
@@ -124,6 +136,8 @@ class JettyCluster implements Cluster {
int threads = Math.max(Math.min(Runtime.getRuntime().availableProcessors(), 32), 8);
connector.setExecutor(new QueuedThreadPool(threads));
connector.setSslContextFactory(clientSslCtxFactory);
+ connector.setIdleTimeout(IDLE_TIMEOUT);
+ connector.setConnectTimeout(Duration.ofSeconds(10));
HTTP2Client h2Client = new HTTP2Client(connector);
h2Client.setMaxConcurrentPushedStreams(b.maxStreamsPerConnection);
// Set the HTTP/2 flow control windows very large to cause TCP congestion instead of HTTP/2 flow control congestion.
@@ -141,20 +155,46 @@ class JettyCluster implements Cluster {
httpClient.setFollowRedirects(false);
httpClient.setUserAgentField(
new HttpField(HttpHeader.USER_AGENT, String.format("vespa-feed-client/%s (Jetty)", Vespa.VERSION)));
- httpClient.setConnectTimeout(Duration.ofSeconds(10).toMillis());
// Stop client from trying different IP address when TLS handshake fails
httpClient.setSocketAddressResolver(new Ipv4PreferringResolver(httpClient, Duration.ofSeconds(10)));
httpClient.setCookieStore(new HttpCookieStore.Empty());
- httpClient.setIdleTimeout(IDLE_TIMEOUT.toMillis());
- try {
- httpClient.start();
- } catch (Exception e) {
- throw new IOException(e);
- }
+ if (b.proxy != null) addProxyConfiguration(b, httpClient);
+ try { httpClient.start(); } catch (Exception e) { throw new IOException(e); }
return httpClient;
}
+ private static void addProxyConfiguration(FeedClientBuilderImpl b, HttpClient httpClient) throws IOException {
+ Origin.Address address = new Origin.Address(b.proxy.getHost(), b.proxy.getPort());
+ if (b.proxy.getScheme().equals("https")) {
+ SslContextFactory.Client proxySslCtxFactory = new SslContextFactory.Client();
+ if (b.hostnameVerifier != null) proxySslCtxFactory.setHostnameVerifier(b.hostnameVerifier);
+ // Disable built-in hostname verification in the JDK's TLS implementation
+ proxySslCtxFactory.setEndpointIdentificationAlgorithm(null);
+ try { proxySslCtxFactory.start(); } catch (Exception e) { throw new IOException(e); }
+ httpClient.getProxyConfiguration().addProxy(
+ new HttpProxy(address, proxySslCtxFactory, new Origin.Protocol(Collections.singletonList("h2"), false)));
+ } else {
+ httpClient.getProxyConfiguration().addProxy(
+ new HttpProxy(address, false, new Origin.Protocol(Collections.singletonList("h2c"), false)));
+ }
+ Map<String, Supplier<String>> proxyHeaders = new TreeMap<>();
+ b.requestHeaders.forEach((k, v) -> { if (isProxyHeader(k)) proxyHeaders.put(k, v); });
+ if (!proxyHeaders.isEmpty()) {
+ for (URI endpoint : b.endpoints) {
+ httpClient.getAuthenticationStore().addAuthenticationResult(new Authentication.Result() {
+ @Override public URI getURI() { return URI.create(endpointUri(endpoint)); }
+ @Override public void apply(Request r) {
+ r.headers(hs -> proxyHeaders.forEach((k, v) -> hs.add(k, v.get())));
+ }
+ });
+
+ }
+ }
+ }
+
+ private static boolean isProxyHeader(String h) { return h.equalsIgnoreCase(HttpHeader.PROXY_AUTHORIZATION.asString()); }
+
private static Endpoint findLeastBusyEndpoint(List<Endpoint> endpoints) {
Endpoint leastBusy = endpoints.get(0);
int minInflight = leastBusy.inflight.get();
@@ -173,6 +213,10 @@ class JettyCluster implements Cluster {
return u.getPort() == -1 ? u.getScheme().equals("http") ? 80 : 443 : u.getPort();
}
+ private static String endpointUri(URI uri) {
+ return String.format("%s://%s:%s", uri.getScheme(), uri.getHost(), portOf(uri));
+ }
+
private static class JettyResponse implements HttpResponse {
final Response response;
final byte[] content;
@@ -187,7 +231,7 @@ class JettyCluster implements Cluster {
private static class Endpoint {
final AtomicInteger inflight = new AtomicInteger();
final String uri;
- Endpoint(URI uri) { this.uri = String.format("%s://%s:%s", uri.getScheme(), uri.getHost(), portOf(uri)); }
+ Endpoint(URI uri) { this.uri = endpointUri(uri); }
}
private static class Ipv4PreferringResolver extends AbstractLifeCycle implements SocketAddressResolver {