summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-06-04 08:03:50 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-06-04 08:03:50 +0200
commitf84d1a7a5f79ba3dc4490ea3cf9c1fafca17786c (patch)
treee152e3caee3527de8110fc9b031f1e95dfe4966c /vespa-feed-client
parent9e0e0c02e5a1034e34dfdeba6220502e9e5678bb (diff)
Extract Cluster interface
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java18
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpCluster.java149
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java125
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java14
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java2
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java15
6 files changed, 192 insertions, 131 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java
new file mode 100644
index 00000000000..5cbd31ef1a3
--- /dev/null
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java
@@ -0,0 +1,18 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+
+import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
+
+interface Cluster extends Closeable {
+
+ /** Dispatch the request to the cluster, causing the response vessel to complete at a later time. */
+ void dispatch(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel);
+
+ @Override
+ void close();
+
+}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpCluster.java
new file mode 100644
index 00000000000..7a5427cc7fd
--- /dev/null
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpCluster.java
@@ -0,0 +1,149 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.client5.http.impl.async.H2AsyncClientBuilder;
+import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.message.BasicHeader;
+import org.apache.hc.core5.http2.config.H2Config;
+import org.apache.hc.core5.net.URIAuthority;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.util.Timeout;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeH2Blacklisted;
+import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeWeak;
+
+/**
+ * @author jonmv
+ */
+class HttpCluster implements Cluster {
+
+ private final List<Endpoint> endpoints = new ArrayList<>();
+
+ public HttpCluster(FeedClientBuilder builder) throws IOException {
+ for (URI endpoint : builder.endpoints)
+ for (int i = 0; i < builder.connectionsPerEndpoint; i++)
+ endpoints.add(new Endpoint(createHttpClient(builder), endpoint));
+ }
+
+ @Override
+ public void dispatch(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel) {
+ int index = 0;
+ int min = Integer.MAX_VALUE;
+ for (int i = 0; i < endpoints.size(); i++)
+ if (endpoints.get(i).inflight.get() < min) {
+ index = i;
+ min = endpoints.get(i).inflight.get();
+ }
+
+ Endpoint endpoint = endpoints.get(index);
+ endpoint.inflight.incrementAndGet();
+ try {
+ request.setScheme(endpoint.url.getScheme());
+ request.setAuthority(new URIAuthority(endpoint.url.getHost(), endpoint.url.getPort()));
+ endpoint.client.execute(request,
+ new FutureCallback<SimpleHttpResponse>() {
+ @Override public void completed(SimpleHttpResponse response) { vessel.complete(response); }
+ @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); }
+ @Override public void cancelled() { vessel.cancel(false); }
+ });
+ }
+ catch (Throwable thrown) {
+ vessel.completeExceptionally(thrown);
+ }
+ vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet());
+ }
+
+ @Override
+ public void close() {
+ Throwable thrown = null;
+ for (Endpoint endpoint : endpoints)
+ try {
+ endpoint.client.close();
+ }
+ catch (Throwable t) {
+ if (thrown == null) thrown = t;
+ else thrown.addSuppressed(t);
+ }
+ if (thrown != null) throw new RuntimeException(thrown);
+ }
+
+
+ private static class Endpoint {
+
+ private final CloseableHttpAsyncClient client;
+ private final AtomicInteger inflight = new AtomicInteger(0);
+ private final URI url;
+
+ private Endpoint(CloseableHttpAsyncClient client, URI url) {
+ this.client = client;
+ this.url = url;
+
+ this.client.start();
+ }
+
+ }
+
+ private static CloseableHttpAsyncClient createHttpClient(FeedClientBuilder builder) throws IOException {
+ H2AsyncClientBuilder httpClientBuilder = H2AsyncClientBuilder.create()
+ .setUserAgent(String.format("vespa-feed-client/%s", Vespa.VERSION))
+ .setDefaultHeaders(Collections.singletonList(new BasicHeader("Vespa-Client-Version", Vespa.VERSION)))
+ .disableCookieManagement()
+ .disableRedirectHandling()
+ .disableAutomaticRetries()
+ .setIOReactorConfig(IOReactorConfig.custom()
+ .setSoTimeout(Timeout.ofSeconds(10))
+ .build())
+ .setDefaultRequestConfig(
+ RequestConfig.custom()
+ .setConnectTimeout(Timeout.ofSeconds(10))
+ .setConnectionRequestTimeout(Timeout.DISABLED)
+ .setResponseTimeout(Timeout.ofMinutes(5))
+ .build())
+ .setH2Config(H2Config.initial()
+ .setMaxConcurrentStreams(builder.maxStreamsPerConnection)
+ .setCompressionEnabled(true)
+ .setPushEnabled(false)
+ .build());
+
+ SSLContext sslContext = constructSslContext(builder);
+ String[] allowedCiphers = excludeH2Blacklisted(excludeWeak(sslContext.getSupportedSSLParameters().getCipherSuites()));
+ if (allowedCiphers.length == 0)
+ throw new IllegalStateException("No adequate SSL cipher suites supported by the JVM");
+
+ ClientTlsStrategyBuilder tlsStrategyBuilder = ClientTlsStrategyBuilder.create()
+ .setCiphers(allowedCiphers)
+ .setSslContext(sslContext);
+ if (builder.hostnameVerifier != null) {
+ tlsStrategyBuilder.setHostnameVerifier(builder.hostnameVerifier);
+ }
+ return httpClientBuilder.setTlsStrategy(tlsStrategyBuilder.build())
+ .build();
+ }
+
+ private static SSLContext constructSslContext(FeedClientBuilder builder) throws IOException {
+ if (builder.sslContext != null) return builder.sslContext;
+ SslContextBuilder sslContextBuilder = new SslContextBuilder();
+ if (builder.certificate != null && builder.privateKey != null) {
+ sslContextBuilder.withCertificateAndKey(builder.certificate, builder.privateKey);
+ }
+ if (builder.caCertificates != null) {
+ sslContextBuilder.withCaCertificates(builder.caCertificates);
+ }
+ return sslContextBuilder.build();
+ }
+
+}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
index 7015c2ac323..516eebec89f 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
@@ -6,20 +6,9 @@ import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
-import org.apache.hc.client5.http.config.RequestConfig;
-import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
-import org.apache.hc.client5.http.impl.async.H2AsyncClientBuilder;
-import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
-import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.ContentType;
-import org.apache.hc.core5.http.message.BasicHeader;
-import org.apache.hc.core5.http2.config.H2Config;
-import org.apache.hc.core5.net.URIAuthority;
import org.apache.hc.core5.net.URIBuilder;
-import org.apache.hc.core5.reactor.IOReactorConfig;
-import org.apache.hc.core5.util.Timeout;
-import javax.net.ssl.SSLContext;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
@@ -27,18 +16,14 @@ import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import static java.util.Objects.requireNonNull;
-import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeH2Blacklisted;
-import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeWeak;
/**
* HTTP implementation of {@link FeedClient}
@@ -50,79 +35,17 @@ class HttpFeedClient implements FeedClient {
private final Map<String, Supplier<String>> requestHeaders;
private final RequestStrategy requestStrategy;
- private final List<Endpoint> endpoints = new ArrayList<>();
+ private final Cluster cluster;
private final AtomicBoolean closed = new AtomicBoolean();
HttpFeedClient(FeedClientBuilder builder) throws IOException {
- this.requestHeaders = new HashMap<>(builder.requestHeaders);
- this.requestStrategy = new HttpRequestStrategy(builder);
- for (URI endpoint : builder.endpoints)
- for (int i = 0; i < builder.connectionsPerEndpoint; i++)
- endpoints.add(new Endpoint(createHttpClient(builder), endpoint));
- }
-
- private static class Endpoint {
-
- private final CloseableHttpAsyncClient client;
- private final AtomicInteger inflight = new AtomicInteger(0);
- private final URI url;
-
- private Endpoint(CloseableHttpAsyncClient client, URI url) {
- this.client = client;
- this.url = url;
-
- this.client.start();
- }
-
+ this(builder, new HttpRequestStrategy(builder), new HttpCluster(builder));
}
- private static CloseableHttpAsyncClient createHttpClient(FeedClientBuilder builder) throws IOException {
- H2AsyncClientBuilder httpClientBuilder = H2AsyncClientBuilder.create()
- .setUserAgent(String.format("vespa-feed-client/%s", Vespa.VERSION))
- .setDefaultHeaders(Collections.singletonList(new BasicHeader("Vespa-Client-Version", Vespa.VERSION)))
- .disableCookieManagement()
- .disableRedirectHandling()
- .disableAutomaticRetries()
- .setIOReactorConfig(IOReactorConfig.custom()
- .setSoTimeout(Timeout.ofSeconds(10))
- .build())
- .setDefaultRequestConfig(
- RequestConfig.custom()
- .setConnectTimeout(Timeout.ofSeconds(10))
- .setConnectionRequestTimeout(Timeout.DISABLED)
- .setResponseTimeout(Timeout.ofMinutes(5))
- .build())
- .setH2Config(H2Config.initial()
- .setMaxConcurrentStreams(builder.maxStreamsPerConnection)
- .setCompressionEnabled(true)
- .setPushEnabled(false)
- .build());
-
- SSLContext sslContext = constructSslContext(builder);
- String[] allowedCiphers = excludeH2Blacklisted(excludeWeak(sslContext.getSupportedSSLParameters().getCipherSuites()));
- if (allowedCiphers.length == 0)
- throw new IllegalStateException("No adequate SSL cipher suites supported by the JVM");
-
- ClientTlsStrategyBuilder tlsStrategyBuilder = ClientTlsStrategyBuilder.create()
- .setCiphers(allowedCiphers)
- .setSslContext(sslContext);
- if (builder.hostnameVerifier != null) {
- tlsStrategyBuilder.setHostnameVerifier(builder.hostnameVerifier);
- }
- return httpClientBuilder.setTlsStrategy(tlsStrategyBuilder.build())
- .build();
- }
-
- private static SSLContext constructSslContext(FeedClientBuilder builder) throws IOException {
- if (builder.sslContext != null) return builder.sslContext;
- SslContextBuilder sslContextBuilder = new SslContextBuilder();
- if (builder.certificate != null && builder.privateKey != null) {
- sslContextBuilder.withCertificateAndKey(builder.certificate, builder.privateKey);
- }
- if (builder.caCertificates != null) {
- sslContextBuilder.withCaCertificates(builder.caCertificates);
- }
- return sslContextBuilder.build();
+ HttpFeedClient(FeedClientBuilder builder, RequestStrategy requestStrategy, Cluster cluster) {
+ this.requestHeaders = new HashMap<>(builder.requestHeaders);
+ this.requestStrategy = requestStrategy;
+ this.cluster = cluster;
}
@Override
@@ -147,16 +70,7 @@ class HttpFeedClient implements FeedClient {
requestStrategy.await();
requestStrategy.destroy();
- Throwable thrown = null;
- for (Endpoint endpoint : endpoints)
- try {
- endpoint.client.close();
- }
- catch (Throwable t) {
- if (thrown == null) thrown = t;
- else thrown.addSuppressed(t);
- }
- if (thrown != null) throw new RuntimeException(thrown);
+ cluster.close();
}
private void ensureOpen() {
@@ -190,30 +104,7 @@ class HttpFeedClient implements FeedClient {
/** Sends the given request to the client with the least current inflight requests, completing the given vessel when done. */
private void send(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel) {
- int index = 0;
- int min = Integer.MAX_VALUE;
- for (int i = 0; i < endpoints.size(); i++)
- if (endpoints.get(i).inflight.get() < min) {
- index = i;
- min = endpoints.get(i).inflight.get();
- }
-
- Endpoint endpoint = endpoints.get(index);
- endpoint.inflight.incrementAndGet();
- try {
- request.setScheme(endpoint.url.getScheme());
- request.setAuthority(new URIAuthority(endpoint.url.getHost(), endpoint.url.getPort()));
- endpoint.client.execute(request,
- new FutureCallback<SimpleHttpResponse>() {
- @Override public void completed(SimpleHttpResponse response) { vessel.complete(response); }
- @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); }
- @Override public void cancelled() { vessel.cancel(false); }
- });
- }
- catch (Throwable thrown) {
- vessel.completeExceptionally(thrown);
- }
- vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet());
+ cluster.dispatch(request, vessel);
}
private static final JsonFactory factory = new JsonFactory();
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
index a3a29412254..5d1cf80cfef 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
@@ -17,11 +17,13 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.logging.Logger;
+import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.CLOSED;
import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN;
import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.util.logging.Level.FINE;
+import static java.util.logging.Level.WARNING;
// TODO: update doc
/**
@@ -61,19 +63,17 @@ class HttpRequestStrategy implements RequestStrategy {
private void dispatch() {
try {
- while ( ! destroyed.get()) {
- CircuitBreaker.State state = breaker.state();
- if (state == OPEN) destroy();
- else while ( ! isInExcess())
- if ( ! poll() || breaker.state() == HALF_OPEN) break;
-
+ while (breaker.state() != OPEN) {
+ while ( ! isInExcess() && poll() && breaker.state() == CLOSED);
// Sleep when circuit is half-open, nap when queue is empty, or we are throttled.
Thread.sleep(breaker.state() == HALF_OPEN ? 1000 : 10);
}
}
catch (InterruptedException e) {
- destroy();
+ Thread.currentThread().interrupt();
+ log.log(WARNING, "Dispatch thread interrupted; shutting down");
}
+ destroy();
}
private void offer(Runnable task) {
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
index c3bb4573fd4..b9beedd1bbb 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
@@ -13,7 +13,7 @@ import java.util.function.BiConsumer;
*
* @author jonmv
*/
-public interface RequestStrategy {
+interface RequestStrategy {
/** Whether this has failed fatally, and we should cease sending further operations. */
boolean hasFailed();
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java
index 8db0b8f2d43..28a50b88396 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java
@@ -5,20 +5,20 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.joining;
import static org.junit.jupiter.api.Assertions.assertEquals;
class JsonStreamFeederTest {
@Test
void test() throws IOException {
- int docs = 1 << 10;
+ int docs = 1 << 14;
String json = "[\n" +
IntStream.range(0, docs).mapToObj(i ->
@@ -28,7 +28,7 @@ class JsonStreamFeederTest {
" \"lul\":\"lal\"\n" +
" }\n" +
" },\n"
- ).collect(Collectors.joining()) +
+ ).collect(joining()) +
" {\n" +
" \"id\": \"id:ns:type::abc" + docs + "\",\n" +
@@ -38,8 +38,10 @@ class JsonStreamFeederTest {
" }\n" +
"]";
ByteArrayInputStream in = new ByteArrayInputStream(json.getBytes(UTF_8));
- Set<String> ids = new ConcurrentSkipListSet<>();
+ Set<String> ids = new HashSet<>();
+ long startNanos = System.nanoTime();
JsonStreamFeeder.builder(new FeedClient() {
+
@Override
public CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params) {
ids.add(documentId.userSpecific());
@@ -59,7 +61,8 @@ class JsonStreamFeederTest {
@Override
public void close(boolean graceful) { }
- }).build().feed(in, 1 << 7, false); // TODO: hangs on 1 << 6.
+ }).build().feed(in, 1 << 7, false); // TODO: hangs when buffer is smaller than largest document
+ System.err.println((json.length() / 1048576.0) + " MB in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds");
assertEquals(docs + 1, ids.size());
}