summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@yahooinc.com>2023-07-13 11:44:30 +0200
committerBjørn Christian Seime <bjorncs@yahooinc.com>2023-07-13 11:49:55 +0200
commitb2dcd9de3f8ed19bd115ab46156c4752c6875a90 (patch)
treed7cc2652576e09048ec819c976fe75a6aab94442 /vespa-feed-client
parent96d8009fb2921d4fc3152a89b97a888bd7e6f166 (diff)
Remove Apache based implementation
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/pom.xml20
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java243
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java12
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/TlsDetailsFactory.java16
-rw-r--r--vespa-feed-client/src/main/java9/ai/vespa/feed/client/impl/TlsDetailsFactory.java20
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/ApacheClusterTest.java77
6 files changed, 1 insertions, 387 deletions
diff --git a/vespa-feed-client/pom.xml b/vespa-feed-client/pom.xml
index 19130b52268..feef54ae589 100644
--- a/vespa-feed-client/pom.xml
+++ b/vespa-feed-client/pom.xml
@@ -25,11 +25,6 @@
<scope>compile</scope>
</dependency>
<dependency>
- <groupId>org.apache.httpcomponents.client5</groupId>
- <artifactId>httpclient5</artifactId>
- <scope>compile</scope>
- </dependency>
- <dependency>
<groupId>org.eclipse.jetty.http2</groupId>
<artifactId>http2-http-client-transport</artifactId>
<scope>compile</scope>
@@ -80,21 +75,6 @@
<showDeprecation>true</showDeprecation>
</configuration>
</execution>
- <execution>
- <id>compile-java-9</id>
- <phase>compile</phase>
- <goals>
- <goal>compile</goal>
- </goals>
- <configuration>
- <release>9</release>
- <compileSourceRoots>
- <compileSourceRoot>${project.basedir}/src/main/java9</compileSourceRoot>
- </compileSourceRoots>
- <outputDirectory>${project.build.outputDirectory}/META-INF/versions/9</outputDirectory>
- <showDeprecation>true</showDeprecation>
- </configuration>
- </execution>
</executions>
</plugin>
<plugin>
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java
deleted file mode 100644
index 96c65a6b165..00000000000
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java
+++ /dev/null
@@ -1,243 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client.impl;
-
-import ai.vespa.feed.client.FeedClientBuilder.Compression;
-import ai.vespa.feed.client.HttpResponse;
-import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
-import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
-import org.apache.hc.client5.http.config.ConnectionConfig;
-import org.apache.hc.client5.http.config.RequestConfig;
-import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
-import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
-import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
-import org.apache.hc.core5.concurrent.FutureCallback;
-import org.apache.hc.core5.http.ContentType;
-import org.apache.hc.core5.http.Header;
-import org.apache.hc.core5.http.HttpHeaders;
-import org.apache.hc.core5.http.message.BasicHeader;
-import org.apache.hc.core5.http2.config.H2Config;
-import org.apache.hc.core5.net.URIAuthority;
-import org.apache.hc.core5.reactor.IOReactorConfig;
-import org.apache.hc.core5.util.Timeout;
-
-import javax.net.ssl.SSLContext;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.zip.GZIPOutputStream;
-
-import static ai.vespa.feed.client.FeedClientBuilder.Compression.auto;
-import static ai.vespa.feed.client.FeedClientBuilder.Compression.gzip;
-import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeH2Blacklisted;
-import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeWeak;
-
-/**
- * @author jonmv
- */
-class ApacheCluster implements Cluster {
-
- private final List<Endpoint> endpoints = new ArrayList<>();
- private final List<BasicHeader> defaultHeaders = Arrays.asList(new BasicHeader(HttpHeaders.USER_AGENT, String.format("vespa-feed-client/%s (Apache)", Vespa.VERSION)),
- new BasicHeader("Vespa-Client-Version", Vespa.VERSION));
- private final Header gzipEncodingHeader = new BasicHeader(HttpHeaders.CONTENT_ENCODING, "gzip");
- private final Compression compression;
- private int someNumber = 0;
-
- private final ExecutorService dispatchExecutor = Executors.newFixedThreadPool(8, t -> new Thread(t, "request-dispatch-thread"));
- private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor(t -> new Thread(t, "request-timeout-thread"));
-
- ApacheCluster(FeedClientBuilderImpl builder) throws IOException {
- for (int i = 0; i < builder.connectionsPerEndpoint; i++)
- for (URI endpoint : builder.endpoints)
- endpoints.add(new Endpoint(createHttpClient(builder), endpoint));
- this.compression = builder.compression;
- }
-
- @Override
- public void dispatch(HttpRequest wrapped, CompletableFuture<HttpResponse> vessel) {
- Endpoint leastBusy = endpoints.get(0);
- int min = Integer.MAX_VALUE;
- int start = ++someNumber % endpoints.size();
- for (int i = 0; i < endpoints.size(); i++) {
- Endpoint endpoint = endpoints.get((i + start) % endpoints.size());
- int inflight = endpoint.inflight.get();
- if (inflight < min) {
- leastBusy = endpoint;
- min = inflight;
- }
- }
- Endpoint endpoint = leastBusy;
- endpoint.inflight.incrementAndGet();
-
- dispatchExecutor.execute(() -> {
- try {
- SimpleHttpRequest request = new SimpleHttpRequest(wrapped.method(), wrapped.path());
- request.setScheme(endpoint.url.getScheme());
- request.setAuthority(new URIAuthority(endpoint.url.getHost(), portOf(endpoint.url)));
- request.setConfig(RequestConfig.custom().setConnectionRequestTimeout(Timeout.DISABLED).build());
- defaultHeaders.forEach(request::setHeader);
- wrapped.headers().forEach((name, value) -> request.setHeader(name, value.get()));
- if (wrapped.body() != null) {
- byte[] body = wrapped.body();
- if (compression == gzip || compression == auto && body.length > 512) {
- request.setHeader(gzipEncodingHeader);
- body = gzipped(body);
- }
- request.setBody(body, ContentType.APPLICATION_JSON);
- }
-
- Future<?> future = endpoint.client.execute(request,
- new FutureCallback<SimpleHttpResponse>() {
- @Override public void completed(SimpleHttpResponse response) { vessel.complete(new ApacheHttpResponse(response)); }
- @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); }
- @Override public void cancelled() { vessel.cancel(false); }
- });
- // Manually schedule response timeout as the Apache HTTP/2 multiplexing client does not support response timeouts
- long timeoutMillis = wrapped.timeout() == null ? 190_000 : wrapped.timeout().toMillis();
- Future<?> cancellation = timeoutExecutor.schedule(
- () -> {
- vessel.completeExceptionally(
- new TimeoutException(String.format("Request timed out after %dms", timeoutMillis)));
- future.cancel(true);
- },
- timeoutMillis * 11 / 10 + 1_000, TimeUnit.MILLISECONDS);
- vessel.whenComplete((__, ___) -> cancellation.cancel(true));
- }
- catch (Throwable thrown) {
- vessel.completeExceptionally(thrown);
- }
- vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet());
- });
- }
-
- private byte[] gzipped(byte[] content) throws IOException{
- ByteArrayOutputStream buffer = new ByteArrayOutputStream(1 << 10);
- try (GZIPOutputStream zip = new GZIPOutputStream(buffer)) {
- zip.write(content);
- }
- return buffer.toByteArray();
- }
-
- @Override
- public void close() {
- Throwable thrown = null;
- dispatchExecutor.shutdownNow().forEach(Runnable::run);
- for (Endpoint endpoint : endpoints) {
- try {
- endpoint.client.close();
- }
- catch (Throwable t) {
- if (thrown == null) thrown = t;
- else thrown.addSuppressed(t);
- }
- }
- timeoutExecutor.shutdownNow().forEach(Runnable::run);
- if (thrown != null) throw new RuntimeException(thrown);
- }
-
-
- private static class Endpoint {
-
- private final CloseableHttpAsyncClient client;
- private final AtomicInteger inflight = new AtomicInteger(0);
- private final URI url;
-
- private Endpoint(CloseableHttpAsyncClient client, URI url) {
- this.client = client;
- this.url = url;
-
- this.client.start();
- }
-
- }
-
- private static CloseableHttpAsyncClient createHttpClient(FeedClientBuilderImpl builder) throws IOException {
- SSLContext sslContext = builder.constructSslContext();
- String[] allowedCiphers = excludeH2Blacklisted(excludeWeak(sslContext.getSupportedSSLParameters().getCipherSuites()));
- if (allowedCiphers.length == 0)
- throw new IllegalStateException("No adequate SSL cipher suites supported by the JVM");
-
- ClientTlsStrategyBuilder tlsStrategyBuilder = ClientTlsStrategyBuilder.create()
- .setCiphers(allowedCiphers)
- .setSslContext(sslContext);
- if (builder.hostnameVerifier != null)
- tlsStrategyBuilder.setHostnameVerifier(builder.hostnameVerifier);
-
- // Socket timeout must be longer than the longest feasible response timeout
- Timeout socketTimeout = Timeout.ofMinutes(15);
-
- ConnectionConfig connCfg = ConnectionConfig.custom()
- .setSocketTimeout(socketTimeout)
- .setConnectTimeout(Timeout.ofSeconds(10))
- .build();
-
- return HttpAsyncClients.customHttp2()
- .setH2Config(
- H2Config.custom()
- .setMaxConcurrentStreams(builder.maxStreamsPerConnection)
- .setCompressionEnabled(true)
- .setPushEnabled(false)
- .setInitialWindowSize(Integer.MAX_VALUE)
- .build())
- .setIOReactorConfig(
- IOReactorConfig.custom()
- .setIoThreadCount(Math.max(Math.min(Runtime.getRuntime().availableProcessors(), 8), 2))
- .setTcpNoDelay(true)
- .setSoTimeout(socketTimeout)
- .build())
- .setTlsStrategy(tlsStrategyBuilder.build())
- .setDefaultConnectionConfig(connCfg)
- .disableAutomaticRetries()
- .disableRedirectHandling()
- .disableCookieManagement()
- .build();
- }
-
- private static int portOf(URI url) {
- return url.getPort() == -1 ? url.getScheme().equals("http") ? 80 : 443
- : url.getPort();
- }
-
- private static class ApacheHttpResponse implements HttpResponse {
-
- private final SimpleHttpResponse wrapped;
-
- private ApacheHttpResponse(SimpleHttpResponse wrapped) {
- this.wrapped = wrapped;
- }
-
- @Override
- public int code() {
- return wrapped.getCode();
- }
-
- @Override
- public byte[] body() {
- return wrapped.getBodyBytes();
- }
-
- @Override
- public String contentType() {
- return wrapped.getContentType().getMimeType();
- }
-
- @Override
- public String toString() {
- return "HTTP response with code " + code() +
- (body() != null ? " and body '" + wrapped.getBodyText() + "'" : "");
- }
-
- }
-
-}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
index f228717eba5..40c5fda8ce3 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
@@ -55,8 +55,7 @@ class HttpFeedClient implements FeedClient {
private final boolean speedTest;
HttpFeedClient(FeedClientBuilderImpl builder) throws IOException {
- this(builder, builder.dryrun ?
- new DryrunCluster() : experimentalClientEnabled() ? new JettyCluster(builder) : new ApacheCluster(builder));
+ this(builder, builder.dryrun ? new DryrunCluster() : new JettyCluster(builder));
}
HttpFeedClient(FeedClientBuilderImpl builder, Cluster cluster) {
@@ -315,13 +314,4 @@ class HttpFeedClient implements FeedClient {
return query.toString();
}
- private static boolean experimentalClientEnabled() {
- String name = "VESPA_FEED_EXPERIMENTAL_CLIENT";
- return Optional.ofNullable(System.getenv(name))
- .map(Boolean::parseBoolean)
- .orElse(Optional.ofNullable(System.getProperty(name))
- .map(Boolean::parseBoolean)
- .orElse(true));
- }
-
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/TlsDetailsFactory.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/TlsDetailsFactory.java
deleted file mode 100644
index 5183ce61761..00000000000
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/TlsDetailsFactory.java
+++ /dev/null
@@ -1,16 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client.impl;
-
-import org.apache.hc.core5.reactor.ssl.TlsDetails;
-
-import javax.net.ssl.SSLEngine;
-
-/**
- * @author bjorncs
- */
-public class TlsDetailsFactory {
- private TlsDetailsFactory() {}
-
- public static TlsDetails create(SSLEngine e) { return new TlsDetails(e.getSession(), "h2"); /*h2 == HTTP2*/ }
-}
-
diff --git a/vespa-feed-client/src/main/java9/ai/vespa/feed/client/impl/TlsDetailsFactory.java b/vespa-feed-client/src/main/java9/ai/vespa/feed/client/impl/TlsDetailsFactory.java
deleted file mode 100644
index f9903d9943d..00000000000
--- a/vespa-feed-client/src/main/java9/ai/vespa/feed/client/impl/TlsDetailsFactory.java
+++ /dev/null
@@ -1,20 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client.impl;
-
-import org.apache.hc.core5.reactor.ssl.TlsDetails;
-
-import javax.net.ssl.SSLEngine;
-
-/**
- * {@link SSLEngine#getApplicationProtocol()} is not available on all JDK8 versions
- * (https://bugs.openjdk.org/browse/JDK-8051498)
- *
- * @author bjorncs
- */
-public class TlsDetailsFactory {
- private TlsDetailsFactory() {}
-
- public static TlsDetails create(SSLEngine e) {
- return new TlsDetails(e.getSession(), e.getApplicationProtocol());
- }
-}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/ApacheClusterTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/ApacheClusterTest.java
deleted file mode 100644
index cf9a36f2aa8..00000000000
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/ApacheClusterTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-package ai.vespa.feed.client.impl;
-
-import ai.vespa.feed.client.FeedClientBuilder.Compression;
-import ai.vespa.feed.client.HttpResponse;
-import com.github.tomakehurst.wiremock.matching.RequestPatternBuilder;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.RegisterExtension;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.URI;
-import java.time.Duration;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.zip.GZIPOutputStream;
-
-import static com.github.tomakehurst.wiremock.client.WireMock.any;
-import static com.github.tomakehurst.wiremock.client.WireMock.anyRequestedFor;
-import static com.github.tomakehurst.wiremock.client.WireMock.anyUrl;
-import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
-import static com.github.tomakehurst.wiremock.client.WireMock.okJson;
-import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor;
-import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-class ApacheClusterTest {
-
- @RegisterExtension
- final WireMockExtension server = new WireMockExtension();
-
- @Test
- void testClient() throws Exception {
- for (Compression compression : Compression.values()) {
- try (ApacheCluster cluster = new ApacheCluster(new FeedClientBuilderImpl(List.of(URI.create("http://localhost:" + server.port())))
- .setCompression(compression))) {
- server.stubFor(any(anyUrl()))
- .setResponse(okJson("{}").build());
-
- CompletableFuture<HttpResponse> vessel = new CompletableFuture<>();
- cluster.dispatch(new HttpRequest("POST",
- "/path",
- Map.of("name1", () -> "value1",
- "name2", () -> "value2"),
- "content".getBytes(UTF_8),
- Duration.ofSeconds(10)),
- vessel);
-
- AutoCloseable verifyResponse = () -> {
- HttpResponse response = vessel.get(15, TimeUnit.SECONDS);
- assertEquals("{}", new String(response.body(), UTF_8));
- assertEquals(200, response.code());
- };
- AutoCloseable verifyServer = () -> {
- server.verify(1, anyRequestedFor(anyUrl()));
- RequestPatternBuilder expected = postRequestedFor(urlEqualTo("/path")).withHeader("name1", equalTo("value1"))
- .withHeader("name2", equalTo("value2"))
- .withHeader("Content-Type", equalTo("application/json; charset=UTF-8"))
- .withRequestBody(equalTo("content"));
- expected = switch (compression) {
- case auto, none -> expected.withoutHeader("Content-Encoding");
- case gzip -> expected.withHeader("Content-Encoding", equalTo("gzip"));
- };
- server.verify(1, expected);
- server.resetRequests();
- };
- try (verifyServer; verifyResponse) { }
- }
- }
- }
-
-}