diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-06-04 08:03:50 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-06-04 08:03:50 +0200 |
commit | f84d1a7a5f79ba3dc4490ea3cf9c1fafca17786c (patch) | |
tree | e152e3caee3527de8110fc9b031f1e95dfe4966c /vespa-feed-client | |
parent | 9e0e0c02e5a1034e34dfdeba6220502e9e5678bb (diff) |
Extract Cluster interface
Diffstat (limited to 'vespa-feed-client')
6 files changed, 192 insertions, 131 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java new file mode 100644 index 00000000000..5cbd31ef1a3 --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java @@ -0,0 +1,18 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; + +import java.io.Closeable; +import java.util.concurrent.CompletableFuture; + +interface Cluster extends Closeable { + + /** Dispatch the request to the cluster, causing the response vessel to complete at a later time. */ + void dispatch(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel); + + @Override + void close(); + +} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpCluster.java new file mode 100644 index 00000000000..7a5427cc7fd --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpCluster.java @@ -0,0 +1,149 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.H2AsyncClientBuilder; +import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.message.BasicHeader; +import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.net.URIAuthority; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.util.Timeout; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeH2Blacklisted; +import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeWeak; + +/** + * @author jonmv + */ +class HttpCluster implements Cluster { + + private final List<Endpoint> endpoints = new ArrayList<>(); + + public HttpCluster(FeedClientBuilder builder) throws IOException { + for (URI endpoint : builder.endpoints) + for (int i = 0; i < builder.connectionsPerEndpoint; i++) + endpoints.add(new Endpoint(createHttpClient(builder), endpoint)); + } + + @Override + public void dispatch(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel) { + int index = 0; + int min = Integer.MAX_VALUE; + for (int i = 0; i < endpoints.size(); i++) + if (endpoints.get(i).inflight.get() < min) { + index = i; + min = endpoints.get(i).inflight.get(); + } + + Endpoint endpoint = endpoints.get(index); + endpoint.inflight.incrementAndGet(); + try { + request.setScheme(endpoint.url.getScheme()); + request.setAuthority(new URIAuthority(endpoint.url.getHost(), endpoint.url.getPort())); + endpoint.client.execute(request, + new FutureCallback<SimpleHttpResponse>() { + @Override public void completed(SimpleHttpResponse response) { vessel.complete(response); } + @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); } + @Override public void cancelled() { vessel.cancel(false); } + }); + } + catch (Throwable thrown) { + vessel.completeExceptionally(thrown); + } + vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet()); + } + + @Override + public void close() { + Throwable thrown = null; + for (Endpoint endpoint : endpoints) + try { + endpoint.client.close(); + } + catch (Throwable t) { + if (thrown == null) thrown = t; + else thrown.addSuppressed(t); + } + if (thrown != null) throw new RuntimeException(thrown); + } + + + private static class Endpoint { + + private final CloseableHttpAsyncClient client; + private final AtomicInteger inflight = new AtomicInteger(0); + private final URI url; + + private Endpoint(CloseableHttpAsyncClient client, URI url) { + this.client = client; + this.url = url; + + this.client.start(); + } + + } + + private static CloseableHttpAsyncClient createHttpClient(FeedClientBuilder builder) throws IOException { + H2AsyncClientBuilder httpClientBuilder = H2AsyncClientBuilder.create() + .setUserAgent(String.format("vespa-feed-client/%s", Vespa.VERSION)) + .setDefaultHeaders(Collections.singletonList(new BasicHeader("Vespa-Client-Version", Vespa.VERSION))) + .disableCookieManagement() + .disableRedirectHandling() + .disableAutomaticRetries() + .setIOReactorConfig(IOReactorConfig.custom() + .setSoTimeout(Timeout.ofSeconds(10)) + .build()) + .setDefaultRequestConfig( + RequestConfig.custom() + .setConnectTimeout(Timeout.ofSeconds(10)) + .setConnectionRequestTimeout(Timeout.DISABLED) + .setResponseTimeout(Timeout.ofMinutes(5)) + .build()) + .setH2Config(H2Config.initial() + .setMaxConcurrentStreams(builder.maxStreamsPerConnection) + .setCompressionEnabled(true) + .setPushEnabled(false) + .build()); + + SSLContext sslContext = constructSslContext(builder); + String[] allowedCiphers = excludeH2Blacklisted(excludeWeak(sslContext.getSupportedSSLParameters().getCipherSuites())); + if (allowedCiphers.length == 0) + throw new IllegalStateException("No adequate SSL cipher suites supported by the JVM"); + + ClientTlsStrategyBuilder tlsStrategyBuilder = ClientTlsStrategyBuilder.create() + .setCiphers(allowedCiphers) + .setSslContext(sslContext); + if (builder.hostnameVerifier != null) { + tlsStrategyBuilder.setHostnameVerifier(builder.hostnameVerifier); + } + return httpClientBuilder.setTlsStrategy(tlsStrategyBuilder.build()) + .build(); + } + + private static SSLContext constructSslContext(FeedClientBuilder builder) throws IOException { + if (builder.sslContext != null) return builder.sslContext; + SslContextBuilder sslContextBuilder = new SslContextBuilder(); + if (builder.certificate != null && builder.privateKey != null) { + sslContextBuilder.withCertificateAndKey(builder.certificate, builder.privateKey); + } + if (builder.caCertificates != null) { + sslContextBuilder.withCaCertificates(builder.caCertificates); + } + return sslContextBuilder.build(); + } + +} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java index 7015c2ac323..516eebec89f 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java @@ -6,20 +6,9 @@ import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; -import org.apache.hc.client5.http.config.RequestConfig; -import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; -import org.apache.hc.client5.http.impl.async.H2AsyncClientBuilder; -import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; -import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.http.ContentType; -import org.apache.hc.core5.http.message.BasicHeader; -import org.apache.hc.core5.http2.config.H2Config; -import org.apache.hc.core5.net.URIAuthority; import org.apache.hc.core5.net.URIBuilder; -import org.apache.hc.core5.reactor.IOReactorConfig; -import org.apache.hc.core5.util.Timeout; -import javax.net.ssl.SSLContext; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.PrintStream; @@ -27,18 +16,14 @@ import java.io.UncheckedIOException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import static java.util.Objects.requireNonNull; -import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeH2Blacklisted; -import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeWeak; /** * HTTP implementation of {@link FeedClient} @@ -50,79 +35,17 @@ class HttpFeedClient implements FeedClient { private final Map<String, Supplier<String>> requestHeaders; private final RequestStrategy requestStrategy; - private final List<Endpoint> endpoints = new ArrayList<>(); + private final Cluster cluster; private final AtomicBoolean closed = new AtomicBoolean(); HttpFeedClient(FeedClientBuilder builder) throws IOException { - this.requestHeaders = new HashMap<>(builder.requestHeaders); - this.requestStrategy = new HttpRequestStrategy(builder); - for (URI endpoint : builder.endpoints) - for (int i = 0; i < builder.connectionsPerEndpoint; i++) - endpoints.add(new Endpoint(createHttpClient(builder), endpoint)); - } - - private static class Endpoint { - - private final CloseableHttpAsyncClient client; - private final AtomicInteger inflight = new AtomicInteger(0); - private final URI url; - - private Endpoint(CloseableHttpAsyncClient client, URI url) { - this.client = client; - this.url = url; - - this.client.start(); - } - + this(builder, new HttpRequestStrategy(builder), new HttpCluster(builder)); } - private static CloseableHttpAsyncClient createHttpClient(FeedClientBuilder builder) throws IOException { - H2AsyncClientBuilder httpClientBuilder = H2AsyncClientBuilder.create() - .setUserAgent(String.format("vespa-feed-client/%s", Vespa.VERSION)) - .setDefaultHeaders(Collections.singletonList(new BasicHeader("Vespa-Client-Version", Vespa.VERSION))) - .disableCookieManagement() - .disableRedirectHandling() - .disableAutomaticRetries() - .setIOReactorConfig(IOReactorConfig.custom() - .setSoTimeout(Timeout.ofSeconds(10)) - .build()) - .setDefaultRequestConfig( - RequestConfig.custom() - .setConnectTimeout(Timeout.ofSeconds(10)) - .setConnectionRequestTimeout(Timeout.DISABLED) - .setResponseTimeout(Timeout.ofMinutes(5)) - .build()) - .setH2Config(H2Config.initial() - .setMaxConcurrentStreams(builder.maxStreamsPerConnection) - .setCompressionEnabled(true) - .setPushEnabled(false) - .build()); - - SSLContext sslContext = constructSslContext(builder); - String[] allowedCiphers = excludeH2Blacklisted(excludeWeak(sslContext.getSupportedSSLParameters().getCipherSuites())); - if (allowedCiphers.length == 0) - throw new IllegalStateException("No adequate SSL cipher suites supported by the JVM"); - - ClientTlsStrategyBuilder tlsStrategyBuilder = ClientTlsStrategyBuilder.create() - .setCiphers(allowedCiphers) - .setSslContext(sslContext); - if (builder.hostnameVerifier != null) { - tlsStrategyBuilder.setHostnameVerifier(builder.hostnameVerifier); - } - return httpClientBuilder.setTlsStrategy(tlsStrategyBuilder.build()) - .build(); - } - - private static SSLContext constructSslContext(FeedClientBuilder builder) throws IOException { - if (builder.sslContext != null) return builder.sslContext; - SslContextBuilder sslContextBuilder = new SslContextBuilder(); - if (builder.certificate != null && builder.privateKey != null) { - sslContextBuilder.withCertificateAndKey(builder.certificate, builder.privateKey); - } - if (builder.caCertificates != null) { - sslContextBuilder.withCaCertificates(builder.caCertificates); - } - return sslContextBuilder.build(); + HttpFeedClient(FeedClientBuilder builder, RequestStrategy requestStrategy, Cluster cluster) { + this.requestHeaders = new HashMap<>(builder.requestHeaders); + this.requestStrategy = requestStrategy; + this.cluster = cluster; } @Override @@ -147,16 +70,7 @@ class HttpFeedClient implements FeedClient { requestStrategy.await(); requestStrategy.destroy(); - Throwable thrown = null; - for (Endpoint endpoint : endpoints) - try { - endpoint.client.close(); - } - catch (Throwable t) { - if (thrown == null) thrown = t; - else thrown.addSuppressed(t); - } - if (thrown != null) throw new RuntimeException(thrown); + cluster.close(); } private void ensureOpen() { @@ -190,30 +104,7 @@ class HttpFeedClient implements FeedClient { /** Sends the given request to the client with the least current inflight requests, completing the given vessel when done. */ private void send(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel) { - int index = 0; - int min = Integer.MAX_VALUE; - for (int i = 0; i < endpoints.size(); i++) - if (endpoints.get(i).inflight.get() < min) { - index = i; - min = endpoints.get(i).inflight.get(); - } - - Endpoint endpoint = endpoints.get(index); - endpoint.inflight.incrementAndGet(); - try { - request.setScheme(endpoint.url.getScheme()); - request.setAuthority(new URIAuthority(endpoint.url.getHost(), endpoint.url.getPort())); - endpoint.client.execute(request, - new FutureCallback<SimpleHttpResponse>() { - @Override public void completed(SimpleHttpResponse response) { vessel.complete(response); } - @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); } - @Override public void cancelled() { vessel.cancel(false); } - }); - } - catch (Throwable thrown) { - vessel.completeExceptionally(thrown); - } - vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet()); + cluster.dispatch(request, vessel); } private static final JsonFactory factory = new JsonFactory(); diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java index a3a29412254..5d1cf80cfef 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java @@ -17,11 +17,13 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.logging.Logger; +import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.CLOSED; import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN; import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN; import static java.lang.Math.max; import static java.lang.Math.min; import static java.util.logging.Level.FINE; +import static java.util.logging.Level.WARNING; // TODO: update doc /** @@ -61,19 +63,17 @@ class HttpRequestStrategy implements RequestStrategy { private void dispatch() { try { - while ( ! destroyed.get()) { - CircuitBreaker.State state = breaker.state(); - if (state == OPEN) destroy(); - else while ( ! isInExcess()) - if ( ! poll() || breaker.state() == HALF_OPEN) break; - + while (breaker.state() != OPEN) { + while ( ! isInExcess() && poll() && breaker.state() == CLOSED); // Sleep when circuit is half-open, nap when queue is empty, or we are throttled. Thread.sleep(breaker.state() == HALF_OPEN ? 1000 : 10); } } catch (InterruptedException e) { - destroy(); + Thread.currentThread().interrupt(); + log.log(WARNING, "Dispatch thread interrupted; shutting down"); } + destroy(); } private void offer(Runnable task) { diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java index c3bb4573fd4..b9beedd1bbb 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java @@ -13,7 +13,7 @@ import java.util.function.BiConsumer; * * @author jonmv */ -public interface RequestStrategy { +interface RequestStrategy { /** Whether this has failed fatally, and we should cease sending further operations. */ boolean hasFailed(); diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java index 8db0b8f2d43..28a50b88396 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java @@ -5,20 +5,20 @@ import org.junit.jupiter.api.Test; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.util.HashSet; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.stream.Collectors; import java.util.stream.IntStream; import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.stream.Collectors.joining; import static org.junit.jupiter.api.Assertions.assertEquals; class JsonStreamFeederTest { @Test void test() throws IOException { - int docs = 1 << 10; + int docs = 1 << 14; String json = "[\n" + IntStream.range(0, docs).mapToObj(i -> @@ -28,7 +28,7 @@ class JsonStreamFeederTest { " \"lul\":\"lal\"\n" + " }\n" + " },\n" - ).collect(Collectors.joining()) + + ).collect(joining()) + " {\n" + " \"id\": \"id:ns:type::abc" + docs + "\",\n" + @@ -38,8 +38,10 @@ class JsonStreamFeederTest { " }\n" + "]"; ByteArrayInputStream in = new ByteArrayInputStream(json.getBytes(UTF_8)); - Set<String> ids = new ConcurrentSkipListSet<>(); + Set<String> ids = new HashSet<>(); + long startNanos = System.nanoTime(); JsonStreamFeeder.builder(new FeedClient() { + @Override public CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params) { ids.add(documentId.userSpecific()); @@ -59,7 +61,8 @@ class JsonStreamFeederTest { @Override public void close(boolean graceful) { } - }).build().feed(in, 1 << 7, false); // TODO: hangs on 1 << 6. + }).build().feed(in, 1 << 7, false); // TODO: hangs when buffer is smaller than largest document + System.err.println((json.length() / 1048576.0) + " MB in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds"); assertEquals(docs + 1, ids.size()); } |