diff options
author | Bjørn Christian Seime <bjorncs@yahooinc.com> | 2023-07-13 11:44:30 +0200 |
---|---|---|
committer | Bjørn Christian Seime <bjorncs@yahooinc.com> | 2023-07-13 11:49:55 +0200 |
commit | b2dcd9de3f8ed19bd115ab46156c4752c6875a90 (patch) | |
tree | d7cc2652576e09048ec819c976fe75a6aab94442 /vespa-feed-client | |
parent | 96d8009fb2921d4fc3152a89b97a888bd7e6f166 (diff) |
Remove Apache based implementation
Diffstat (limited to 'vespa-feed-client')
6 files changed, 1 insertions, 387 deletions
diff --git a/vespa-feed-client/pom.xml b/vespa-feed-client/pom.xml index 19130b52268..feef54ae589 100644 --- a/vespa-feed-client/pom.xml +++ b/vespa-feed-client/pom.xml @@ -25,11 +25,6 @@ <scope>compile</scope> </dependency> <dependency> - <groupId>org.apache.httpcomponents.client5</groupId> - <artifactId>httpclient5</artifactId> - <scope>compile</scope> - </dependency> - <dependency> <groupId>org.eclipse.jetty.http2</groupId> <artifactId>http2-http-client-transport</artifactId> <scope>compile</scope> @@ -80,21 +75,6 @@ <showDeprecation>true</showDeprecation> </configuration> </execution> - <execution> - <id>compile-java-9</id> - <phase>compile</phase> - <goals> - <goal>compile</goal> - </goals> - <configuration> - <release>9</release> - <compileSourceRoots> - <compileSourceRoot>${project.basedir}/src/main/java9</compileSourceRoot> - </compileSourceRoots> - <outputDirectory>${project.build.outputDirectory}/META-INF/versions/9</outputDirectory> - <showDeprecation>true</showDeprecation> - </configuration> - </execution> </executions> </plugin> <plugin> diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java deleted file mode 100644 index 96c65a6b165..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java +++ /dev/null @@ -1,243 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client.impl; - -import ai.vespa.feed.client.FeedClientBuilder.Compression; -import ai.vespa.feed.client.HttpResponse; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; -import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; -import org.apache.hc.client5.http.config.ConnectionConfig; -import org.apache.hc.client5.http.config.RequestConfig; -import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; -import org.apache.hc.client5.http.impl.async.HttpAsyncClients; -import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; -import org.apache.hc.core5.concurrent.FutureCallback; -import org.apache.hc.core5.http.ContentType; -import org.apache.hc.core5.http.Header; -import org.apache.hc.core5.http.HttpHeaders; -import org.apache.hc.core5.http.message.BasicHeader; -import org.apache.hc.core5.http2.config.H2Config; -import org.apache.hc.core5.net.URIAuthority; -import org.apache.hc.core5.reactor.IOReactorConfig; -import org.apache.hc.core5.util.Timeout; - -import javax.net.ssl.SSLContext; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.zip.GZIPOutputStream; - -import static ai.vespa.feed.client.FeedClientBuilder.Compression.auto; -import static ai.vespa.feed.client.FeedClientBuilder.Compression.gzip; -import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeH2Blacklisted; -import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeWeak; - -/** - * @author jonmv - */ -class ApacheCluster implements Cluster { - - private final List<Endpoint> endpoints = new ArrayList<>(); - private final List<BasicHeader> defaultHeaders = Arrays.asList(new BasicHeader(HttpHeaders.USER_AGENT, String.format("vespa-feed-client/%s (Apache)", Vespa.VERSION)), - new BasicHeader("Vespa-Client-Version", Vespa.VERSION)); - private final Header gzipEncodingHeader = new BasicHeader(HttpHeaders.CONTENT_ENCODING, "gzip"); - private final Compression compression; - private int someNumber = 0; - - private final ExecutorService dispatchExecutor = Executors.newFixedThreadPool(8, t -> new Thread(t, "request-dispatch-thread")); - private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor(t -> new Thread(t, "request-timeout-thread")); - - ApacheCluster(FeedClientBuilderImpl builder) throws IOException { - for (int i = 0; i < builder.connectionsPerEndpoint; i++) - for (URI endpoint : builder.endpoints) - endpoints.add(new Endpoint(createHttpClient(builder), endpoint)); - this.compression = builder.compression; - } - - @Override - public void dispatch(HttpRequest wrapped, CompletableFuture<HttpResponse> vessel) { - Endpoint leastBusy = endpoints.get(0); - int min = Integer.MAX_VALUE; - int start = ++someNumber % endpoints.size(); - for (int i = 0; i < endpoints.size(); i++) { - Endpoint endpoint = endpoints.get((i + start) % endpoints.size()); - int inflight = endpoint.inflight.get(); - if (inflight < min) { - leastBusy = endpoint; - min = inflight; - } - } - Endpoint endpoint = leastBusy; - endpoint.inflight.incrementAndGet(); - - dispatchExecutor.execute(() -> { - try { - SimpleHttpRequest request = new SimpleHttpRequest(wrapped.method(), wrapped.path()); - request.setScheme(endpoint.url.getScheme()); - request.setAuthority(new URIAuthority(endpoint.url.getHost(), portOf(endpoint.url))); - request.setConfig(RequestConfig.custom().setConnectionRequestTimeout(Timeout.DISABLED).build()); - defaultHeaders.forEach(request::setHeader); - wrapped.headers().forEach((name, value) -> request.setHeader(name, value.get())); - if (wrapped.body() != null) { - byte[] body = wrapped.body(); - if (compression == gzip || compression == auto && body.length > 512) { - request.setHeader(gzipEncodingHeader); - body = gzipped(body); - } - request.setBody(body, ContentType.APPLICATION_JSON); - } - - Future<?> future = endpoint.client.execute(request, - new FutureCallback<SimpleHttpResponse>() { - @Override public void completed(SimpleHttpResponse response) { vessel.complete(new ApacheHttpResponse(response)); } - @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); } - @Override public void cancelled() { vessel.cancel(false); } - }); - // Manually schedule response timeout as the Apache HTTP/2 multiplexing client does not support response timeouts - long timeoutMillis = wrapped.timeout() == null ? 190_000 : wrapped.timeout().toMillis(); - Future<?> cancellation = timeoutExecutor.schedule( - () -> { - vessel.completeExceptionally( - new TimeoutException(String.format("Request timed out after %dms", timeoutMillis))); - future.cancel(true); - }, - timeoutMillis * 11 / 10 + 1_000, TimeUnit.MILLISECONDS); - vessel.whenComplete((__, ___) -> cancellation.cancel(true)); - } - catch (Throwable thrown) { - vessel.completeExceptionally(thrown); - } - vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet()); - }); - } - - private byte[] gzipped(byte[] content) throws IOException{ - ByteArrayOutputStream buffer = new ByteArrayOutputStream(1 << 10); - try (GZIPOutputStream zip = new GZIPOutputStream(buffer)) { - zip.write(content); - } - return buffer.toByteArray(); - } - - @Override - public void close() { - Throwable thrown = null; - dispatchExecutor.shutdownNow().forEach(Runnable::run); - for (Endpoint endpoint : endpoints) { - try { - endpoint.client.close(); - } - catch (Throwable t) { - if (thrown == null) thrown = t; - else thrown.addSuppressed(t); - } - } - timeoutExecutor.shutdownNow().forEach(Runnable::run); - if (thrown != null) throw new RuntimeException(thrown); - } - - - private static class Endpoint { - - private final CloseableHttpAsyncClient client; - private final AtomicInteger inflight = new AtomicInteger(0); - private final URI url; - - private Endpoint(CloseableHttpAsyncClient client, URI url) { - this.client = client; - this.url = url; - - this.client.start(); - } - - } - - private static CloseableHttpAsyncClient createHttpClient(FeedClientBuilderImpl builder) throws IOException { - SSLContext sslContext = builder.constructSslContext(); - String[] allowedCiphers = excludeH2Blacklisted(excludeWeak(sslContext.getSupportedSSLParameters().getCipherSuites())); - if (allowedCiphers.length == 0) - throw new IllegalStateException("No adequate SSL cipher suites supported by the JVM"); - - ClientTlsStrategyBuilder tlsStrategyBuilder = ClientTlsStrategyBuilder.create() - .setCiphers(allowedCiphers) - .setSslContext(sslContext); - if (builder.hostnameVerifier != null) - tlsStrategyBuilder.setHostnameVerifier(builder.hostnameVerifier); - - // Socket timeout must be longer than the longest feasible response timeout - Timeout socketTimeout = Timeout.ofMinutes(15); - - ConnectionConfig connCfg = ConnectionConfig.custom() - .setSocketTimeout(socketTimeout) - .setConnectTimeout(Timeout.ofSeconds(10)) - .build(); - - return HttpAsyncClients.customHttp2() - .setH2Config( - H2Config.custom() - .setMaxConcurrentStreams(builder.maxStreamsPerConnection) - .setCompressionEnabled(true) - .setPushEnabled(false) - .setInitialWindowSize(Integer.MAX_VALUE) - .build()) - .setIOReactorConfig( - IOReactorConfig.custom() - .setIoThreadCount(Math.max(Math.min(Runtime.getRuntime().availableProcessors(), 8), 2)) - .setTcpNoDelay(true) - .setSoTimeout(socketTimeout) - .build()) - .setTlsStrategy(tlsStrategyBuilder.build()) - .setDefaultConnectionConfig(connCfg) - .disableAutomaticRetries() - .disableRedirectHandling() - .disableCookieManagement() - .build(); - } - - private static int portOf(URI url) { - return url.getPort() == -1 ? url.getScheme().equals("http") ? 80 : 443 - : url.getPort(); - } - - private static class ApacheHttpResponse implements HttpResponse { - - private final SimpleHttpResponse wrapped; - - private ApacheHttpResponse(SimpleHttpResponse wrapped) { - this.wrapped = wrapped; - } - - @Override - public int code() { - return wrapped.getCode(); - } - - @Override - public byte[] body() { - return wrapped.getBodyBytes(); - } - - @Override - public String contentType() { - return wrapped.getContentType().getMimeType(); - } - - @Override - public String toString() { - return "HTTP response with code " + code() + - (body() != null ? " and body '" + wrapped.getBodyText() + "'" : ""); - } - - } - -} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java index f228717eba5..40c5fda8ce3 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java @@ -55,8 +55,7 @@ class HttpFeedClient implements FeedClient { private final boolean speedTest; HttpFeedClient(FeedClientBuilderImpl builder) throws IOException { - this(builder, builder.dryrun ? - new DryrunCluster() : experimentalClientEnabled() ? new JettyCluster(builder) : new ApacheCluster(builder)); + this(builder, builder.dryrun ? new DryrunCluster() : new JettyCluster(builder)); } HttpFeedClient(FeedClientBuilderImpl builder, Cluster cluster) { @@ -315,13 +314,4 @@ class HttpFeedClient implements FeedClient { return query.toString(); } - private static boolean experimentalClientEnabled() { - String name = "VESPA_FEED_EXPERIMENTAL_CLIENT"; - return Optional.ofNullable(System.getenv(name)) - .map(Boolean::parseBoolean) - .orElse(Optional.ofNullable(System.getProperty(name)) - .map(Boolean::parseBoolean) - .orElse(true)); - } - } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/TlsDetailsFactory.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/TlsDetailsFactory.java deleted file mode 100644 index 5183ce61761..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/TlsDetailsFactory.java +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client.impl; - -import org.apache.hc.core5.reactor.ssl.TlsDetails; - -import javax.net.ssl.SSLEngine; - -/** - * @author bjorncs - */ -public class TlsDetailsFactory { - private TlsDetailsFactory() {} - - public static TlsDetails create(SSLEngine e) { return new TlsDetails(e.getSession(), "h2"); /*h2 == HTTP2*/ } -} - diff --git a/vespa-feed-client/src/main/java9/ai/vespa/feed/client/impl/TlsDetailsFactory.java b/vespa-feed-client/src/main/java9/ai/vespa/feed/client/impl/TlsDetailsFactory.java deleted file mode 100644 index f9903d9943d..00000000000 --- a/vespa-feed-client/src/main/java9/ai/vespa/feed/client/impl/TlsDetailsFactory.java +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client.impl; - -import org.apache.hc.core5.reactor.ssl.TlsDetails; - -import javax.net.ssl.SSLEngine; - -/** - * {@link SSLEngine#getApplicationProtocol()} is not available on all JDK8 versions - * (https://bugs.openjdk.org/browse/JDK-8051498) - * - * @author bjorncs - */ -public class TlsDetailsFactory { - private TlsDetailsFactory() {} - - public static TlsDetails create(SSLEngine e) { - return new TlsDetails(e.getSession(), e.getApplicationProtocol()); - } -} diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/ApacheClusterTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/ApacheClusterTest.java deleted file mode 100644 index cf9a36f2aa8..00000000000 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/ApacheClusterTest.java +++ /dev/null @@ -1,77 +0,0 @@ -package ai.vespa.feed.client.impl; - -import ai.vespa.feed.client.FeedClientBuilder.Compression; -import ai.vespa.feed.client.HttpResponse; -import com.github.tomakehurst.wiremock.matching.RequestPatternBuilder; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.net.URI; -import java.time.Duration; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.zip.GZIPOutputStream; - -import static com.github.tomakehurst.wiremock.client.WireMock.any; -import static com.github.tomakehurst.wiremock.client.WireMock.anyRequestedFor; -import static com.github.tomakehurst.wiremock.client.WireMock.anyUrl; -import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; -import static com.github.tomakehurst.wiremock.client.WireMock.okJson; -import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor; -import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.junit.jupiter.api.Assertions.assertEquals; - -class ApacheClusterTest { - - @RegisterExtension - final WireMockExtension server = new WireMockExtension(); - - @Test - void testClient() throws Exception { - for (Compression compression : Compression.values()) { - try (ApacheCluster cluster = new ApacheCluster(new FeedClientBuilderImpl(List.of(URI.create("http://localhost:" + server.port()))) - .setCompression(compression))) { - server.stubFor(any(anyUrl())) - .setResponse(okJson("{}").build()); - - CompletableFuture<HttpResponse> vessel = new CompletableFuture<>(); - cluster.dispatch(new HttpRequest("POST", - "/path", - Map.of("name1", () -> "value1", - "name2", () -> "value2"), - "content".getBytes(UTF_8), - Duration.ofSeconds(10)), - vessel); - - AutoCloseable verifyResponse = () -> { - HttpResponse response = vessel.get(15, TimeUnit.SECONDS); - assertEquals("{}", new String(response.body(), UTF_8)); - assertEquals(200, response.code()); - }; - AutoCloseable verifyServer = () -> { - server.verify(1, anyRequestedFor(anyUrl())); - RequestPatternBuilder expected = postRequestedFor(urlEqualTo("/path")).withHeader("name1", equalTo("value1")) - .withHeader("name2", equalTo("value2")) - .withHeader("Content-Type", equalTo("application/json; charset=UTF-8")) - .withRequestBody(equalTo("content")); - expected = switch (compression) { - case auto, none -> expected.withoutHeader("Content-Encoding"); - case gzip -> expected.withHeader("Content-Encoding", equalTo("gzip")); - }; - server.verify(1, expected); - server.resetRequests(); - }; - try (verifyServer; verifyResponse) { } - } - } - } - -} |