summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/abi-spec.json386
-rw-r--r--vespa-feed-client/pom.xml9
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java165
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java102
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java21
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/DocumentId.java2
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java86
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java86
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java154
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java39
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java71
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java253
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java42
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java333
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java16
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java484
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java364
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParameters.java2
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParseException.java15
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java96
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java22
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/Result.java2
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/ResultParseException.java14
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java30
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java45
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/package-info.java9
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/GracePeriodCircuitBreakerTest.java60
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java101
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java203
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java124
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java67
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java92
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java117
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java34
34 files changed, 2861 insertions, 785 deletions
diff --git a/vespa-feed-client/abi-spec.json b/vespa-feed-client/abi-spec.json
new file mode 100644
index 00000000000..70cb4c3f09f
--- /dev/null
+++ b/vespa-feed-client/abi-spec.json
@@ -0,0 +1,386 @@
+{
+ "ai.vespa.feed.client.BenchmarkingCluster": {
+ "superClass": "java.lang.Object",
+ "interfaces": [
+ "ai.vespa.feed.client.Cluster"
+ ],
+ "attributes": [
+ "public"
+ ],
+ "methods": [
+ "public void <init>(ai.vespa.feed.client.Cluster)",
+ "public void dispatch(ai.vespa.feed.client.HttpRequest, java.util.concurrent.CompletableFuture)",
+ "public ai.vespa.feed.client.OperationStats stats()",
+ "public void close()"
+ ],
+ "fields": []
+ },
+ "ai.vespa.feed.client.DocumentId": {
+ "superClass": "java.lang.Object",
+ "interfaces": [],
+ "attributes": [
+ "public"
+ ],
+ "methods": [
+ "public static ai.vespa.feed.client.DocumentId of(java.lang.String, java.lang.String, java.lang.String)",
+ "public static ai.vespa.feed.client.DocumentId of(java.lang.String, java.lang.String, long, java.lang.String)",
+ "public static ai.vespa.feed.client.DocumentId of(java.lang.String, java.lang.String, java.lang.String, java.lang.String)",
+ "public static ai.vespa.feed.client.DocumentId of(java.lang.String)",
+ "public java.lang.String documentType()",
+ "public java.lang.String namespace()",
+ "public java.util.OptionalLong number()",
+ "public java.util.Optional group()",
+ "public java.lang.String userSpecific()",
+ "public boolean equals(java.lang.Object)",
+ "public int hashCode()",
+ "public java.lang.String toString()"
+ ],
+ "fields": []
+ },
+ "ai.vespa.feed.client.DynamicThrottler": {
+ "superClass": "ai.vespa.feed.client.StaticThrottler",
+ "interfaces": [],
+ "attributes": [
+ "public"
+ ],
+ "methods": [
+ "public void <init>(ai.vespa.feed.client.FeedClientBuilder)",
+ "public void sent(long, java.util.concurrent.CompletableFuture)",
+ "public void success()",
+ "public void throttled(long)",
+ "public long targetInflight()"
+ ],
+ "fields": []
+ },
+ "ai.vespa.feed.client.FeedClient$CircuitBreaker$State": {
+ "superClass": "java.lang.Enum",
+ "interfaces": [],
+ "attributes": [
+ "public",
+ "final",
+ "enum"
+ ],
+ "methods": [
+ "public static ai.vespa.feed.client.FeedClient$CircuitBreaker$State[] values()",
+ "public static ai.vespa.feed.client.FeedClient$CircuitBreaker$State valueOf(java.lang.String)"
+ ],
+ "fields": [
+ "public static final enum ai.vespa.feed.client.FeedClient$CircuitBreaker$State CLOSED",
+ "public static final enum ai.vespa.feed.client.FeedClient$CircuitBreaker$State HALF_OPEN",
+ "public static final enum ai.vespa.feed.client.FeedClient$CircuitBreaker$State OPEN"
+ ]
+ },
+ "ai.vespa.feed.client.FeedClient$CircuitBreaker": {
+ "superClass": "java.lang.Object",
+ "interfaces": [],
+ "attributes": [
+ "public",
+ "interface",
+ "abstract"
+ ],
+ "methods": [
+ "public abstract void success()",
+ "public abstract void failure()",
+ "public abstract ai.vespa.feed.client.FeedClient$CircuitBreaker$State state()"
+ ],
+ "fields": []
+ },
+ "ai.vespa.feed.client.FeedClient$OperationType": {
+ "superClass": "java.lang.Enum",
+ "interfaces": [],
+ "attributes": [
+ "public",
+ "final",
+ "enum"
+ ],
+ "methods": [
+ "public static ai.vespa.feed.client.FeedClient$OperationType[] values()",
+ "public static ai.vespa.feed.client.FeedClient$OperationType valueOf(java.lang.String)"
+ ],
+ "fields": [
+ "public static final enum ai.vespa.feed.client.FeedClient$OperationType PUT",
+ "public static final enum ai.vespa.feed.client.FeedClient$OperationType UPDATE",
+ "public static final enum ai.vespa.feed.client.FeedClient$OperationType REMOVE"
+ ]
+ },
+ "ai.vespa.feed.client.FeedClient$RetryStrategy": {
+ "superClass": "java.lang.Object",
+ "interfaces": [],
+ "attributes": [
+ "public",
+ "interface",
+ "abstract"
+ ],
+ "methods": [
+ "public boolean retry(ai.vespa.feed.client.FeedClient$OperationType)",
+ "public int retries()"
+ ],
+ "fields": []
+ },
+ "ai.vespa.feed.client.FeedClient$Throttler": {
+ "superClass": "java.lang.Object",
+ "interfaces": [],
+ "attributes": [
+ "public",
+ "interface",
+ "abstract"
+ ],
+ "methods": [
+ "public abstract void sent(long, java.util.concurrent.CompletableFuture)",
+ "public abstract void success()",
+ "public abstract void throttled(long)",
+ "public abstract long targetInflight()"
+ ],
+ "fields": []
+ },
+ "ai.vespa.feed.client.FeedClient": {
+ "superClass": "java.lang.Object",
+ "interfaces": [
+ "java.io.Closeable"
+ ],
+ "attributes": [
+ "public",
+ "interface",
+ "abstract"
+ ],
+ "methods": [
+ "public abstract java.util.concurrent.CompletableFuture put(ai.vespa.feed.client.DocumentId, java.lang.String, ai.vespa.feed.client.OperationParameters)",
+ "public abstract java.util.concurrent.CompletableFuture update(ai.vespa.feed.client.DocumentId, java.lang.String, ai.vespa.feed.client.OperationParameters)",
+ "public abstract java.util.concurrent.CompletableFuture remove(ai.vespa.feed.client.DocumentId, ai.vespa.feed.client.OperationParameters)",
+ "public abstract ai.vespa.feed.client.OperationStats stats()",
+ "public ai.vespa.feed.client.FeedClient$CircuitBreaker$State circuitBreakerState()",
+ "public abstract void close(boolean)",
+ "public void close()"
+ ],
+ "fields": []
+ },
+ "ai.vespa.feed.client.FeedClientBuilder": {
+ "superClass": "java.lang.Object",
+ "interfaces": [],
+ "attributes": [
+ "public"
+ ],
+ "methods": [
+ "public static ai.vespa.feed.client.FeedClientBuilder create(java.net.URI)",
+ "public static ai.vespa.feed.client.FeedClientBuilder create(java.util.List)",
+ "public ai.vespa.feed.client.FeedClientBuilder setConnectionsPerEndpoint(int)",
+ "public ai.vespa.feed.client.FeedClientBuilder setMaxStreamPerConnection(int)",
+ "public ai.vespa.feed.client.FeedClientBuilder setSslContext(javax.net.ssl.SSLContext)",
+ "public ai.vespa.feed.client.FeedClientBuilder setHostnameVerifier(javax.net.ssl.HostnameVerifier)",
+ "public ai.vespa.feed.client.FeedClientBuilder setBenchmarkOn(boolean)",
+ "public ai.vespa.feed.client.FeedClientBuilder addRequestHeader(java.lang.String, java.lang.String)",
+ "public ai.vespa.feed.client.FeedClientBuilder addRequestHeader(java.lang.String, java.util.function.Supplier)",
+ "public ai.vespa.feed.client.FeedClientBuilder setRetryStrategy(ai.vespa.feed.client.FeedClient$RetryStrategy)",
+ "public ai.vespa.feed.client.FeedClientBuilder setCircuitBreaker(ai.vespa.feed.client.FeedClient$CircuitBreaker)",
+ "public ai.vespa.feed.client.FeedClientBuilder setCertificate(java.nio.file.Path, java.nio.file.Path)",
+ "public ai.vespa.feed.client.FeedClientBuilder setCertificate(java.util.Collection, java.security.PrivateKey)",
+ "public ai.vespa.feed.client.FeedClientBuilder setCertificate(java.security.cert.X509Certificate, java.security.PrivateKey)",
+ "public ai.vespa.feed.client.FeedClientBuilder setCaCertificatesFile(java.nio.file.Path)",
+ "public ai.vespa.feed.client.FeedClientBuilder setCaCertificates(java.util.Collection)",
+ "public ai.vespa.feed.client.FeedClient build()"
+ ],
+ "fields": []
+ },
+ "ai.vespa.feed.client.FeedException": {
+ "superClass": "java.lang.RuntimeException",
+ "interfaces": [],
+ "attributes": [
+ "public"
+ ],
+ "methods": [
+ "public void <init>(java.lang.String)",
+ "public void <init>(ai.vespa.feed.client.DocumentId, java.lang.String)",
+ "public void <init>(java.lang.String, java.lang.Throwable)",
+ "public void <init>(java.lang.Throwable)",
+ "public void <init>(ai.vespa.feed.client.DocumentId, java.lang.Throwable)",
+ "public void <init>(ai.vespa.feed.client.DocumentId, java.lang.String, java.lang.Throwable)",
+ "public java.util.Optional documentId()"
+ ],
+ "fields": []
+ },
+ "ai.vespa.feed.client.GracePeriodCircuitBreaker": {
+ "superClass": "java.lang.Object",
+ "interfaces": [
+ "ai.vespa.feed.client.FeedClient$CircuitBreaker"
+ ],
+ "attributes": [
+ "public"
+ ],
+ "methods": [
+ "public void <init>(java.time.Duration, java.time.Duration)",
+ "public void success()",
+ "public void failure()",
+ "public ai.vespa.feed.client.FeedClient$CircuitBreaker$State state()"
+ ],
+ "fields": []
+ },
+ "ai.vespa.feed.client.JsonFeeder$Builder": {
+ "superClass": "java.lang.Object",
+ "interfaces": [],
+ "attributes": [
+ "public"
+ ],
+ "methods": [
+ "public ai.vespa.feed.client.JsonFeeder$Builder withTimeout(java.time.Duration)",
+ "public ai.vespa.feed.client.JsonFeeder$Builder withRoute(java.lang.String)",
+ "public ai.vespa.feed.client.JsonFeeder$Builder withTracelevel(int)",
+ "public ai.vespa.feed.client.JsonFeeder build()"
+ ],
+ "fields": []
+ },
+ "ai.vespa.feed.client.JsonFeeder$ResultCallback": {
+ "superClass": "java.lang.Object",
+ "interfaces": [],
+ "attributes": [
+ "public",
+ "interface",
+ "abstract"
+ ],
+ "methods": [
+ "public void onNextResult(ai.vespa.feed.client.Result, ai.vespa.feed.client.FeedException)",
+ "public void onError(ai.vespa.feed.client.FeedException)",
+ "public void onComplete()"
+ ],
+ "fields": []
+ },
+ "ai.vespa.feed.client.JsonFeeder": {
+ "superClass": "java.lang.Object",
+ "interfaces": [
+ "java.io.Closeable"
+ ],
+ "attributes": [
+ "public"
+ ],
+ "methods": [
+ "public static ai.vespa.feed.client.JsonFeeder$Builder builder(ai.vespa.feed.client.FeedClient)",
+ "public java.util.concurrent.CompletableFuture feedSingle(java.lang.String)",
+ "public java.util.concurrent.CompletableFuture feedMany(java.io.InputStream, ai.vespa.feed.client.JsonFeeder$ResultCallback)",
+ "public java.util.concurrent.CompletableFuture feedMany(java.io.InputStream)",
+ "public void close()"
+ ],
+ "fields": []
+ },
+ "ai.vespa.feed.client.OperationParameters": {
+ "superClass": "java.lang.Object",
+ "interfaces": [],
+ "attributes": [
+ "public"
+ ],
+ "methods": [
+ "public static ai.vespa.feed.client.OperationParameters empty()",
+ "public ai.vespa.feed.client.OperationParameters createIfNonExistent(boolean)",
+ "public ai.vespa.feed.client.OperationParameters testAndSetCondition(java.lang.String)",
+ "public ai.vespa.feed.client.OperationParameters timeout(java.time.Duration)",
+ "public ai.vespa.feed.client.OperationParameters route(java.lang.String)",
+ "public ai.vespa.feed.client.OperationParameters tracelevel(int)",
+ "public boolean createIfNonExistent()",
+ "public java.util.Optional testAndSetCondition()",
+ "public java.util.Optional timeout()",
+ "public java.util.Optional route()",
+ "public java.util.OptionalInt tracelevel()",
+ "public boolean equals(java.lang.Object)",
+ "public int hashCode()",
+ "public java.lang.String toString()"
+ ],
+ "fields": []
+ },
+ "ai.vespa.feed.client.OperationParseException": {
+ "superClass": "ai.vespa.feed.client.FeedException",
+ "interfaces": [],
+ "attributes": [
+ "public"
+ ],
+ "methods": [
+ "public void <init>(java.lang.String)",
+ "public void <init>(java.lang.String, java.lang.Throwable)"
+ ],
+ "fields": []
+ },
+ "ai.vespa.feed.client.OperationStats": {
+ "superClass": "java.lang.Object",
+ "interfaces": [],
+ "attributes": [
+ "public"
+ ],
+ "methods": [
+ "public void <init>(long, java.util.Map, long, long, long, long, long, long, long)",
+ "public long requests()",
+ "public long responses()",
+ "public long successes()",
+ "public java.util.Map responsesByCode()",
+ "public long exceptions()",
+ "public long inflight()",
+ "public long averageLatencyMillis()",
+ "public long minLatencyMillis()",
+ "public long maxLatencyMillis()",
+ "public long bytesSent()",
+ "public long bytesReceived()",
+ "public java.lang.String toString()"
+ ],
+ "fields": []
+ },
+ "ai.vespa.feed.client.Result$Type": {
+ "superClass": "java.lang.Enum",
+ "interfaces": [],
+ "attributes": [
+ "public",
+ "final",
+ "enum"
+ ],
+ "methods": [
+ "public static ai.vespa.feed.client.Result$Type[] values()",
+ "public static ai.vespa.feed.client.Result$Type valueOf(java.lang.String)"
+ ],
+ "fields": [
+ "public static final enum ai.vespa.feed.client.Result$Type success",
+ "public static final enum ai.vespa.feed.client.Result$Type conditionNotMet",
+ "public static final enum ai.vespa.feed.client.Result$Type failure"
+ ]
+ },
+ "ai.vespa.feed.client.Result": {
+ "superClass": "java.lang.Object",
+ "interfaces": [],
+ "attributes": [
+ "public"
+ ],
+ "methods": [
+ "public ai.vespa.feed.client.Result$Type type()",
+ "public ai.vespa.feed.client.DocumentId documentId()",
+ "public java.util.Optional resultMessage()",
+ "public java.util.Optional traceMessage()"
+ ],
+ "fields": []
+ },
+ "ai.vespa.feed.client.ResultParseException": {
+ "superClass": "ai.vespa.feed.client.FeedException",
+ "interfaces": [],
+ "attributes": [
+ "public"
+ ],
+ "methods": [
+ "public void <init>(ai.vespa.feed.client.DocumentId, java.lang.String)",
+ "public void <init>(ai.vespa.feed.client.DocumentId, java.lang.Throwable)"
+ ],
+ "fields": []
+ },
+ "ai.vespa.feed.client.StaticThrottler": {
+ "superClass": "java.lang.Object",
+ "interfaces": [
+ "ai.vespa.feed.client.FeedClient$Throttler"
+ ],
+ "attributes": [
+ "public"
+ ],
+ "methods": [
+ "public void <init>(ai.vespa.feed.client.FeedClientBuilder)",
+ "public void sent(long, java.util.concurrent.CompletableFuture)",
+ "public void success()",
+ "public void throttled(long)",
+ "public long targetInflight()"
+ ],
+ "fields": [
+ "protected final long maxInflight",
+ "protected final long minInflight"
+ ]
+ }
+} \ No newline at end of file
diff --git a/vespa-feed-client/pom.xml b/vespa-feed-client/pom.xml
index 7759e9d2308..7d4938c6fb0 100644
--- a/vespa-feed-client/pom.xml
+++ b/vespa-feed-client/pom.xml
@@ -20,6 +20,11 @@
<dependencies>
<!-- compile scope -->
<dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>annotations</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<scope>compile</scope>
@@ -83,6 +88,10 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>abi-check-plugin</artifactId>
+ </plugin>
</plugins>
</build>
</project>
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java
new file mode 100644
index 00000000000..e5d45a2f211
--- /dev/null
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java
@@ -0,0 +1,165 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+import org.apache.hc.client5.http.config.RequestConfig;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+import org.apache.hc.client5.http.impl.async.H2AsyncClientBuilder;
+import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
+import org.apache.hc.core5.concurrent.FutureCallback;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.message.BasicHeader;
+import org.apache.hc.core5.http2.config.H2Config;
+import org.apache.hc.core5.net.URIAuthority;
+import org.apache.hc.core5.reactor.IOReactorConfig;
+import org.apache.hc.core5.util.Timeout;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeH2Blacklisted;
+import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeWeak;
+
+/**
+ * @author jonmv
+ */
+class ApacheCluster implements Cluster {
+
+ private final List<Endpoint> endpoints = new ArrayList<>();
+
+ ApacheCluster(FeedClientBuilder builder) throws IOException {
+ for (URI endpoint : builder.endpoints)
+ for (int i = 0; i < builder.connectionsPerEndpoint; i++)
+ endpoints.add(new Endpoint(createHttpClient(builder), endpoint));
+ }
+
+ @Override
+ public void dispatch(HttpRequest wrapped, CompletableFuture<HttpResponse> vessel) {
+ SimpleHttpRequest request = new SimpleHttpRequest(wrapped.method(), wrapped.path());
+ wrapped.headers().forEach((name, value) -> request.setHeader(name, value.get()));
+ if (wrapped.body() != null)
+ request.setBody(wrapped.body(), ContentType.APPLICATION_JSON);
+
+ int index = 0;
+ int min = Integer.MAX_VALUE;
+ for (int i = 0; i < endpoints.size(); i++)
+ if (endpoints.get(i).inflight.get() < min) {
+ index = i;
+ min = endpoints.get(i).inflight.get();
+ }
+
+ Endpoint endpoint = endpoints.get(index);
+ endpoint.inflight.incrementAndGet();
+ try {
+ request.setScheme(endpoint.url.getScheme());
+ request.setAuthority(new URIAuthority(endpoint.url.getHost(), endpoint.url.getPort()));
+ endpoint.client.execute(request,
+ new FutureCallback<SimpleHttpResponse>() {
+ @Override public void completed(SimpleHttpResponse response) { vessel.complete(new ApacheHttpResponse(response)); }
+ @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); }
+ @Override public void cancelled() { vessel.cancel(false); }
+ });
+ }
+ catch (Throwable thrown) {
+ vessel.completeExceptionally(thrown);
+ }
+ vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet());
+ }
+
+ @Override
+ public void close() {
+ Throwable thrown = null;
+ for (Endpoint endpoint : endpoints)
+ try {
+ endpoint.client.close();
+ }
+ catch (Throwable t) {
+ if (thrown == null) thrown = t;
+ else thrown.addSuppressed(t);
+ }
+ if (thrown != null) throw new RuntimeException(thrown);
+ }
+
+
+ private static class Endpoint {
+
+ private final CloseableHttpAsyncClient client;
+ private final AtomicInteger inflight = new AtomicInteger(0);
+ private final URI url;
+
+ private Endpoint(CloseableHttpAsyncClient client, URI url) {
+ this.client = client;
+ this.url = url;
+
+ this.client.start();
+ }
+
+ }
+
+ private static CloseableHttpAsyncClient createHttpClient(FeedClientBuilder builder) throws IOException {
+ H2AsyncClientBuilder httpClientBuilder = H2AsyncClientBuilder.create()
+ .setUserAgent(String.format("vespa-feed-client/%s", Vespa.VERSION))
+ .setDefaultHeaders(Collections.singletonList(new BasicHeader("Vespa-Client-Version", Vespa.VERSION)))
+ .disableCookieManagement()
+ .disableRedirectHandling()
+ .disableAutomaticRetries()
+ .setIOReactorConfig(IOReactorConfig.custom()
+ .setIoThreadCount(2)
+ .setTcpNoDelay(true)
+ .setSoTimeout(Timeout.ofSeconds(10))
+ .build())
+ .setDefaultRequestConfig(RequestConfig.custom()
+ .setConnectTimeout(Timeout.ofSeconds(10))
+ .setConnectionRequestTimeout(Timeout.DISABLED)
+ .setResponseTimeout(Timeout.ofMinutes(5))
+ .build())
+ .setH2Config(H2Config.custom()
+ .setMaxConcurrentStreams(builder.maxStreamsPerConnection)
+ .setCompressionEnabled(true)
+ .setPushEnabled(false)
+ .setInitialWindowSize(Integer.MAX_VALUE)
+ .build());
+
+ SSLContext sslContext = builder.constructSslContext();
+ String[] allowedCiphers = excludeH2Blacklisted(excludeWeak(sslContext.getSupportedSSLParameters().getCipherSuites()));
+ if (allowedCiphers.length == 0)
+ throw new IllegalStateException("No adequate SSL cipher suites supported by the JVM");
+
+ ClientTlsStrategyBuilder tlsStrategyBuilder = ClientTlsStrategyBuilder.create()
+ .setCiphers(allowedCiphers)
+ .setSslContext(sslContext);
+ if (builder.hostnameVerifier != null) {
+ tlsStrategyBuilder.setHostnameVerifier(builder.hostnameVerifier);
+ }
+ return httpClientBuilder.setTlsStrategy(tlsStrategyBuilder.build())
+ .build();
+ }
+
+ private static class ApacheHttpResponse implements HttpResponse {
+
+ private final SimpleHttpResponse wrapped;
+
+ private ApacheHttpResponse(SimpleHttpResponse wrapped) {
+ this.wrapped = wrapped;
+ }
+
+ @Override
+ public int code() {
+ return wrapped.getCode();
+ }
+
+ @Override
+ public byte[] body() {
+ return wrapped.getBodyBytes();
+ }
+
+ }
+
+}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java
new file mode 100644
index 00000000000..840219a6bf1
--- /dev/null
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java
@@ -0,0 +1,102 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Objects.requireNonNull;
+
+public class BenchmarkingCluster implements Cluster {
+
+ private final Cluster delegate;
+ private final ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
+ Thread thread = new Thread(runnable, "cluster-stats-collector");
+ thread.setDaemon(true);
+ return thread;
+ });
+
+ private final AtomicLong requests = new AtomicLong();
+ private long results = 0;
+ private long responses = 0;
+ private final long[] responsesByCode = new long[600];
+ private long exceptions = 0;
+ private long totalLatencyMillis = 0;
+ private long minLatencyMillis = Long.MAX_VALUE;
+ private long maxLatencyMillis = 0;
+ private long bytesSent = 0;
+ private long bytesReceived = 0;
+
+ public BenchmarkingCluster(Cluster delegate) {
+ this.delegate = requireNonNull(delegate);
+ }
+
+ @Override
+ public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) {
+ requests.incrementAndGet();
+ long startNanos = System.nanoTime();
+ delegate.dispatch(request, vessel);
+ vessel.whenCompleteAsync((response, thrown) -> {
+ results++;
+ if (thrown == null) {
+ responses++;
+ responsesByCode[response.code()]++;
+ long latency = (System.nanoTime() - startNanos) / 1_000_000;
+ totalLatencyMillis += latency;
+ minLatencyMillis = Math.min(minLatencyMillis, latency);
+ maxLatencyMillis = Math.max(maxLatencyMillis, latency);
+ bytesSent += request.body() == null ? 0 : request.body().length;
+ bytesReceived += response.body() == null ? 0 : response.body().length;
+ }
+ else
+ exceptions++;
+ },
+ executor);
+ }
+
+ @Override
+ public OperationStats stats() {
+ try {
+ try {
+ return executor.submit(this::getStats).get();
+ }
+ catch (RejectedExecutionException ignored) {
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+ return getStats();
+ }
+ }
+ catch (InterruptedException | ExecutionException ignored) {
+ throw new RuntimeException(ignored);
+ }
+ }
+
+ private OperationStats getStats() {
+ Map<Integer, Long> responses = new HashMap<>();
+ for (int code = 0; code < responsesByCode.length; code++)
+ if (responsesByCode[code] > 0)
+ responses.put(code, responsesByCode[code]);
+
+ return new OperationStats(requests.get(),
+ responses,
+ exceptions,
+ requests.get() - results,
+ this.responses == 0 ? 0 : totalLatencyMillis / this.responses,
+ minLatencyMillis,
+ maxLatencyMillis,
+ bytesSent,
+ bytesReceived);
+ }
+
+ @Override
+ public void close() {
+ delegate.close();
+ executor.shutdown();
+ }
+
+}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java
new file mode 100644
index 00000000000..f428fb567e6
--- /dev/null
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java
@@ -0,0 +1,21 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import java.io.Closeable;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Allows dispatch to a Vespa cluster.
+ */
+interface Cluster extends Closeable {
+
+ /** Dispatch the request to the cluster, causing the response vessel to complete at a later time. May not throw. */
+ void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel);
+
+ @Override
+ default void close() { }
+
+ default OperationStats stats() { return new OperationStats(0, Collections.emptyMap(), 0, 0, 0, 0, 0, 0, 0); }
+
+}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DocumentId.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/DocumentId.java
index 21513a5dac2..39fc9fb28e0 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DocumentId.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/DocumentId.java
@@ -8,6 +8,8 @@ import java.util.OptionalLong;
import static java.util.Objects.requireNonNull;
/**
+ * Represents a Vespa document id
+ *
* @author jonmv
*/
public class DocumentId {
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java
new file mode 100644
index 00000000000..6f4e4e752f0
--- /dev/null
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java
@@ -0,0 +1,86 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.lang.Math.log;
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import static java.lang.Math.pow;
+import static java.lang.Math.random;
+
+/**
+ * Samples latency as a function of inflight requests, and regularly adjusts to the optimal value.
+ *
+ * @author jonmv
+ */
+public class DynamicThrottler extends StaticThrottler {
+
+ private final AtomicLong targetInflight;
+ private long updateNanos = 0;
+ private final List<AtomicLong> latencies = new ArrayList<>();
+ private final double weight = 0.9; // Higher weight favours higher (own) throughput, at the cost of (shared) latency.
+
+ public DynamicThrottler(FeedClientBuilder builder) {
+ super(builder);
+ this.targetInflight = new AtomicLong(128L * builder.connectionsPerEndpoint * builder.endpoints.size());
+ for (int i = 0; i < 128; i++)
+ latencies.add(new AtomicLong(-1));
+ }
+
+ @Override
+ public void sent(long inflight, CompletableFuture<HttpResponse> vessel) {
+ long startNanos = System.nanoTime();
+ if (updateNanos == 0) updateNanos = System.nanoTime();
+ boolean update = startNanos - updateNanos >= 1e8; // Ship ten updates per second.
+ if (update) updateNanos = startNanos;
+
+ vessel.whenComplete((response, thrown) -> {
+ // Use buckets for latency measurements, with inflight along a log scale,
+ // and with minInflight and maxInflight at the ends.
+ int index = (int) (latencies.size() * log(max(1, (double) inflight / minInflight))
+ / log(256)); // 4096 (server max streams per connection) / 16 (our min per connection)
+ long nowNanos = System.nanoTime();
+ long latencyNanos = nowNanos - startNanos;
+ latencies.get(index).set(latencyNanos);
+ if ( ! update)
+ return;
+
+ // Loop over latency measurements and pick the one which optimises throughput and latency.
+ double choice = -1;
+ double max = -1;
+ for (int i = latencies.size(); i-- > 0; ) {
+ double latency = latencies.get(i).get();
+ if (latency < 0) continue; // Skip unknown values.
+ double target = minInflight * pow(256, (i + 0.5) / latencies.size());
+ double objective = pow(target, weight) / latency; // Optimise throughput (weight), but also latency (1 - weight).
+ if (objective > max) {
+ max = objective;
+ choice = target;
+ }
+ }
+ long target = (long) ((random() * 0.25 + 0.90) * choice); // Random walk, skewed towards increase.
+ targetInflight.set(max(minInflight, min(maxInflight, target)));
+ });
+ }
+
+ @Override
+ public void success() {
+ super.success();
+ }
+
+ @Override
+ public void throttled(long inflight) {
+ super.throttled(inflight);
+ }
+
+ @Override
+ public long targetInflight() {
+ return min(super.targetInflight(), targetInflight.get());
+ }
+
+}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java
index 455a79060ee..f39b56ad50f 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java
@@ -5,15 +5,43 @@ import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
/**
+ * Asynchronous feed client accepting document operations as JSON
+ *
* @author bjorncs
* @author jonmv
*/
public interface FeedClient extends Closeable {
+ /**
+ * Send a document put with the given parameters, returning a future with the result of the operation.
+ * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes.
+ * */
CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params);
+
+ /**
+ * Send a document update with the given parameters, returning a future with the result of the operation.
+ * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes.
+ * */
CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params);
+
+ /** Send a document remove with the given parameters, returning a future with the result of the operation.
+ * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes.
+ * */
CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params);
+ /** Returns a snapshot of the stats for this feed client, such as requests made, and responses by status. */
+ OperationStats stats();
+
+ /** Current state of the circuit breaker. */
+ default CircuitBreaker.State circuitBreakerState() { return CircuitBreaker.State.CLOSED; }
+
+ /** Shut down, and reject new operations. Operations in flight are allowed to complete normally if graceful. */
+ void close(boolean graceful);
+
+ /** Initiates graceful shutdown. See {@link #close(boolean)}. */
+ default void close() { close(true); }
+
+ /** Controls what to retry, and how many times. */
interface RetryStrategy {
/** Whether to retry operations of the given type. */
@@ -24,10 +52,62 @@ public interface FeedClient extends Closeable {
}
+ /** Allows slowing down or halting completely operations against the configured endpoint on high failure rates. */
+ interface CircuitBreaker {
+
+ /** Called by the client whenever a successful response is obtained. */
+ void success();
+
+ /** Called by the client whenever a transient or fatal error occurs. */
+ void failure();
+
+ /** The current state of the circuit breaker. */
+ State state();
+
+ enum State {
+
+ /** Circuit is closed: business as usual. */
+ CLOSED,
+
+ /** Circuit is half-open: something is wrong, perhaps it recovers? */
+ HALF_OPEN,
+
+ /** Circuit is open: we have given up. */
+ OPEN;
+
+ }
+
+ }
+
enum OperationType {
- put,
- update,
- remove;
+
+ /** A document put operation. This is idempotent. */
+ PUT,
+
+ /** A document update operation. This is idempotent if all its contained updates are. */
+ UPDATE,
+
+ /** A document remove operation. This is idempotent. */
+ REMOVE;
+
+ }
+
+
+ /** Determines the number of requests to have inflight at any point. */
+ interface Throttler {
+
+ /** A request was just sent with {@code vessel}, with {@code inflight} total in flight. */
+ void sent(long inflight, CompletableFuture<HttpResponse> vessel);
+
+ /** A successful response was obtained. */
+ void success();
+
+ /** A throttle signal was obtained from the server. */
+ void throttled(long inflight);
+
+ /** The target inflight operations right now. */
+ long targetInflight();
+
}
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java
index eaf84c67ac4..0f685ec5b7f 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java
@@ -7,8 +7,14 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.file.Path;
-import java.time.Clock;
+import java.security.PrivateKey;
+import java.security.cert.X509Certificate;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
@@ -22,38 +28,51 @@ import static java.util.Objects.requireNonNull;
*/
public class FeedClientBuilder {
- FeedClient.RetryStrategy defaultRetryStrategy = new FeedClient.RetryStrategy() { };
+ static final FeedClient.RetryStrategy defaultRetryStrategy = new FeedClient.RetryStrategy() { };
- final URI endpoint;
+ final List<URI> endpoints;
final Map<String, Supplier<String>> requestHeaders = new HashMap<>();
SSLContext sslContext;
HostnameVerifier hostnameVerifier;
- int maxConnections = 4;
- int maxStreamsPerConnection = 1024;
+ int connectionsPerEndpoint = 4;
+ int maxStreamsPerConnection = 4096;
FeedClient.RetryStrategy retryStrategy = defaultRetryStrategy;
- Path certificate;
- Path privateKey;
- Path caCertificates;
- Clock clock;
+ FeedClient.CircuitBreaker circuitBreaker = new GracePeriodCircuitBreaker(Duration.ofSeconds(1), Duration.ofMinutes(10));
+ Path certificateFile;
+ Path privateKeyFile;
+ Path caCertificatesFile;
+ Collection<X509Certificate> certificate;
+ PrivateKey privateKey;
+ Collection<X509Certificate> caCertificates;
+ boolean benchmark;
- public static FeedClientBuilder create(URI endpoint) { return new FeedClientBuilder(endpoint); }
+ /** Creates a builder for a single container endpoint **/
+ public static FeedClientBuilder create(URI endpoint) { return new FeedClientBuilder(Collections.singletonList(endpoint)); }
- private FeedClientBuilder(URI endpoint) {
- requireNonNull(endpoint.getHost());
- this.endpoint = endpoint;
+ /** Creates a builder for multiple container endpoints **/
+ public static FeedClientBuilder create(List<URI> endpoints) { return new FeedClientBuilder(endpoints); }
+
+ private FeedClientBuilder(List<URI> endpoints) {
+ if (endpoints.isEmpty())
+ throw new IllegalArgumentException("At least one endpoint must be provided");
+
+ for (URI endpoint : endpoints)
+ requireNonNull(endpoint.getHost());
+
+ this.endpoints = new ArrayList<>(endpoints);
}
/**
- * Sets the maximum number of connections this client will use.
+ * Sets the number of connections this client will use per endpoint.
*
* A reasonable value here is a small multiple of the numbers of containers in the
* cluster to feed, so load can be balanced across these.
* In general, this value should be kept as low as possible, but poor connectivity
* between feeder and cluster may also warrant a higher number of connections.
*/
- public FeedClientBuilder setMaxConnections(int max) {
+ public FeedClientBuilder setConnectionsPerEndpoint(int max) {
if (max < 1) throw new IllegalArgumentException("Max connections must be at least 1, but was " + max);
- this.maxConnections = max;
+ this.connectionsPerEndpoint = max;
return this;
}
@@ -70,52 +89,137 @@ public class FeedClientBuilder {
return this;
}
+ /** Sets {@link SSLContext} instance. */
public FeedClientBuilder setSslContext(SSLContext context) {
- if (certificate != null || caCertificates != null || privateKey != null) {
- throw new IllegalArgumentException("Cannot set both SSLContext and certificate / CA certificates");
- }
this.sslContext = requireNonNull(context);
return this;
}
+ /** Sets {@link HostnameVerifier} instance (e.g for disabling default SSL hostname verification). */
public FeedClientBuilder setHostnameVerifier(HostnameVerifier verifier) {
this.hostnameVerifier = requireNonNull(verifier);
return this;
}
+ /** Turns on/off benchmarking, aggregated in {@link FeedClient#stats()}. */
+ public FeedClientBuilder setBenchmarkOn(boolean on) {
+ this.benchmark = on;
+ return this;
+ }
+
+ /** Adds HTTP request header to all client requests. */
public FeedClientBuilder addRequestHeader(String name, String value) {
return addRequestHeader(name, () -> requireNonNull(value));
}
+ /**
+ * Adds HTTP request header to all client requests. Value {@link Supplier} is invoked for each HTTP request,
+ * i.e. value can be dynamically updated during a feed.
+ */
public FeedClientBuilder addRequestHeader(String name, Supplier<String> valueSupplier) {
this.requestHeaders.put(requireNonNull(name), requireNonNull(valueSupplier));
return this;
}
+ /**
+ * Overrides default retry strategy.
+ * @see FeedClient.RetryStrategy
+ */
public FeedClientBuilder setRetryStrategy(FeedClient.RetryStrategy strategy) {
this.retryStrategy = requireNonNull(strategy);
return this;
}
+ /**
+ * Overrides default circuit breaker.
+ * @see FeedClient.CircuitBreaker
+ */
+ public FeedClientBuilder setCircuitBreaker(FeedClient.CircuitBreaker breaker) {
+ this.circuitBreaker = requireNonNull(breaker);
+ return this;
+ }
+
+ /** Sets path to client SSL certificate/key PEM files */
public FeedClientBuilder setCertificate(Path certificatePemFile, Path privateKeyPemFile) {
- if (sslContext != null) throw new IllegalArgumentException("Cannot set both SSLContext and certificate");
- this.certificate = certificatePemFile;
- this.privateKey = privateKeyPemFile;
+ this.certificateFile = certificatePemFile;
+ this.privateKeyFile = privateKeyPemFile;
+ return this;
+ }
+
+ /** Sets client SSL certificates/key */
+ public FeedClientBuilder setCertificate(Collection<X509Certificate> certificate, PrivateKey privateKey) {
+ this.certificate = certificate;
+ this.privateKey = privateKey;
return this;
}
- public FeedClientBuilder setCaCertificates(Path caCertificatesFile) {
- if (sslContext != null) throw new IllegalArgumentException("Cannot set both SSLContext and CA certificate");
- this.caCertificates = caCertificatesFile;
+ /** Sets client SSL certificate/key */
+ public FeedClientBuilder setCertificate(X509Certificate certificate, PrivateKey privateKey) {
+ return setCertificate(Collections.singletonList(certificate), privateKey);
+ }
+
+ /**
+ * Overrides JVM default SSL truststore
+ * @param caCertificatesFile Path to PEM encoded file containing trusted certificates
+ */
+ public FeedClientBuilder setCaCertificatesFile(Path caCertificatesFile) {
+ this.caCertificatesFile = caCertificatesFile;
+ return this;
+ }
+
+ /** Overrides JVM default SSL truststore */
+ public FeedClientBuilder setCaCertificates(Collection<X509Certificate> caCertificates) {
+ this.caCertificates = caCertificates;
return this;
}
+ /** Constructs instance of {@link ai.vespa.feed.client.FeedClient} from builder configuration */
public FeedClient build() {
try {
+ validateConfiguration();
return new HttpFeedClient(this);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
+ SSLContext constructSslContext() throws IOException {
+ if (sslContext != null) return sslContext;
+ SslContextBuilder sslContextBuilder = new SslContextBuilder();
+ if (certificateFile != null && privateKeyFile != null) {
+ sslContextBuilder.withCertificateAndKey(certificateFile, privateKeyFile);
+ } else if (certificate != null && privateKey != null) {
+ sslContextBuilder.withCertificateAndKey(certificate, privateKey);
+ }
+ if (caCertificatesFile != null) {
+ sslContextBuilder.withCaCertificates(caCertificatesFile);
+ } else if (caCertificates != null) {
+ sslContextBuilder.withCaCertificates(caCertificates);
+ }
+ return sslContextBuilder.build();
+ }
+
+ private void validateConfiguration() {
+ if (sslContext != null && (
+ certificateFile != null || caCertificatesFile != null || privateKeyFile != null ||
+ certificate != null || caCertificates != null || privateKey != null)) {
+ throw new IllegalArgumentException("Cannot set both SSLContext and certificate / CA certificates");
+ }
+ if (certificate != null && certificateFile != null) {
+ throw new IllegalArgumentException("Cannot set both certificate directly and as file");
+ }
+ if (privateKey != null && privateKeyFile != null) {
+ throw new IllegalArgumentException("Cannot set both private key directly and as file");
+ }
+ if (caCertificates != null && caCertificatesFile != null) {
+ throw new IllegalArgumentException("Cannot set both CA certificates directly and as file");
+ }
+ if (certificate != null && certificate.isEmpty()) {
+ throw new IllegalArgumentException("Certificate cannot be empty");
+ }
+ if (caCertificates != null && caCertificates.isEmpty()) {
+ throw new IllegalArgumentException("CA certificates cannot be empty");
+ }
+ }
+
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java
index eb31d1aa808..54e11d3a185 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java
@@ -1,8 +1,47 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.feed.client;
+import java.util.Optional;
+
/**
+ * Signals that an error occurred during feeding
+ *
* @author bjorncs
*/
public class FeedException extends RuntimeException {
+
+ private final DocumentId documentId;
+
+ public FeedException(String message) {
+ super(message);
+ this.documentId = null;
+ }
+
+ public FeedException(DocumentId documentId, String message) {
+ super(message);
+ this.documentId = documentId;
+ }
+
+ public FeedException(String message, Throwable cause) {
+ super(message, cause);
+ this.documentId = null;
+ }
+
+ public FeedException(Throwable cause) {
+ super(cause);
+ this.documentId = null;
+ }
+
+ public FeedException(DocumentId documentId, Throwable cause) {
+ super(cause);
+ this.documentId = documentId;
+ }
+
+ public FeedException(DocumentId documentId, String message, Throwable cause) {
+ super(message, cause);
+ this.documentId = documentId;
+ }
+
+ public Optional<DocumentId> documentId() { return Optional.ofNullable(documentId); }
+
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java
new file mode 100644
index 00000000000..2c5c2dccf19
--- /dev/null
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java
@@ -0,0 +1,71 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongSupplier;
+import java.util.logging.Logger;
+
+import static java.util.Objects.requireNonNull;
+import static java.util.logging.Level.INFO;
+import static java.util.logging.Level.WARNING;
+
+/**
+ * Breaks the circuit when no successes have been recorded for a specified time.
+ *
+ * @author jonmv
+ */
+public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker {
+
+ private static final Logger log = Logger.getLogger(GracePeriodCircuitBreaker.class.getName());
+ private static final long NEVER = 1L << 60;
+
+ private final AtomicLong failingSinceMillis = new AtomicLong(NEVER);
+ private final AtomicBoolean halfOpen = new AtomicBoolean(false);
+ private final AtomicBoolean open = new AtomicBoolean(false);
+ private final LongSupplier clock;
+ private final long graceMillis;
+ private final long doomMillis;
+
+ public GracePeriodCircuitBreaker(Duration grace, Duration doom) {
+ this(System::currentTimeMillis, grace, doom);
+ }
+
+ GracePeriodCircuitBreaker(LongSupplier clock, Duration grace, Duration doom) {
+ if (grace.isNegative())
+ throw new IllegalArgumentException("Grace delay must be non-negative");
+
+ if (doom.isNegative())
+ throw new IllegalArgumentException("Doom delay must be non-negative");
+
+ this.clock = requireNonNull(clock);
+ this.graceMillis = grace.toMillis();
+ this.doomMillis = doom.toMillis();
+ }
+
+ @Override
+ public void success() {
+ failingSinceMillis.set(NEVER);
+ if ( ! open.get() && halfOpen.compareAndSet(true, false))
+ log.log(INFO, "Circuit breaker is now closed");
+ }
+
+ @Override
+ public void failure() {
+ failingSinceMillis.compareAndSet(NEVER, clock.getAsLong());
+ }
+
+ @Override
+ public State state() {
+ long failingMillis = clock.getAsLong() - failingSinceMillis.get();
+ if (failingMillis > graceMillis && halfOpen.compareAndSet(false, true))
+ log.log(INFO, "Circuit breaker is now half-open");
+
+ if (failingMillis > doomMillis && open.compareAndSet(false, true))
+ log.log(WARNING, "Circuit breaker is now open");
+
+ return open.get() ? State.OPEN : halfOpen.get() ? State.HALF_OPEN : State.CLOSED;
+ }
+
+}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
index 8a38e859ca4..2269c56cde4 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
@@ -1,40 +1,22 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.feed.client;
-import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
-import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
-import org.apache.hc.client5.http.config.RequestConfig;
-import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
-import org.apache.hc.client5.http.impl.async.H2AsyncClientBuilder;
-import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
-import org.apache.hc.core5.concurrent.FutureCallback;
-import org.apache.hc.core5.http.ContentType;
-import org.apache.hc.core5.http.message.BasicHeader;
-import org.apache.hc.core5.http2.config.H2Config;
-import org.apache.hc.core5.net.URIBuilder;
-import org.apache.hc.core5.reactor.IOReactorConfig;
-import org.apache.hc.core5.util.Timeout;
-
-import javax.net.ssl.SSLContext;
-import java.io.ByteArrayOutputStream;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+
import java.io.IOException;
-import java.io.PrintStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.time.Clock;
-import java.util.ArrayList;
-import java.util.Collections;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
+import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
-import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeH2Blacklisted;
-import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeWeak;
/**
* HTTP implementation of {@link FeedClient}
@@ -44,73 +26,19 @@ import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeWeak;
*/
class HttpFeedClient implements FeedClient {
- private final URI endpoint;
+ private static final JsonFactory factory = new JsonFactory();
+
private final Map<String, Supplier<String>> requestHeaders;
private final RequestStrategy requestStrategy;
- private final List<CloseableHttpAsyncClient> httpClients = new ArrayList<>();
- private final List<AtomicInteger> inflight = new ArrayList<>();
private final AtomicBoolean closed = new AtomicBoolean();
HttpFeedClient(FeedClientBuilder builder) throws IOException {
- this.endpoint = builder.endpoint;
- this.requestHeaders = new HashMap<>(builder.requestHeaders);
- this.requestStrategy = new HttpRequestStrategy(builder, Clock.systemUTC());
-
- for (int i = 0; i < builder.maxConnections; i++) {
- CloseableHttpAsyncClient client = createHttpClient(builder);
- client.start();
- httpClients.add(client);
- inflight.add(new AtomicInteger());
- }
+ this(builder, new HttpRequestStrategy(builder));
}
- private static CloseableHttpAsyncClient createHttpClient(FeedClientBuilder builder) throws IOException {
- H2AsyncClientBuilder httpClientBuilder = H2AsyncClientBuilder.create()
- .setUserAgent(String.format("vespa-feed-client/%s", Vespa.VERSION))
- .setDefaultHeaders(Collections.singletonList(new BasicHeader("Vespa-Client-Version", Vespa.VERSION)))
- .disableCookieManagement()
- .disableRedirectHandling()
- .disableAutomaticRetries()
- .setIOReactorConfig(IOReactorConfig.custom()
- .setSoTimeout(Timeout.ofSeconds(10))
- .build())
- .setDefaultRequestConfig(
- RequestConfig.custom()
- .setConnectTimeout(Timeout.ofSeconds(10))
- .setConnectionRequestTimeout(Timeout.DISABLED)
- .setResponseTimeout(Timeout.ofMinutes(5))
- .build())
- .setH2Config(H2Config.initial()
- .setMaxConcurrentStreams(builder.maxStreamsPerConnection)
- .setCompressionEnabled(true)
- .setPushEnabled(false)
- .build());
-
- SSLContext sslContext = constructSslContext(builder);
- String[] allowedCiphers = excludeH2Blacklisted(excludeWeak(sslContext.getSupportedSSLParameters().getCipherSuites()));
- if (allowedCiphers.length == 0)
- throw new IllegalStateException("No adequate SSL cipher suites supported by the JVM");
-
- ClientTlsStrategyBuilder tlsStrategyBuilder = ClientTlsStrategyBuilder.create()
- .setCiphers(allowedCiphers)
- .setSslContext(sslContext);
- if (builder.hostnameVerifier != null) {
- tlsStrategyBuilder.setHostnameVerifier(builder.hostnameVerifier);
- }
- return httpClientBuilder.setTlsStrategy(tlsStrategyBuilder.build())
- .build();
- }
-
- private static SSLContext constructSslContext(FeedClientBuilder builder) throws IOException {
- if (builder.sslContext != null) return builder.sslContext;
- SslContextBuilder sslContextBuilder = new SslContextBuilder();
- if (builder.certificate != null && builder.privateKey != null) {
- sslContextBuilder.withCertificateAndKey(builder.certificate, builder.privateKey);
- }
- if (builder.caCertificates != null) {
- sslContextBuilder.withCaCertificates(builder.caCertificates);
- }
- return sslContextBuilder.build();
+ HttpFeedClient(FeedClientBuilder builder, RequestStrategy requestStrategy) {
+ this.requestHeaders = new HashMap<>(builder.requestHeaders);
+ this.requestStrategy = requestStrategy;
}
@Override
@@ -129,107 +57,122 @@ class HttpFeedClient implements FeedClient {
}
@Override
- public void close() throws IOException {
- if ( ! closed.getAndSet(true))
- for (CloseableHttpAsyncClient hc : httpClients)
- hc.close();
+ public OperationStats stats() {
+ return requestStrategy.stats();
}
- private CompletableFuture<Result> send(String method, DocumentId documentId, String operationJson, OperationParameters params) {
- SimpleHttpRequest request = new SimpleHttpRequest(method, operationUrl(endpoint, documentId, params));
- requestHeaders.forEach((name, value) -> request.setHeader(name, value.get()));
- if (operationJson != null)
- request.setBody(operationJson, ContentType.APPLICATION_JSON);
-
- return requestStrategy.enqueue(documentId, request, this::send)
- .handle((response, thrown) -> {
- if (thrown != null) {
- if (requestStrategy.hasFailed()) {
- try { close(); }
- catch (IOException exception) { thrown.addSuppressed(exception); }
- }
- ByteArrayOutputStream buffer = new ByteArrayOutputStream();
- thrown.printStackTrace(new PrintStream(buffer));
- return new Result(Result.Type.failure, documentId, buffer.toString(), null);
- }
- return toResult(response, documentId);
- });
+ @Override
+ public CircuitBreaker.State circuitBreakerState() {
+ return requestStrategy.circuitBreakerState();
}
- /** Sends the given request to the client with the least current inflight requests, completing the given vessel when done. */
- private void send(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel) {
- int index = 0;
- int min = Integer.MAX_VALUE;
- for (int i = 0; i < httpClients.size(); i++)
- if (inflight.get(i).get() < min) {
- min = inflight.get(i).get();
- index = i;
- }
+ @Override
+ public void close(boolean graceful) {
+ closed.set(true);
+ if (graceful)
+ requestStrategy.await();
- inflight.get(index).incrementAndGet();
- try {
- httpClients.get(index).execute(request,
- new FutureCallback<SimpleHttpResponse>() {
- @Override public void completed(SimpleHttpResponse response) { vessel.complete(response); }
- @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); }
- @Override public void cancelled() { vessel.cancel(false); }
- });
- }
- catch (Throwable thrown) {
- vessel.completeExceptionally(thrown);
- }
- vessel.thenRun(inflight.get(index)::decrementAndGet);
+ requestStrategy.destroy();
+ }
+
+ private CompletableFuture<Result> send(String method, DocumentId documentId, String operationJson, OperationParameters params) {
+ HttpRequest request = new HttpRequest(method,
+ getPath(documentId) + getQuery(params),
+ requestHeaders,
+ operationJson.getBytes(UTF_8)); // TODO: make it bytes all the way?
+
+ return requestStrategy.enqueue(documentId, request)
+ .thenApply(response -> toResult(request, response, documentId));
}
- static Result toResult(SimpleHttpResponse response, DocumentId documentId) {
+ static Result toResult(HttpRequest request, HttpResponse response, DocumentId documentId) {
Result.Type type;
- switch (response.getCode()) {
+ switch (response.code()) {
case 200: type = Result.Type.success; break;
case 412: type = Result.Type.conditionNotMet; break;
- default: type = Result.Type.failure;
+ case 502:
+ case 504:
+ case 507: type = Result.Type.failure; break;
+ default: type = null;
}
- Map<String, String> responseJson = null; // TODO: parse JSON on error.
- return new Result(type, documentId, response.getBodyText(), "trace");
+
+ String message = null;
+ String trace = null;
+ try {
+ JsonParser parser = factory.createParser(response.body());
+ if (parser.nextToken() != JsonToken.START_OBJECT)
+ throw new ResultParseException(
+ documentId,
+ "Expected '" + JsonToken.START_OBJECT + "', but found '" + parser.currentToken() + "' in: "
+ + new String(response.body(), UTF_8));
+
+ String name;
+ while ((name = parser.nextFieldName()) != null) {
+ switch (name) {
+ case "message": message = parser.nextTextValue(); break;
+ case "trace": trace = parser.nextTextValue(); break;
+ default: parser.nextToken();
+ }
+ }
+
+ if (parser.currentToken() != JsonToken.END_OBJECT)
+ throw new ResultParseException(
+ documentId,
+ "Expected '" + JsonToken.END_OBJECT + "', but found '" + parser.currentToken() + "' in: "
+ + new String(response.body(), UTF_8));
+ }
+ catch (IOException e) {
+ throw new ResultParseException(documentId, e);
+ }
+
+ if (type == null) // Not a Vespa response, but a failure in the HTTP layer.
+ throw new ResultParseException(
+ documentId,
+ "Status " + response.code() + " executing '" + request + "': "
+ + (message == null ? new String(response.body(), UTF_8) : message));
+
+ return new Result(type, documentId, message, trace);
}
- static List<String> toPath(DocumentId documentId) {
- List<String> path = new ArrayList<>();
+ static String getPath(DocumentId documentId) {
+ StringJoiner path = new StringJoiner("/", "/", "");
path.add("document");
path.add("v1");
- path.add(documentId.namespace());
- path.add(documentId.documentType());
+ path.add(encode(documentId.namespace()));
+ path.add(encode(documentId.documentType()));
if (documentId.number().isPresent()) {
path.add("number");
path.add(Long.toUnsignedString(documentId.number().getAsLong()));
}
else if (documentId.group().isPresent()) {
path.add("group");
- path.add(documentId.group().get());
+ path.add(encode(documentId.group().get()));
}
else {
path.add("docid");
}
- path.add(documentId.userSpecific());
+ path.add(encode(documentId.userSpecific()));
- return path;
+ return path.toString();
}
- static URI operationUrl(URI endpoint, DocumentId documentId, OperationParameters params) {
- URIBuilder url = new URIBuilder(endpoint);
- url.setPathSegments(toPath(documentId));
-
- if (params.createIfNonExistent()) url.addParameter("create", "true");
- params.testAndSetCondition().ifPresent(condition -> url.addParameter("condition", condition));
- params.timeout().ifPresent(timeout -> url.addParameter("timeout", timeout.toMillis() + "ms"));
- params.route().ifPresent(route -> url.addParameter("route", route));
- params.tracelevel().ifPresent(tracelevel -> url.addParameter("tracelevel", Integer.toString(tracelevel)));
-
+ static String encode(String raw) {
try {
- return url.build();
+ return URLEncoder.encode(raw, UTF_8.name());
}
- catch (URISyntaxException e) {
+ catch (UnsupportedEncodingException e) {
throw new IllegalStateException(e);
}
}
+ static String getQuery(OperationParameters params) {
+ StringJoiner query = new StringJoiner("&", "?", "").setEmptyValue("");
+ if (params.createIfNonExistent()) query.add("create=true");
+ params.testAndSetCondition().ifPresent(condition -> query.add("condition=" + encode(condition)));
+ params.timeout().ifPresent(timeout -> query.add("timeout=" + timeout.toMillis() + "ms"));
+ params.route().ifPresent(route -> query.add("route=" + encode(route)));
+ params.tracelevel().ifPresent(tracelevel -> query.add("tracelevel=" + tracelevel));
+ return query.toString();
+ }
+
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java
new file mode 100644
index 00000000000..8da2f46def2
--- /dev/null
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java
@@ -0,0 +1,42 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import java.util.Map;
+import java.util.function.Supplier;
+
+class HttpRequest {
+
+ private final String method;
+ private final String path;
+ private final Map<String, Supplier<String>> headers;
+ private final byte[] body;
+
+ public HttpRequest(String method, String path, Map<String, Supplier<String>> headers, byte[] body) {
+ this.method = method;
+ this.path = path;
+ this.headers = headers;
+ this.body = body;
+ }
+
+ public String method() {
+ return method;
+ }
+
+ public String path() {
+ return path;
+ }
+
+ public Map<String, Supplier<String>> headers() {
+ return headers;
+ }
+
+ public byte[] body() {
+ return body;
+ }
+
+ @Override
+ public String toString() {
+ return method + " " + path;
+ }
+
+}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
index d0d67d65446..e9cd0baba5b 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
@@ -1,26 +1,31 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.feed.client;
+import ai.vespa.feed.client.FeedClient.CircuitBreaker;
import ai.vespa.feed.client.FeedClient.RetryStrategy;
-import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
-import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import java.io.IOException;
-import java.time.Clock;
-import java.time.Instant;
-import java.util.HashMap;
+import java.nio.channels.CancelledKeyException;
import java.util.Map;
-import java.util.concurrent.BlockingQueue;
+import java.util.Queue;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.function.BiConsumer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
-import static java.lang.Math.max;
-import static java.lang.Math.min;
+import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.CLOSED;
+import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN;
+import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.logging.Level.FINE;
-import static java.util.logging.Level.INFO;
+import static java.util.logging.Level.WARNING;
+// TODO: update doc
/**
* Controls request execution and retries:
* <ul>
@@ -31,58 +36,94 @@ import static java.util.logging.Level.INFO;
*
* @author jonmv
*/
-class HttpRequestStrategy implements RequestStrategy, AutoCloseable {
+class HttpRequestStrategy implements RequestStrategy {
private static final Logger log = Logger.getLogger(HttpRequestStrategy.class.getName());
- private final Map<DocumentId, CompletableFuture<Void>> inflightById = new HashMap<>();
- private final Object monitor = new Object();
- private final Clock clock;
- private final RetryStrategy wrapped;
- private final Thread delayer = new Thread(this::drainDelayed, "feed-client-retry-delayer");
- private final BlockingQueue<CompletableFuture<Void>> delayed = new LinkedBlockingQueue<>();
- private final long maxInflight;
- private final long minInflight;
- private double targetInflight;
- private long inflight = 0;
- private long consecutiveSuccesses = 0;
- private Instant lastSuccess;
- private boolean failed = false;
- private boolean closed = false;
-
- HttpRequestStrategy(FeedClientBuilder builder, Clock clock) {
- this.wrapped = builder.retryStrategy;
- this.maxInflight = builder.maxConnections * (long) builder.maxStreamsPerConnection;
- this.minInflight = builder.maxConnections * (long) min(16, builder.maxStreamsPerConnection);
- this.targetInflight = Math.sqrt(maxInflight) * (Math.sqrt(minInflight));
- this.clock = clock;
- this.lastSuccess = clock.instant();
- this.delayer.start();
- }
-
- private void drainDelayed() {
- try {
- while (true) {
- do delayed.take().complete(null);
- while ( ! hasFailed());
+ private final Cluster cluster;
+ private final Map<DocumentId, CompletableFuture<?>> inflightById = new ConcurrentHashMap<>();
+ private final RetryStrategy strategy;
+ private final CircuitBreaker breaker;
+ final FeedClient.Throttler throttler;
+ private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
+ private final AtomicLong inflight = new AtomicLong(0);
+ private final AtomicBoolean destroyed = new AtomicBoolean(false);
+ private final AtomicLong delayedCount = new AtomicLong(0);
+ private final ExecutorService resultExecutor = Executors.newSingleThreadExecutor(runnable -> {
+ Thread thread = new Thread(runnable, "feed-client-result-executor");
+ thread.setDaemon(true);
+ return thread;
+ });
+
+ HttpRequestStrategy(FeedClientBuilder builder) throws IOException {
+ this(builder, new ApacheCluster(builder));
+ }
+
+ HttpRequestStrategy(FeedClientBuilder builder, Cluster cluster) {
+ this.cluster = builder.benchmark ? new BenchmarkingCluster(cluster) : cluster;
+ this.strategy = builder.retryStrategy;
+ this.breaker = builder.circuitBreaker;
+ this.throttler = new DynamicThrottler(builder);
+
+ Thread dispatcher = new Thread(this::dispatch, "feed-client-dispatcher");
+ dispatcher.setDaemon(true);
+ dispatcher.start();
+ }
+
+ @Override
+ public OperationStats stats() {
+ return cluster.stats();
+ }
+
+ @Override
+ public CircuitBreaker.State circuitBreakerState() {
+ return breaker.state();
+ }
- Thread.sleep(1000);
+ private void dispatch() {
+ try {
+ while (breaker.state() != OPEN && ! destroyed.get()) {
+ while ( ! isInExcess() && poll() && breaker.state() == CLOSED);
+ // Sleep when circuit is half-open, nap when queue is empty, or we are throttled.
+ Thread.sleep(breaker.state() == HALF_OPEN ? 1000 : 10); // TODO: Reduce throughput when turning half-open?
}
}
catch (InterruptedException e) {
- delayed.forEach(action -> action.cancel(true));
+ Thread.currentThread().interrupt();
+ log.log(WARNING, "Dispatch thread interrupted; shutting down");
}
+ destroy();
+ }
+
+ private void offer(HttpRequest request, CompletableFuture<HttpResponse> vessel) {
+ delayedCount.incrementAndGet();
+ queue.offer(() -> {
+ cluster.dispatch(request, vessel);
+ });
+ }
+
+ private boolean poll() {
+ Runnable task = queue.poll();
+ if (task == null) return false;
+ delayedCount.decrementAndGet();
+ task.run();
+ return true;
}
- private boolean retry(SimpleHttpRequest request, int attempt) {
- if (attempt >= wrapped.retries())
+
+ private boolean isInExcess() {
+ return inflight.get() - delayedCount.get() > throttler.targetInflight();
+ }
+
+ private boolean retry(HttpRequest request, int attempt) {
+ if (attempt > strategy.retries())
return false;
- switch (request.getMethod().toUpperCase()) {
- case "POST": return wrapped.retry(FeedClient.OperationType.put);
- case "PUT": return wrapped.retry(FeedClient.OperationType.update);
- case "DELETE": return wrapped.retry(FeedClient.OperationType.remove);
- default: throw new IllegalStateException("Unexpected HTTP method: " + request.getMethod());
+ switch (request.method().toUpperCase()) {
+ case "POST": return strategy.retry(FeedClient.OperationType.PUT);
+ case "PUT": return strategy.retry(FeedClient.OperationType.UPDATE);
+ case "DELETE": return strategy.retry(FeedClient.OperationType.REMOVE);
+ default: throw new IllegalStateException("Unexpected HTTP method: " + request.method());
}
}
@@ -90,158 +131,126 @@ class HttpRequestStrategy implements RequestStrategy, AutoCloseable {
* Retries all IOExceptions, unless error rate has converged to a value higher than the threshold,
* or the user has turned off retries for this type of operation.
*/
- private boolean retry(SimpleHttpRequest request, Throwable thrown, int attempt) {
- failure();
- log.log(INFO, thrown, () -> "Failed attempt " + attempt + " at " + request + ", " + consecutiveSuccesses + " successes since last error");
-
- if ( ! (thrown instanceof IOException))
- return false;
+ private boolean retry(HttpRequest request, Throwable thrown, int attempt) {
+ breaker.failure();
+ log.log(FINE, thrown, () -> "Failed attempt " + attempt + " at " + request);
- return retry(request, attempt);
- }
-
- void success() {
- Instant now = clock.instant();
- synchronized (monitor) {
- ++consecutiveSuccesses;
- lastSuccess = now;
- targetInflight = min(targetInflight + 0.1, maxInflight);
- }
- }
+ if ( (thrown instanceof IOException) // General IO problems.
+ || (thrown instanceof CancellationException) // TLS session disconnect.
+ || (thrown instanceof CancelledKeyException)) // Selection cancelled.
+ return retry(request, attempt);
- void failure() {
- Instant threshold = clock.instant().minusSeconds(300);
- synchronized (monitor) {
- consecutiveSuccesses = 0;
- if (lastSuccess.isBefore(threshold))
- failed = true;
- }
+ return false;
}
/** Retries throttled requests (429, 503), adjusting the target inflight count, and server errors (500, 502). */
- private boolean retry(SimpleHttpRequest request, SimpleHttpResponse response, int attempt) {
- if (response.getCode() / 100 == 2) {
- success();
+ private boolean retry(HttpRequest request, HttpResponse response, int attempt) {
+ if (response.code() / 100 == 2) {
+ breaker.success();
+ throttler.success();
return false;
}
- if (response.getCode() == 429 || response.getCode() == 503) { // Throttling; reduce target inflight.
- synchronized (monitor) {
- targetInflight = max(inflight * 0.9, minInflight);
- }
- log.log(FINE, () -> "Status code " + response.getCode() + " (" + response.getBodyText() + ") on attempt " + attempt +
- " at " + request + ", " + consecutiveSuccesses + " successes since last error");
+ log.log(FINE, () -> "Status code " + response.code() + " (" + new String(response.body(), UTF_8) +
+ ") on attempt " + attempt + " at " + request);
+ if (response.code() == 429 || response.code() == 503) { // Throttling; reduce target inflight.
+ throttler.throttled((inflight.get() - delayedCount.get()));
return true;
}
- log.log(INFO, () -> "Status code " + response.getCode() + " (" + response.getBodyText() + ") on attempt " + attempt +
- " at " + request + ", " + consecutiveSuccesses + " successes since last error");
+ breaker.failure();
+ if (response.code() == 500 || response.code() == 502 || response.code() == 504) // Hopefully temporary errors.
+ return retry(request, attempt);
- failure();
- if (response.getCode() != 500 && response.getCode() != 502)
- return false;
-
- return retry(request, attempt); // Hopefully temporary errors.
+ return false;
}
- // Must hold lock.
private void acquireSlot() {
try {
- while (inflight >= targetInflight)
- monitor.wait();
+ while (inflight.get() >= throttler.targetInflight())
+ Thread.sleep(1);
- ++inflight;
+ inflight.incrementAndGet();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
- // Must hold lock.
private void releaseSlot() {
- for (long i = --inflight; i < targetInflight; i++)
- monitor.notify();
+ inflight.decrementAndGet();
}
- @Override
- public boolean hasFailed() {
- synchronized (monitor) {
- return failed;
+ public void await() {
+ try {
+ while (inflight.get() > 0)
+ Thread.sleep(10);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
}
@Override
- public CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, SimpleHttpRequest request,
- BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>> dispatch) {
- CompletableFuture<SimpleHttpResponse> result = new CompletableFuture<>(); // Carries the aggregate result of the operation, including retries.
- CompletableFuture<SimpleHttpResponse> vessel = new CompletableFuture<>(); // Holds the computation of a single dispatch to the HTTP client.
- CompletableFuture<Void> blocker = new CompletableFuture<>(); // Blocks the next operation with same doc-id, then triggers it when complete.
-
- // Get the previous inflight operation for this doc-id, or acquire a send slot.
- CompletableFuture<Void> previous;
- synchronized (monitor) {
- previous = inflightById.put(documentId, blocker);
- if (previous == null)
- acquireSlot();
+ public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) {
+ CompletableFuture<HttpResponse> result = new CompletableFuture<>(); // Carries the aggregate result of the operation, including retries.
+ CompletableFuture<HttpResponse> vessel = new CompletableFuture<>(); // Holds the computation of a single dispatch to the HTTP client.
+ CompletableFuture<?> previous = inflightById.put(documentId, result);
+ if (destroyed.get()) {
+ result.cancel(true);
+ return result;
+ }
+
+ if (previous == null) {
+ acquireSlot();
+ offer(request, vessel);
+ throttler.sent(inflight.get(), result);
}
- if (previous == null) // Send immediately if none inflight ...
- dispatch.accept(request, vessel);
- else // ... or send when the previous inflight is done.
- previous.thenRun(() -> dispatch.accept(request, vessel));
-
- handleAttempt(vessel, dispatch, request, result, 1);
-
- result.thenRun(() -> {
- CompletableFuture<Void> current;
- synchronized (monitor) {
- current = inflightById.get(documentId);
- if (current == blocker) { // Release slot and clear map if no other operations enqueued for this doc-id ...
- releaseSlot();
- inflightById.put(documentId, null);
- }
+ else
+ previous.whenComplete((__, ___) -> offer(request, vessel));
+
+ handleAttempt(vessel, request, result, 1);
+
+ return result.handle((response, error) -> {
+ if (inflightById.compute(documentId, (____, current) -> current == result ? null : current) == null)
+ releaseSlot();
+
+ if (error != null) {
+ if (error instanceof FeedException) throw (FeedException) error;
+ throw new FeedException(documentId, error);
}
- if (current != blocker) // ... or trigger sending the next enqueued operation.
- blocker.complete(null);
+ return response;
});
-
- return result;
}
/** Handles the result of one attempt at the given operation, retrying if necessary. */
- private void handleAttempt(CompletableFuture<SimpleHttpResponse> vessel, BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>> dispatch,
- SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> result, int attempt) {
- vessel.whenComplete((response, thrown) -> {
- // Retry the operation if it failed with a transient error ...
- if (thrown != null ? retry(request, thrown, attempt)
- : retry(request, response, attempt)) {
- CompletableFuture<SimpleHttpResponse> retry = new CompletableFuture<>();
- boolean hasFailed = hasFailed();
- if (hasFailed)
- delayed.add(new CompletableFuture<>().thenRun(() -> dispatch.accept(request, retry)));
- else
- dispatch.accept(request, retry);
- handleAttempt(retry, dispatch, request, result, attempt + (hasFailed ? 0 : 1));
- return;
- }
-
- // ... or accept the outcome and mark the operation as complete.
- if (thrown == null) result.complete(response);
- else result.completeExceptionally(thrown);
- });
+ private void handleAttempt(CompletableFuture<HttpResponse> vessel, HttpRequest request, CompletableFuture<HttpResponse> result, int attempt) {
+ vessel.whenCompleteAsync((response, thrown) -> {
+ // Retry the operation if it failed with a transient error ...
+ if (thrown != null ? retry(request, thrown, attempt)
+ : retry(request, response, attempt)) {
+ CircuitBreaker.State state = breaker.state();
+ CompletableFuture<HttpResponse> retry = new CompletableFuture<>();
+ offer(request, retry);
+ handleAttempt(retry, request, result, attempt + (state == HALF_OPEN ? 0 : 1));
+ }
+ // ... or accept the outcome and mark the operation as complete.
+ else {
+ if (thrown == null) result.complete(response);
+ else result.completeExceptionally(thrown);
+ }
+ },
+ resultExecutor);
}
@Override
- public void close() {
- synchronized (monitor) {
- if (closed)
- return;
-
- closed = true;
+ public void destroy() {
+ if ( ! destroyed.getAndSet(true)) {
+ inflightById.values().forEach(result -> result.cancel(true));
+ cluster.close();
+ resultExecutor.shutdown();
}
- delayer.interrupt();
- try { delayer.join(); }
- catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java
new file mode 100644
index 00000000000..b1dd54240eb
--- /dev/null
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java
@@ -0,0 +1,16 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+interface HttpResponse {
+
+ int code();
+ byte[] body();
+
+ static HttpResponse of(int code, byte[] body) {
+ return new HttpResponse() {
+ @Override public int code() { return code; }
+ @Override public byte[] body() { return body; }
+ };
+ }
+
+}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java
new file mode 100644
index 00000000000..0ba373eef18
--- /dev/null
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java
@@ -0,0 +1,484 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import ai.vespa.feed.client.FeedClient.OperationType;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static ai.vespa.feed.client.FeedClient.OperationType.PUT;
+import static ai.vespa.feed.client.FeedClient.OperationType.REMOVE;
+import static ai.vespa.feed.client.FeedClient.OperationType.UPDATE;
+import static com.fasterxml.jackson.core.JsonToken.END_ARRAY;
+import static com.fasterxml.jackson.core.JsonToken.START_ARRAY;
+import static com.fasterxml.jackson.core.JsonToken.START_OBJECT;
+import static com.fasterxml.jackson.core.JsonToken.VALUE_FALSE;
+import static com.fasterxml.jackson.core.JsonToken.VALUE_STRING;
+import static com.fasterxml.jackson.core.JsonToken.VALUE_TRUE;
+import static java.lang.Math.min;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * @author jonmv
+ * @author bjorncs
+ */
+public class JsonFeeder implements Closeable {
+
+ private final ExecutorService resultExecutor = Executors.newSingleThreadExecutor(r -> {
+ Thread t = new Thread(r, "json-feeder-result-executor");
+ t.setDaemon(true);
+ return t;
+ });
+ private final FeedClient client;
+ private final OperationParameters protoParameters;
+
+ private JsonFeeder(FeedClient client, OperationParameters protoParameters) {
+ this.client = client;
+ this.protoParameters = protoParameters;
+ }
+
+ public interface ResultCallback {
+ /**
+ * Invoked after each operation has either completed successfully or failed
+ *
+ * @param result Non-null if operation completed successfully
+ * @param error Non-null if operation failed
+ */
+ default void onNextResult(Result result, FeedException error) { }
+
+ /**
+ * Invoked if an unrecoverable error occurred during feed processing,
+ * after which no other {@link ResultCallback} methods are invoked.
+ */
+ default void onError(FeedException error) { }
+
+ /**
+ * Invoked when all feed operations are either completed successfully or failed.
+ */
+ default void onComplete() { }
+ }
+
+ public static Builder builder(FeedClient client) { return new Builder(client); }
+
+ /** Feeds single JSON feed operations on the form
+ * <pre>
+ * {
+ * "id": "id:ns:type::boo",
+ * "fields": { ... document fields ... }
+ * }
+ * </pre>
+ * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes.
+ */
+ public CompletableFuture<Result> feedSingle(String json) {
+ CompletableFuture<Result> result = new CompletableFuture<>();
+ try {
+ SingleOperationParserAndExecutor parser = new SingleOperationParserAndExecutor(json.getBytes(UTF_8));
+ parser.next().whenCompleteAsync((operationResult, error) -> {
+ if (error != null) {
+ result.completeExceptionally(error);
+ } else {
+ result.complete(operationResult);
+ }
+ }, resultExecutor);
+ } catch (Exception e) {
+ resultExecutor.execute(() -> result.completeExceptionally(wrapException(e)));
+ }
+ return result;
+ }
+
+ /** Feeds a stream containing a JSON array of feed operations on the form
+ * <pre>
+ * [
+ * {
+ * "id": "id:ns:type::boo",
+ * "fields": { ... document fields ... }
+ * },
+ * {
+ * "put": "id:ns:type::foo",
+ * "fields": { ... document fields ... }
+ * },
+ * {
+ * "update": "id:ns:type:n=4:bar",
+ * "create": true,
+ * "fields": { ... partial update fields ... }
+ * },
+ * {
+ * "remove": "id:ns:type:g=foo:bar",
+ * "condition": "type.baz = \"bax\""
+ * },
+ * ...
+ * ]
+ * </pre>
+ * Note that {@code "id"} is an alias for the document put operation.
+ * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes.
+ */
+ public CompletableFuture<Void> feedMany(InputStream jsonStream, ResultCallback resultCallback) {
+ return feedMany(jsonStream, 1 << 26, resultCallback);
+ }
+
+ /**
+ * Same as {@link #feedMany(InputStream, ResultCallback)}, but without a provided {@link ResultCallback} instance.
+ * @see JsonFeeder#feedMany(InputStream, ResultCallback) for details.
+ */
+ public CompletableFuture<Void> feedMany(InputStream jsonStream) {
+ return feedMany(jsonStream, new ResultCallback() { });
+ }
+
+ CompletableFuture<Void> feedMany(InputStream jsonStream, int size, ResultCallback resultCallback) {
+ CompletableFuture<Void> overallResult = new CompletableFuture<>();
+ CompletableFuture<Result> result;
+ AtomicInteger pending = new AtomicInteger(1); // The below dispatch loop itself is counted as a single pending operation
+ AtomicBoolean finalCallbackInvoked = new AtomicBoolean();
+ try {
+ RingBufferStream buffer = new RingBufferStream(jsonStream, size);
+ while ((result = buffer.next()) != null) {
+ pending.incrementAndGet();
+ result.whenCompleteAsync((r, t) -> {
+ if (!finalCallbackInvoked.get()) {
+ resultCallback.onNextResult(r, (FeedException) t);
+ }
+ if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) {
+ resultCallback.onComplete();
+ overallResult.complete(null);
+ }
+ }, resultExecutor);
+ }
+ if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) {
+ resultExecutor.execute(() -> {
+ resultCallback.onComplete();
+ overallResult.complete(null);
+ });
+ }
+ } catch (Exception e) {
+ if (finalCallbackInvoked.compareAndSet(false, true)) {
+ resultExecutor.execute(() -> {
+ FeedException wrapped = wrapException(e);
+ resultCallback.onError(wrapped);
+ overallResult.completeExceptionally(wrapped);
+ });
+ }
+ }
+ return overallResult;
+ }
+
+ private static final JsonFactory factory = new JsonFactory();
+
+ @Override public void close() throws IOException {
+ client.close();
+ resultExecutor.shutdown();
+ try {
+ if (!resultExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
+ throw new IOException("Failed to close client in time");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private FeedException wrapException(Exception e) {
+ if (e instanceof FeedException) return (FeedException) e;
+ if (e instanceof IOException) {
+ return new OperationParseException("Failed to parse document JSON: " + e.getMessage(), e);
+ }
+ return new FeedException(e);
+ }
+
+ private class RingBufferStream extends InputStream {
+
+ private final byte[] b = new byte[1];
+ private final InputStream in;
+ private final byte[] data;
+ private final int size;
+ private final Object lock = new Object();
+ private IOException thrown = null;
+ private long tail = 0;
+ private long pos = 0;
+ private long head = 0;
+ private boolean done = false;
+ private final OperationParserAndExecutor parserAndExecutor;
+
+ RingBufferStream(InputStream in, int size) throws IOException {
+ this.in = in;
+ this.data = new byte[size];
+ this.size = size;
+
+ new Thread(this::fill, "feed-reader").start();
+
+ this.parserAndExecutor = new RingBufferBackedOperationParserAndExecutor(factory.createParser(this));
+ }
+
+ @Override
+ public int read() throws IOException {
+ return read(b, 0, 1) == -1 ? -1 : b[0];
+ }
+
+ @Override
+ public int read(byte[] buffer, int off, int len) throws IOException {
+ try {
+ int ready;
+ synchronized (lock) {
+ while ((ready = (int) (head - pos)) == 0 && ! done)
+ lock.wait();
+ }
+ if (thrown != null) throw thrown;
+ if (ready == 0) return -1;
+
+ ready = min(ready, len);
+ int offset = (int) (pos % size);
+ int length = min(ready, size - offset);
+ System.arraycopy(data, offset, buffer, off, length);
+ if (length < ready)
+ System.arraycopy(data, 0, buffer, off + length, ready - length);
+
+ pos += ready;
+ return ready;
+ }
+ catch (InterruptedException e) {
+ throw new InterruptedIOException("Interrupted waiting for data: " + e.getMessage());
+ }
+ }
+
+ public CompletableFuture<Result> next() throws IOException {
+ return parserAndExecutor.next();
+ }
+
+ private final byte[] prefix = "{\"fields\":".getBytes(UTF_8);
+ private byte[] copy(long start, long end) {
+ int length = (int) (end - start);
+ byte[] buffer = new byte[prefix.length + length + 1];
+ System.arraycopy(prefix, 0, buffer, 0, prefix.length);
+
+ int offset = (int) (start % size);
+ int toWrite = min(length, size - offset);
+ System.arraycopy(data, offset, buffer, prefix.length, toWrite);
+ if (toWrite < length)
+ System.arraycopy(data, 0, buffer, prefix.length + toWrite, length - toWrite);
+
+ buffer[buffer.length - 1] = '}';
+ return buffer;
+ }
+
+
+ @Override
+ public void close() throws IOException {
+ synchronized (lock) {
+ done = true;
+ lock.notifyAll();
+ }
+ in.close();
+ }
+
+ private void fill() {
+ try {
+ while (true) {
+ int free;
+ synchronized (lock) {
+ while ((free = (int) (tail + size - head)) <= 0 && !done)
+ lock.wait();
+ }
+ if (done) break;
+
+ int off = (int) (head % size);
+ int len = min(min(free, size - off), 1 << 13);
+ int read = in.read(data, off, len);
+
+ synchronized (lock) {
+ if (read < 0) done = true;
+ else head += read;
+ lock.notify();
+ }
+ }
+ } catch (InterruptedException e) {
+ synchronized (lock) {
+ done = true;
+ thrown = new InterruptedIOException("Interrupted reading data: " + e.getMessage());
+ }
+ } catch (IOException e) {
+ synchronized (lock) {
+ done = true;
+ thrown = e;
+ }
+ }
+ }
+
+ private class RingBufferBackedOperationParserAndExecutor extends OperationParserAndExecutor {
+
+ RingBufferBackedOperationParserAndExecutor(JsonParser parser) { super(parser, true); }
+
+ @Override
+ String getDocumentJson(long start, long end) {
+ String payload = new String(copy(start, end), UTF_8);
+ synchronized (lock) {
+ tail = end;
+ lock.notify();
+ }
+ return payload;
+ }
+ }
+ }
+
+ private class SingleOperationParserAndExecutor extends OperationParserAndExecutor {
+
+ private final byte[] json;
+
+ SingleOperationParserAndExecutor(byte[] json) throws IOException {
+ super(factory.createParser(json), false);
+ this.json = json;
+ }
+
+ @Override
+ String getDocumentJson(long start, long end) {
+ return new String(json, (int) start, (int) (end - start), UTF_8);
+ }
+ }
+
+ private abstract class OperationParserAndExecutor {
+
+ private final JsonParser parser;
+ private final boolean multipleOperations;
+ private boolean arrayPrefixParsed;
+
+ protected OperationParserAndExecutor(JsonParser parser, boolean multipleOperations) {
+ this.parser = parser;
+ this.multipleOperations = multipleOperations;
+ }
+
+ abstract String getDocumentJson(long start, long end);
+
+ CompletableFuture<Result> next() throws IOException {
+ if (multipleOperations && !arrayPrefixParsed){
+ expect(START_ARRAY);
+ arrayPrefixParsed = true;
+ }
+
+ JsonToken token = parser.nextToken();
+ if (token == END_ARRAY && multipleOperations) return null;
+ else if (token == null && !multipleOperations) return null;
+ else if (token == START_OBJECT);
+ else throw new OperationParseException("Unexpected token '" + parser.currentToken() + "' at offset " + parser.getTokenLocation().getByteOffset());
+ long start = 0, end = -1;
+ OperationType type = null;
+ DocumentId id = null;
+ OperationParameters parameters = protoParameters;
+ loop: while (true) {
+ switch (parser.nextToken()) {
+ case FIELD_NAME:
+ switch (parser.getText()) {
+ case "id":
+ case "put": type = PUT; id = readId(); break;
+ case "update": type = UPDATE; id = readId(); break;
+ case "remove": type = REMOVE; id = readId(); break;
+ case "condition": parameters = parameters.testAndSetCondition(readString()); break;
+ case "create": parameters = parameters.createIfNonExistent(readBoolean()); break;
+ case "fields": {
+ expect(START_OBJECT);
+ start = parser.getTokenLocation().getByteOffset();
+ int depth = 1;
+ while (depth > 0) switch (parser.nextToken()) {
+ case START_OBJECT: ++depth; break;
+ case END_OBJECT: --depth; break;
+ }
+ end = parser.getTokenLocation().getByteOffset() + 1;
+ break;
+ }
+ default: throw new OperationParseException("Unexpected field name '" + parser.getText() + "' at offset " +
+ parser.getTokenLocation().getByteOffset());
+ }
+ break;
+
+ case END_OBJECT:
+ break loop;
+
+ default:
+ throw new OperationParseException("Unexpected token '" + parser.currentToken() + "' at offset " +
+ parser.getTokenLocation().getByteOffset());
+ }
+ }
+ if (id == null)
+ throw new OperationParseException("No document id for document at offset " + start);
+
+ if (end < start)
+ throw new OperationParseException("No 'fields' object for document at offset " + parser.getTokenLocation().getByteOffset());
+ String payload = getDocumentJson(start, end);
+ switch (type) {
+ case PUT: return client.put (id, payload, parameters);
+ case UPDATE: return client.update(id, payload, parameters);
+ case REMOVE: return client.remove(id, parameters);
+ default: throw new OperationParseException("Unexpected operation type '" + type + "'");
+ }
+ }
+
+ private void expect(JsonToken token) throws IOException {
+ if (parser.nextToken() != token)
+ throw new OperationParseException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() +
+ ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
+ }
+
+ private String readString() throws IOException {
+ String value = parser.nextTextValue();
+ if (value == null)
+ throw new OperationParseException("Expected '" + VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() +
+ ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
+
+ return value;
+ }
+
+ private boolean readBoolean() throws IOException {
+ Boolean value = parser.nextBooleanValue();
+ if (value == null)
+ throw new OperationParseException("Expected '" + VALUE_FALSE + "' or '" + VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() +
+ ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
+
+ return value;
+
+ }
+
+ private DocumentId readId() throws IOException {
+ return DocumentId.of(readString());
+ }
+
+ }
+
+ public static class Builder {
+
+ final FeedClient client;
+ OperationParameters parameters = OperationParameters.empty();
+
+ private Builder(FeedClient client) {
+ this.client = requireNonNull(client);
+ }
+
+ public Builder withTimeout(Duration timeout) {
+ parameters = parameters.timeout(timeout);
+ return this;
+ }
+
+ public Builder withRoute(String route) {
+ parameters = parameters.route(route);
+ return this;
+ }
+
+ public Builder withTracelevel(int tracelevel) {
+ parameters = parameters.tracelevel(tracelevel);
+ return this;
+ }
+
+ public JsonFeeder build() {
+ return new JsonFeeder(client, parameters);
+ }
+
+ }
+}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java
deleted file mode 100644
index 17162f19d3f..00000000000
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java
+++ /dev/null
@@ -1,364 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
-
-import ai.vespa.feed.client.FeedClient.OperationType;
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InterruptedIOException;
-import java.io.UncheckedIOException;
-import java.time.Duration;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static ai.vespa.feed.client.FeedClient.OperationType.put;
-import static ai.vespa.feed.client.FeedClient.OperationType.remove;
-import static ai.vespa.feed.client.FeedClient.OperationType.update;
-import static com.fasterxml.jackson.core.JsonToken.START_OBJECT;
-import static com.fasterxml.jackson.core.JsonToken.VALUE_FALSE;
-import static com.fasterxml.jackson.core.JsonToken.VALUE_STRING;
-import static com.fasterxml.jackson.core.JsonToken.VALUE_TRUE;
-import static java.lang.Math.min;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.Objects.requireNonNull;
-
-/**
- * @author jonmv
- */
-public class JsonStreamFeeder implements Closeable {
-
- private final FeedClient client;
- private final OperationParameters protoParameters;
-
- private JsonStreamFeeder(FeedClient client, OperationParameters protoParameters) {
- this.client = client;
- this.protoParameters = protoParameters;
- }
-
- public static Builder builder(FeedClient client) { return new Builder(client); }
-
- /** Feeds a stream containing a JSON array of feed operations on the form
- * <pre>
- * [
- * {
- * "id": "id:ns:type::boo",
- * "fields": { ... document fields ... }
- * },
- * {
- * "put": "id:ns:type::foo",
- * "fields": { ... document fields ... }
- * },
- * {
- * "update": "id:ns:type:n=4:bar",
- * "create": true,
- * "fields": { ... partial update fields ... }
- * },
- * {
- * "remove": "id:ns:type:g=foo:bar",
- * "condition": "type.baz = \"bax\""
- * },
- * ...
- * ]
- * </pre>
- * Note that {@code "id"} is an alias for the document put operation.
- */
- public void feed(InputStream jsonStream) throws IOException {
- feed(jsonStream, 1 << 26, false);
- }
-
- BenchmarkResult benchmark(InputStream jsonStream) throws IOException {
- return feed(jsonStream, 1 << 26, true).get();
- }
-
- Optional<BenchmarkResult> feed(InputStream jsonStream, int size, boolean benchmark) throws IOException {
- RingBufferStream buffer = new RingBufferStream(jsonStream, size);
- buffer.expect(JsonToken.START_ARRAY);
- AtomicInteger okCount = new AtomicInteger();
- AtomicInteger failedCount = new AtomicInteger();
- long startTime = System.nanoTime();
- CompletableFuture<Result> result;
- AtomicReference<Throwable> thrown = new AtomicReference<>();
- while ((result = buffer.next()) != null) {
- result.whenComplete((r, t) -> {
- if (t != null) {
- failedCount.incrementAndGet();
- if (!benchmark) thrown.set(t);
- } else
- okCount.incrementAndGet();
- });
- if (thrown.get() != null)
- sneakyThrow(thrown.get());
- }
- if (!benchmark) return Optional.empty();
- Duration duration = Duration.ofNanos(System.nanoTime() - startTime);
- double throughPut = (double)okCount.get() / duration.toMillis() * 1000D;
- return Optional.of(new BenchmarkResult(okCount.get(), failedCount.get(), duration, throughPut));
- }
-
- @SuppressWarnings("unchecked")
- static <T extends Throwable> void sneakyThrow(Throwable thrown) throws T { throw (T) thrown; }
-
- private static final JsonFactory factory = new JsonFactory();
-
- @Override public void close() throws IOException { client.close(); }
-
- private class RingBufferStream extends InputStream {
-
- private final byte[] b = new byte[1];
- private final InputStream in;
- private final byte[] data;
- private final int size;
- private final Object lock = new Object();
- private final JsonParser parser;
- private Throwable thrown = null;
- private long tail = 0;
- private long pos = 0;
- private long head = 0;
- private boolean done = false;
-
- RingBufferStream(InputStream in, int size) {
- this.in = in;
- this.data = new byte[size];
- this.size = size;
-
- new Thread(this::fill, "feed-reader").start();
-
- try { this.parser = factory.createParser(this); }
- catch (IOException e) { throw new UncheckedIOException(e); }
- }
-
- @Override
- public int read() throws IOException {
- return read(b, 0, 1) == -1 ? -1 : b[0];
- }
-
- @Override
- public int read(byte[] buffer, int off, int len) throws IOException {
- try {
- int ready;
- synchronized (lock) {
- while ((ready = (int) (head - pos)) == 0 && ! done)
- lock.wait();
- }
- if (thrown != null) throw new RuntimeException("Error reading input", thrown);
- if (ready == 0) return -1;
-
- ready = min(ready, len);
- int offset = (int) (pos % size);
- int length = min(ready, size - offset);
- System.arraycopy(data, offset, buffer, off, length);
- if (length < ready)
- System.arraycopy(data, 0, buffer, off + length, ready - length);
-
- pos += ready;
- return ready;
- }
- catch (InterruptedException e) {
- throw new InterruptedIOException("Interrupted waiting for data: " + e.getMessage());
- }
- }
-
- void expect(JsonToken token) throws IOException {
- if (parser.nextToken() != token)
- throw new IllegalArgumentException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() +
- ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
- }
-
- public CompletableFuture<Result> next() throws IOException {
- long start = 0, end = -1;
- OperationType type = null;
- DocumentId id = null;
- OperationParameters parameters = protoParameters;
- switch (parser.nextToken()) {
- case END_ARRAY: return null;
- case START_OBJECT: break;
- default: throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " +
- parser.getTokenLocation().getByteOffset());
- }
-
- loop: while (true) {
- switch (parser.nextToken()) {
- case FIELD_NAME:
- switch (parser.getText()) {
- case "id":
- case "put": type = put; id = readId(); break;
- case "update": type = update; id = readId(); break;
- case "remove": type = remove; id = readId(); break;
- case "condition": parameters = parameters.testAndSetCondition(readString()); break;
- case "create": parameters = parameters.createIfNonExistent(readBoolean()); break;
- case "fields": {
- expect(START_OBJECT);
- start = parser.getTokenLocation().getByteOffset();
- int depth = 1;
- while (depth > 0) switch (parser.nextToken()) {
- case START_OBJECT: ++depth; break;
- case END_OBJECT: --depth; break;
- }
- end = parser.getTokenLocation().getByteOffset() + 1;
- break;
- }
- default: throw new IllegalArgumentException("Unexpected field name '" + parser.getText() + "' at offset " +
- parser.getTokenLocation().getByteOffset());
- }
- break;
-
- case END_OBJECT:
- break loop;
-
- default:
- throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " +
- parser.getTokenLocation().getByteOffset());
- }
- }
-
- if (id == null)
- throw new IllegalArgumentException("No document id for document at offset " + start);
-
- if (end < start)
- throw new IllegalArgumentException("No 'fields' object for document at offset " + parser.getTokenLocation().getByteOffset());
-
- String payload = new String(copy(start, end), UTF_8);
- synchronized (lock) {
- tail = end;
- lock.notify();
- }
-
- switch (type) {
- case put: return client.put (id, payload, parameters);
- case update: return client.update(id, payload, parameters);
- case remove: return client.remove(id, parameters);
- default: throw new IllegalStateException("Unexpected operation type '" + type + "'");
- }
- }
-
- private final byte[] prefix = "{\"fields\":".getBytes(UTF_8);
- private byte[] copy(long start, long end) {
- int length = (int) (end - start);
- byte[] buffer = new byte[prefix.length + length + 1];
- System.arraycopy(prefix, 0, buffer, 0, prefix.length);
-
- int offset = (int) (start % size);
- int toWrite = min(length, size - offset);
- System.arraycopy(data, offset, buffer, prefix.length, toWrite);
- if (toWrite < length)
- System.arraycopy(data, 0, buffer, prefix.length + toWrite, length - toWrite);
-
- buffer[buffer.length - 1] = '}';
- return buffer;
- }
-
- private String readString() throws IOException {
- String value = parser.nextTextValue();
- if (value == null)
- throw new IllegalArgumentException("Expected '" + VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() +
- ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
-
- return value;
- }
-
- private boolean readBoolean() throws IOException {
- Boolean value = parser.nextBooleanValue();
- if (value == null)
- throw new IllegalArgumentException("Expected '" + VALUE_FALSE + "' or '" + VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() +
- ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
-
- return value;
-
- }
-
- private DocumentId readId() throws IOException {
- return DocumentId.of(readString());
- }
-
- @Override
- public void close() throws IOException {
- synchronized (lock) {
- done = true;
- lock.notifyAll();
- }
- in.close();
- }
-
- private void fill() {
- try {
- while (true) {
- int free;
- synchronized (lock) {
- while ((free = (int) (tail + size - head)) <= 0 && ! done)
- lock.wait();
- }
- if (done) break;
-
- int off = (int) (head % size);
- int len = min(min(free, size - off), 1 << 13);
- int read = in.read(data, off, len);
-
- synchronized (lock) {
- if (read < 0) done = true;
- else head += read;
- lock.notify();
- }
- }
- }
- catch (Throwable t) {
- synchronized (lock) {
- done = true;
- thrown = t;
- }
- }
- }
-
- }
-
-
- public static class Builder {
-
- final FeedClient client;
- OperationParameters parameters = OperationParameters.empty();
-
- private Builder(FeedClient client) {
- this.client = requireNonNull(client);
- }
-
- public Builder withTimeout(Duration timeout) {
- parameters = parameters.timeout(timeout);
- return this;
- }
-
- public Builder withRoute(String route) {
- parameters = parameters.route(route);
- return this;
- }
-
- public Builder withTracelevel(int tracelevel) {
- parameters = parameters.tracelevel(tracelevel);
- return this;
- }
-
- public JsonStreamFeeder build() {
- return new JsonStreamFeeder(client, parameters);
- }
-
- }
-
- static class BenchmarkResult {
- final int okCount;
- final int errorCount;
- final Duration duration;
- final double throughput;
-
- BenchmarkResult(int okCount, int errorCount, Duration duration, double throughput) {
- this.okCount = okCount;
- this.errorCount = errorCount;
- this.duration = duration;
- this.throughput = throughput;
- }
- }
-
-}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParameters.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParameters.java
index 22546f89ccb..8c20a37d224 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParameters.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParameters.java
@@ -7,6 +7,8 @@ import java.util.Optional;
import java.util.OptionalInt;
/**
+ * Per-operation feed parameters
+ *
* @author bjorncs
* @author jonmv
*/
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParseException.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParseException.java
new file mode 100644
index 00000000000..15ba024bb4e
--- /dev/null
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParseException.java
@@ -0,0 +1,15 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+/**
+ * Signals that supplied JSON for a document/operation is invalid
+ *
+ * @author bjorncs
+ */
+public class OperationParseException extends FeedException {
+
+ public OperationParseException(String message) { super(message); }
+
+ public OperationParseException(String message, Throwable cause) { super(message, cause); }
+
+}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java
new file mode 100644
index 00000000000..d36475a51fb
--- /dev/null
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java
@@ -0,0 +1,96 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import java.util.Map;
+
+/**
+ * Statistics for feed operations over HTTP against a Vespa cluster.
+ *
+ * @author jonmv
+ */
+public class OperationStats {
+
+ private final long requests;
+ private final Map<Integer, Long> responsesByCode;
+ private final long inflight;
+ private final long exceptions;
+ private final long averageLatencyMillis;
+ private final long minLatencyMillis;
+ private final long maxLatencyMillis;
+ private final long bytesSent;
+ private final long bytesReceived;
+
+ public OperationStats(long requests, Map<Integer, Long> responsesByCode, long exceptions, long inflight,
+ long averageLatencyMillis, long minLatencyMillis, long maxLatencyMillis,
+ long bytesSent, long bytesReceived) {
+ this.requests = requests;
+ this.responsesByCode = responsesByCode;
+ this.exceptions = exceptions;
+ this.inflight = inflight;
+ this.averageLatencyMillis = averageLatencyMillis;
+ this.minLatencyMillis = minLatencyMillis;
+ this.maxLatencyMillis = maxLatencyMillis;
+ this.bytesSent = bytesSent;
+ this.bytesReceived = bytesReceived;
+ }
+
+ public long requests() {
+ return requests;
+ }
+
+ public long responses() {
+ return requests - inflight;
+ }
+
+ public long successes() {
+ return responsesByCode.getOrDefault(200, 0L);
+ }
+
+ public Map<Integer, Long> responsesByCode() {
+ return responsesByCode;
+ }
+
+ public long exceptions() {
+ return exceptions;
+ }
+
+ public long inflight() {
+ return inflight;
+ }
+
+ public long averageLatencyMillis() {
+ return averageLatencyMillis;
+ }
+
+ public long minLatencyMillis() {
+ return minLatencyMillis;
+ }
+
+ public long maxLatencyMillis() {
+ return maxLatencyMillis;
+ }
+
+ public long bytesSent() {
+ return bytesSent;
+ }
+
+ public long bytesReceived() {
+ return bytesReceived;
+ }
+
+ @Override
+ public String toString() {
+ return "Stats{" +
+ "requests=" + requests +
+ ", responsesByCode=" + responsesByCode +
+ ", exceptions=" + exceptions +
+ ", inflight=" + inflight +
+ ", averageLatencyMillis=" + averageLatencyMillis +
+ ", minLatencyMillis=" + minLatencyMillis +
+ ", maxLatencyMillis=" + maxLatencyMillis +
+ ", bytesSent=" + bytesSent +
+ ", bytesReceived=" + bytesReceived +
+ '}';
+ }
+
+}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
index 1787d8d65c6..a1101eb0ebb 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
@@ -1,24 +1,30 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.feed.client;
-import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
-import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+import ai.vespa.feed.client.FeedClient.CircuitBreaker.State;
import java.util.concurrent.CompletableFuture;
-import java.util.function.BiConsumer;
/**
* Controls execution of feed operations.
*
* @author jonmv
*/
-public interface RequestStrategy {
+interface RequestStrategy {
- /** Whether this has failed fatally, and we should cease sending further operations. */
- boolean hasFailed();
+ /** Stats for operations sent through this. */
+ OperationStats stats();
+
+ /** State of the circuit breaker. */
+ State circuitBreakerState();
+
+ /** Forcibly terminates this, causing all inflight operations to complete immediately. */
+ void destroy();
+
+ /** Wait for all inflight requests to complete. */
+ void await();
/** Enqueue the given operation, returning its future result. This may block if the client send queue is full. */
- CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, SimpleHttpRequest request,
- BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>> dispatch);
+ CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request);
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Result.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Result.java
index 31a6cf6e893..b29d65e193b 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Result.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Result.java
@@ -4,6 +4,8 @@ package ai.vespa.feed.client;
import java.util.Optional;
/**
+ * Result for a document operation
+ *
* @author bjorncs
* @author jonmv
*/
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ResultParseException.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ResultParseException.java
new file mode 100644
index 00000000000..3fd5143e2f4
--- /dev/null
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ResultParseException.java
@@ -0,0 +1,14 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+/**
+ * Signals that the client was unable to parse the result/response from container
+ *
+ * @author bjorncs
+ */
+public class ResultParseException extends FeedException {
+
+ public ResultParseException(DocumentId documentId, String message) { super(documentId, message); }
+
+ public ResultParseException(DocumentId documentId, Throwable cause) { super(documentId, cause); }
+}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java
index 7200d5fd943..9114e22f4a6 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java
@@ -20,11 +20,14 @@ import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyFactory;
import java.security.KeyStore;
+import java.security.KeyStoreException;
import java.security.PrivateKey;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.security.spec.PKCS8EncodedKeySpec;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
/**
@@ -39,6 +42,9 @@ class SslContextBuilder {
private Path certificateFile;
private Path privateKeyFile;
private Path caCertificatesFile;
+ private Collection<X509Certificate> certificate;
+ private PrivateKey privateKey;
+ private Collection<X509Certificate> caCertificates;
SslContextBuilder withCertificateAndKey(Path certificate, Path privateKey) {
this.certificateFile = certificate;
@@ -46,20 +52,35 @@ class SslContextBuilder {
return this;
}
+ SslContextBuilder withCertificateAndKey(Collection<X509Certificate> certificate, PrivateKey privateKey) {
+ this.certificate = certificate;
+ this.privateKey = privateKey;
+ return this;
+ }
+
SslContextBuilder withCaCertificates(Path caCertificates) {
this.caCertificatesFile = caCertificates;
return this;
}
+ SslContextBuilder withCaCertificates(Collection<X509Certificate> caCertificates) {
+ this.caCertificates = caCertificates;
+ return this;
+ }
+
SSLContext build() throws IOException {
try {
KeyStore keystore = KeyStore.getInstance("PKCS12");
keystore.load(null);
if (certificateFile != null && privateKeyFile != null) {
keystore.setKeyEntry("cert", privateKey(privateKeyFile), new char[0], certificates(certificateFile));
+ } else if (certificate != null && privateKey != null) {
+ keystore.setKeyEntry("cert", privateKey, new char[0], certificate.toArray(new Certificate[0]));
}
if (caCertificatesFile != null) {
- keystore.setCertificateEntry("ca-cert", certificates(caCertificatesFile)[0]);
+ addCaCertificates(keystore, Arrays.asList(certificates(caCertificatesFile)));
+ } else if (caCertificates != null) {
+ addCaCertificates(keystore, caCertificates);
}
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(keystore, new char[0]);
@@ -73,6 +94,13 @@ class SslContextBuilder {
}
}
+ private static void addCaCertificates(KeyStore keystore, Collection<? extends Certificate> certificates) throws KeyStoreException {
+ int i = 0;
+ for (Certificate cert : certificates) {
+ keystore.setCertificateEntry("ca-cert-" + ++i, cert);
+ }
+ }
+
private static Certificate[] certificates(Path file) throws IOException, GeneralSecurityException {
try (PEMParser parser = new PEMParser(Files.newBufferedReader(file))) {
List<X509Certificate> result = new ArrayList<>();
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java
new file mode 100644
index 00000000000..4e0c4fe90f0
--- /dev/null
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java
@@ -0,0 +1,45 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+
+/**
+ * Reduces max throughput whenever throttled; increases it slowly whenever successful responses are obtained.
+ *
+ * @author jonmv
+ */
+public class StaticThrottler implements FeedClient.Throttler {
+
+ protected final long maxInflight;
+ protected final long minInflight;
+ private final AtomicLong targetX10;
+
+ public StaticThrottler(FeedClientBuilder builder) {
+ this.maxInflight = builder.connectionsPerEndpoint * (long) builder.maxStreamsPerConnection;
+ this.minInflight = builder.connectionsPerEndpoint * (long) min(16, builder.maxStreamsPerConnection);
+ this.targetX10 = new AtomicLong(10 * maxInflight); // 10x the actual value to allow for smaller updates.
+ }
+
+ @Override
+ public void sent(long inflight, CompletableFuture<HttpResponse> vessel) { }
+
+ @Override
+ public void success() {
+ targetX10.incrementAndGet();
+ }
+
+ @Override
+ public void throttled(long inflight) {
+ targetX10.set(max(inflight * 5, minInflight * 10));
+ }
+
+ @Override
+ public long targetInflight() {
+ return min(maxInflight, targetX10.get() / 10);
+ }
+
+}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/package-info.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/package-info.java
new file mode 100644
index 00000000000..e058b9b921e
--- /dev/null
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/package-info.java
@@ -0,0 +1,9 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+/**
+ * @author bjorncs
+ */
+
+@PublicApi
+package ai.vespa.feed.client;
+
+import com.yahoo.api.annotations.PublicApi; \ No newline at end of file
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/GracePeriodCircuitBreakerTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/GracePeriodCircuitBreakerTest.java
new file mode 100644
index 00000000000..9b30ebfd0aa
--- /dev/null
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/GracePeriodCircuitBreakerTest.java
@@ -0,0 +1,60 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import ai.vespa.feed.client.FeedClient.CircuitBreaker;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.CLOSED;
+import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN;
+import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * @author jonmv
+ */
+class GracePeriodCircuitBreakerTest {
+
+ @Test
+ void testCircuitBreaker() {
+ AtomicLong now = new AtomicLong(0);
+ long SECOND = 1000;
+ CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(1));
+
+ assertEquals(CLOSED, breaker.state(), "Initial state is closed");
+
+ now.addAndGet(100 * SECOND);
+ assertEquals(CLOSED, breaker.state(), "State is closed after some time without activity");
+
+ breaker.success();
+ assertEquals(CLOSED, breaker.state(), "State is closed after a success");
+
+ now.addAndGet(100 * SECOND);
+ assertEquals(CLOSED, breaker.state(), "State is closed some time after a success");
+
+ breaker.failure();
+ assertEquals(CLOSED, breaker.state(), "State is closed right after a failure");
+
+ now.addAndGet(SECOND);
+ assertEquals(CLOSED, breaker.state(), "State is closed until grace period has passed");
+
+ now.addAndGet(1);
+ assertEquals(HALF_OPEN, breaker.state(), "State is half-open when grace period has passed");
+
+ breaker.success();
+ assertEquals(CLOSED, breaker.state(), "State is closed after a new success");
+
+ breaker.failure();
+ now.addAndGet(60 * SECOND);
+ assertEquals(HALF_OPEN, breaker.state(), "State is half-open until doom period has passed");
+
+ now.addAndGet(1);
+ assertEquals(OPEN, breaker.state(), "State is open when doom period has passed");
+
+ breaker.success();
+ assertEquals(OPEN, breaker.state(), "State remains open in spite of new successes");
+ }
+
+}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java
new file mode 100644
index 00000000000..d8090549420
--- /dev/null
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java
@@ -0,0 +1,101 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import org.junit.jupiter.api.Test;
+
+import java.net.URI;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * @author jonmv
+ */
+class HttpFeedClientTest {
+
+ @Test
+ void testFeeding() throws ExecutionException, InterruptedException {
+ DocumentId id = DocumentId.of("ns", "type", "0");
+ AtomicReference<BiFunction<DocumentId, HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>();
+ class MockRequestStrategy implements RequestStrategy {
+ @Override public OperationStats stats() { throw new UnsupportedOperationException(); }
+ @Override public FeedClient.CircuitBreaker.State circuitBreakerState() { return FeedClient.CircuitBreaker.State.CLOSED; }
+ @Override public void destroy() { throw new UnsupportedOperationException(); }
+ @Override public void await() { throw new UnsupportedOperationException(); }
+ @Override public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { return dispatch.get().apply(documentId, request); }
+ }
+ FeedClient client = new HttpFeedClient(FeedClientBuilder.create(URI.create("https://dummy:123")), new MockRequestStrategy());
+
+ // Vespa error is an error result.
+ dispatch.set((documentId, request) -> {
+ try {
+ assertEquals(id, documentId);
+ assertEquals("/document/v1/ns/type/docid/0?create=true&condition=false&timeout=5000ms&route=route",
+ request.path());
+ assertEquals("json", new String(request.body(), UTF_8));
+
+ HttpResponse response = HttpResponse.of(502,
+ ("{\n" +
+ " \"pathId\": \"/document/v1/ns/type/docid/0\",\n" +
+ " \"id\": \"id:ns:type::0\",\n" +
+ " \"message\": \"Ooops! ... I did it again.\",\n" +
+ " \"trace\": \"I played with your heart. Got lost in the game.\"\n" +
+ "}").getBytes(UTF_8));
+ return CompletableFuture.completedFuture(response);
+ }
+ catch (Throwable thrown) {
+ CompletableFuture<HttpResponse> failed = new CompletableFuture<>();
+ failed.completeExceptionally(thrown);
+ return failed;
+ }
+ });
+ Result result = client.put(id,
+ "json",
+ OperationParameters.empty()
+ .createIfNonExistent(true)
+ .testAndSetCondition("false")
+ .route("route")
+ .timeout(Duration.ofSeconds(5)))
+ .get();
+ assertEquals("Ooops! ... I did it again.", result.resultMessage().get());
+ assertEquals("I played with your heart. Got lost in the game.", result.traceMessage().get());
+
+
+ // Handler error is a FeedException.
+ dispatch.set((documentId, request) -> {
+ try {
+ assertEquals(id, documentId);
+ assertEquals("/document/v1/ns/type/docid/0",
+ request.path());
+ assertEquals("json", new String(request.body(), UTF_8));
+
+ HttpResponse response = HttpResponse.of(500,
+ ("{\n" +
+ " \"pathId\": \"/document/v1/ns/type/docid/0\",\n" +
+ " \"id\": \"id:ns:type::0\",\n" +
+ " \"message\": \"Alla ska i jorden.\",\n" +
+ " \"trace\": \"Din tid den kom, och senn så for den. \"\n" +
+ "}").getBytes(UTF_8));
+ return CompletableFuture.completedFuture(response);
+ }
+ catch (Throwable thrown) {
+ CompletableFuture<HttpResponse> failed = new CompletableFuture<>();
+ failed.completeExceptionally(thrown);
+ return failed;
+ }
+ });
+ ExecutionException expected = assertThrows(ExecutionException.class,
+ () -> client.put(id,
+ "json",
+ OperationParameters.empty())
+ .get());
+ assertEquals("Status 500 executing 'POST /document/v1/ns/type/docid/0': Alla ska i jorden.", expected.getCause().getMessage());
+ }
+
+}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java
new file mode 100644
index 00000000000..21ab6889e6e
--- /dev/null
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java
@@ -0,0 +1,203 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import ai.vespa.feed.client.FeedClient.CircuitBreaker;
+import org.apache.hc.core5.http.ContentType;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+
+import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.CLOSED;
+import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN;
+import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class HttpRequestStrategyTest {
+
+ @Test
+ void testConcurrency() {
+ int documents = 1 << 16;
+ HttpRequest request = new HttpRequest("PUT", "/", null, null);
+ HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8));
+ ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+ Cluster cluster = new BenchmarkingCluster((__, vessel) -> executor.schedule(() -> vessel.complete(response), (int) (Math.random() * 2 * 10), TimeUnit.MILLISECONDS));
+
+ HttpRequestStrategy strategy = new HttpRequestStrategy(FeedClientBuilder.create(URI.create("https://dummy.com:123"))
+ .setConnectionsPerEndpoint(1 << 10)
+ .setMaxStreamPerConnection(1 << 12),
+ cluster);
+ CountDownLatch latch = new CountDownLatch(1);
+ new Thread(() -> {
+ try {
+ while ( ! latch.await(1, TimeUnit.SECONDS)) {
+ System.err.println(cluster.stats().inflight());
+ System.err.println(strategy.throttler.targetInflight());
+ System.err.println();
+ }
+ }
+ catch (InterruptedException ignored) { }
+ }).start();
+ long startNanos = System.nanoTime();
+ for (int i = 0; i < documents; i++)
+ strategy.enqueue(DocumentId.of("ns", "type", Integer.toString(i)), request);
+
+ strategy.await();
+ latch.countDown();
+ executor.shutdown();
+ cluster.close();
+ OperationStats stats = cluster.stats();
+ long successes = stats.responsesByCode().get(200);
+ System.err.println(successes + " successes in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds");
+ System.err.println(stats);
+
+ assertEquals(documents, stats.requests());
+ assertEquals(documents, stats.responses());
+ assertEquals(documents, stats.responsesByCode().get(200));
+ assertEquals(0, stats.inflight());
+ assertEquals(0, stats.exceptions());
+ assertEquals(0, stats.bytesSent());
+ assertEquals(2 * documents, stats.bytesReceived());
+ }
+
+ @Test
+ void testLogic() throws ExecutionException, InterruptedException {
+ int minStreams = 16; // Hard limit for minimum number of streams per connection.
+ MockCluster cluster = new MockCluster();
+ AtomicLong now = new AtomicLong(0);
+ CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10));
+ HttpRequestStrategy strategy = new HttpRequestStrategy(FeedClientBuilder.create(URI.create("https://dummy.com:123"))
+ .setRetryStrategy(new FeedClient.RetryStrategy() {
+ @Override public boolean retry(FeedClient.OperationType type) { return type == FeedClient.OperationType.PUT; }
+ @Override public int retries() { return 1; }
+ })
+ .setCircuitBreaker(breaker)
+ .setConnectionsPerEndpoint(1)
+ .setMaxStreamPerConnection(minStreams),
+ new BenchmarkingCluster(cluster));
+
+ DocumentId id1 = DocumentId.of("ns", "type", "1");
+ DocumentId id2 = DocumentId.of("ns", "type", "2");
+ HttpRequest request = new HttpRequest("POST", "/", null, null);
+
+ // Runtime exception is not retried.
+ cluster.expect((__, vessel) -> vessel.completeExceptionally(new FeedException("boom")));
+ ExecutionException expected = assertThrows(ExecutionException.class,
+ () -> strategy.enqueue(id1, request).get());
+ assertEquals("boom", expected.getCause().getMessage());
+ assertEquals(1, strategy.stats().requests());
+
+ // IOException is retried.
+ cluster.expect((__, vessel) -> vessel.completeExceptionally(new IOException("retry me")));
+ expected = assertThrows(ExecutionException.class,
+ () -> strategy.enqueue(id1, request).get());
+ assertEquals("retry me", expected.getCause().getCause().getMessage());
+ assertEquals(3, strategy.stats().requests());
+
+ // Successful response is returned
+ HttpResponse success = HttpResponse.of(200, null);
+ cluster.expect((__, vessel) -> vessel.complete(success));
+ assertEquals(success, strategy.enqueue(id1, request).get());
+ assertEquals(4, strategy.stats().requests());
+
+ // Throttled requests are retried. Concurrent operations to same ID (only) are serialised.
+ now.set(2000);
+ HttpResponse throttled = HttpResponse.of(429, null);
+ AtomicInteger count = new AtomicInteger(3);
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference<CompletableFuture<HttpResponse>> completion = new AtomicReference<>();
+ cluster.expect((req, vessel) -> {
+ if (req == request) {
+ if (count.decrementAndGet() > 0)
+ vessel.complete(throttled);
+ else {
+ completion.set(vessel);
+ latch.countDown();
+ }
+ }
+ else vessel.complete(success);
+ });
+ CompletableFuture<HttpResponse> delayed = strategy.enqueue(id1, request);
+ CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null));
+ assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null)).get());
+ latch.await();
+ assertEquals(8, strategy.stats().requests()); // 3 attempts at throttled and one at id2.
+ now.set(4000);
+ assertEquals(CLOSED, breaker.state()); // Circuit not broken due to throttled requests.
+ completion.get().complete(success);
+ assertEquals(success, delayed.get());
+ assertEquals(success, serialised.get());
+
+ // Some error responses are retried.
+ HttpResponse serverError = HttpResponse.of(500, null);
+ cluster.expect((__, vessel) -> vessel.complete(serverError));
+ assertEquals(serverError, strategy.enqueue(id1, request).get());
+ assertEquals(11, strategy.stats().requests());
+ assertEquals(CLOSED, breaker.state()); // Circuit not broken due to throttled requests.
+
+ // Error responses are not retried when not of appropriate type.
+ cluster.expect((__, vessel) -> vessel.complete(serverError));
+ assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null)).get());
+ assertEquals(12, strategy.stats().requests());
+
+ // Some error responses are not retried.
+ HttpResponse badRequest = HttpResponse.of(400, null);
+ cluster.expect((__, vessel) -> vessel.complete(badRequest));
+ assertEquals(badRequest, strategy.enqueue(id1, request).get());
+ assertEquals(13, strategy.stats().requests());
+
+ // Circuit breaker opens some time after starting to fail.
+ now.set(6000);
+ assertEquals(HALF_OPEN, breaker.state()); // Circuit broken due to failed requests.
+ now.set(605000);
+ assertEquals(OPEN, breaker.state()); // Circuit broken due to failed requests.
+
+ Map<Integer, Long> codes = new HashMap<>();
+ codes.put(200, 4L);
+ codes.put(400, 1L);
+ codes.put(429, 2L);
+ codes.put(500, 3L);
+ assertEquals(codes, strategy.stats().responsesByCode());
+ assertEquals(3, strategy.stats().exceptions());
+ }
+
+ static class MockCluster implements Cluster {
+
+ final AtomicReference<BiConsumer<HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>();
+
+ void expect(BiConsumer<HttpRequest, CompletableFuture<HttpResponse>> expected) {
+ dispatch.set(expected);
+ }
+
+ @Override
+ public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) {
+ dispatch.get().accept(request, vessel);
+ }
+
+ @Override
+ public void close() { }
+
+ @Override
+ public OperationStats stats() {
+ return null;
+ }
+
+ }
+
+}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java
new file mode 100644
index 00000000000..3e0f886a40a
--- /dev/null
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java
@@ -0,0 +1,124 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.joining;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class JsonFeederTest {
+
+ @Test
+ void test() throws IOException {
+ int docs = 1 << 14;
+ String json = "[\n" +
+
+ IntStream.range(0, docs).mapToObj(i ->
+ " {\n" +
+ " \"id\": \"id:ns:type::abc" + i + "\",\n" +
+ " \"fields\": {\n" +
+ " \"lul\":\"lal\"\n" +
+ " }\n" +
+ " },\n"
+ ).collect(joining()) +
+
+ " {\n" +
+ " \"id\": \"id:ns:type::abc" + docs + "\",\n" +
+ " \"fields\": {\n" +
+ " \"lul\":\"lal\"\n" +
+ " }\n" +
+ " }\n" +
+ "]";
+ AtomicReference<FeedException> exceptionThrow = new AtomicReference<>();
+ Path tmpFile = Files.createTempFile(null, null);
+ Files.write(tmpFile, json.getBytes(UTF_8));
+ try (InputStream in = Files.newInputStream(tmpFile, StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE)) {
+ AtomicInteger resultsReceived = new AtomicInteger();
+ AtomicBoolean completedSuccessfully = new AtomicBoolean();
+ long startNanos = System.nanoTime();
+ SimpleClient feedClient = new SimpleClient();
+ JsonFeeder.builder(feedClient).build()
+ .feedMany(in, 1 << 7,
+ new JsonFeeder.ResultCallback() { // TODO: hangs when buffer is smaller than largest document
+ @Override
+ public void onNextResult(Result result, FeedException error) { resultsReceived.incrementAndGet(); }
+
+ @Override
+ public void onError(FeedException error) { exceptionThrow.set(error); }
+
+ @Override
+ public void onComplete() { completedSuccessfully.set(true); }
+ })
+ .join();
+
+ System.err.println((json.length() / 1048576.0) + " MB in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds");
+ assertEquals(docs + 1, feedClient.ids.size());
+ assertEquals(docs + 1, resultsReceived.get());
+ assertTrue(completedSuccessfully.get());
+ assertNull(exceptionThrow.get());
+ }
+ }
+
+ @Test
+ public void singleJsonOperationIsDispatchedToFeedClient() throws IOException, ExecutionException, InterruptedException {
+ try (JsonFeeder feeder = JsonFeeder.builder(new SimpleClient()).build()) {
+ String json = "{\"put\": \"id:ns:type::abc1\",\n" +
+ " \"fields\": {\n" +
+ " \"lul\":\"lal\"\n" +
+ " }\n" +
+ " }\n";
+ Result result = feeder.feedSingle(json).get();
+ assertEquals(DocumentId.of("id:ns:type::abc1"), result.documentId());
+ assertEquals(Result.Type.success, result.type());
+ assertEquals("success", result.resultMessage().get());
+ }
+ }
+
+ private static class SimpleClient implements FeedClient {
+ final Set<String> ids = new HashSet<>();
+
+ @Override
+ public CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params) {
+ ids.add(documentId.userSpecific());
+ return createSuccessResult(documentId);
+ }
+
+ @Override
+ public CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params) {
+ return createSuccessResult(documentId);
+ }
+
+ @Override
+ public CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params) {
+ return createSuccessResult(documentId);
+ }
+
+ @Override
+ public OperationStats stats() { return null; }
+
+ @Override
+ public void close(boolean graceful) { }
+
+ private CompletableFuture<Result> createSuccessResult(DocumentId documentId) {
+ return CompletableFuture.completedFuture(new Result(Result.Type.success, documentId, "success", null));
+ }
+ }
+
+}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java
deleted file mode 100644
index 8ef8ae57f5e..00000000000
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
-
-import org.junit.jupiter.api.Test;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-class JsonStreamFeederTest {
-
- @Test
- void test() throws IOException {
- int docs = 1 << 10;
- String json = "[\n" +
-
- IntStream.range(0, docs).mapToObj(i ->
- " {\n" +
- " \"id\": \"id:ns:type::abc" + i + "\",\n" +
- " \"fields\": {\n" +
- " \"lul\":\"lal\"\n" +
- " }\n" +
- " },\n"
- ).collect(Collectors.joining()) +
-
- " {\n" +
- " \"id\": \"id:ns:type::abc" + docs + "\",\n" +
- " \"fields\": {\n" +
- " \"lul\":\"lal\"\n" +
- " }\n" +
- " }\n" +
- "]";
- ByteArrayInputStream in = new ByteArrayInputStream(json.getBytes(UTF_8));
- Set<String> ids = new ConcurrentSkipListSet<>();
- JsonStreamFeeder.builder(new FeedClient() {
- @Override
- public CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params) {
- ids.add(documentId.userSpecific());
- return new CompletableFuture<>();
- }
-
- @Override
- public CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params) {
- return new CompletableFuture<>();
- }
-
- @Override
- public CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params) {
- return new CompletableFuture<>();
- }
-
- @Override
- public void close() throws IOException {
-
- }
- }).build().feed(in, 1 << 7, false); // TODO: hangs on 1 << 6.
- assertEquals(docs + 1, ids.size());
- }
-
-}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java
new file mode 100644
index 00000000000..1e616f2625a
--- /dev/null
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java
@@ -0,0 +1,92 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client.examples;
+
+import ai.vespa.feed.client.DocumentId;
+import ai.vespa.feed.client.FeedClient;
+import ai.vespa.feed.client.FeedClientBuilder;
+import ai.vespa.feed.client.FeedException;
+import ai.vespa.feed.client.JsonFeeder;
+import ai.vespa.feed.client.Result;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Logger;
+
+/**
+ * Sample feeder demonstrating how to programmatically feed to a Vespa cluster.
+ */
+class JsonFileFeederExample implements Closeable {
+
+ private final static Logger log = Logger.getLogger(JsonFileFeederExample.class.getName());
+
+ private final JsonFeeder jsonFeeder;
+ private final URI endpoint;
+
+ static class ResultCallBack implements JsonFeeder.ResultCallback {
+
+ final AtomicInteger resultsReceived = new AtomicInteger(0);
+ final AtomicInteger errorsReceived = new AtomicInteger(0);
+ final long startTimeMillis = System.currentTimeMillis();;
+
+ @Override
+ public void onNextResult(Result result, FeedException error) {
+ resultsReceived.incrementAndGet();
+ if (error != null) {
+ log.warning("Problems with feeding document "
+ + error.documentId().map(DocumentId::toString).orElse("<unknown>"));
+ errorsReceived.incrementAndGet();
+ } else if (result.type() == Result.Type.failure) {
+ log.warning("Problems with docID " + result.documentId() + ":" + error);
+ errorsReceived.incrementAndGet();
+ }
+ }
+
+ @Override
+ public void onError(FeedException error) {
+ log.severe("Feeding failed for d: " + error.getMessage());
+ }
+
+ @Override
+ public void onComplete() {
+ log.info("Feeding completed");
+ }
+
+ void dumpStatsToLog() {
+ log.info("Received in total " + resultsReceived.get() + ", " + errorsReceived.get() + " errors.");
+ log.info("Time spent receiving is " + (System.currentTimeMillis() - startTimeMillis) + " ms.");
+ }
+
+ }
+
+ JsonFileFeederExample(URI endpoint) {
+ this.endpoint = endpoint;
+ FeedClient feedClient = FeedClientBuilder.create(endpoint)
+ .build();
+ this.jsonFeeder = JsonFeeder.builder(feedClient)
+ .withTimeout(Duration.ofSeconds(30))
+ .build();
+ }
+
+ /**
+ * Feed all operations from a stream.
+ *
+ * @param stream The input stream to read operations from (JSON array containing one or more document operations).
+ */
+ void batchFeed(InputStream stream, String batchId) {
+ ResultCallBack callback = new ResultCallBack();
+ log.info("Starting feed to " + endpoint + " for batch '" + batchId + "'");
+ CompletableFuture<Void> promise = jsonFeeder.feedMany(stream, callback);
+ promise.join(); // wait for feeding to complete
+ callback.dumpStatsToLog();
+ }
+
+ @Override
+ public void close() throws IOException {
+ jsonFeeder.close();
+ }
+}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java
new file mode 100644
index 00000000000..5cee776b244
--- /dev/null
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java
@@ -0,0 +1,117 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client.examples;
+
+import ai.vespa.feed.client.DocumentId;
+import ai.vespa.feed.client.FeedClient;
+import ai.vespa.feed.client.FeedClientBuilder;
+import ai.vespa.feed.client.OperationParameters;
+import ai.vespa.feed.client.Result;
+
+import java.net.URI;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Simple Streaming feeder implementation which will send operations to a Vespa endpoint.
+ * Other threads communicate with the feeder by adding new operations on the BlockingQueue
+ */
+
+class JsonStreamFeederExample extends Thread implements AutoCloseable {
+
+ static class Operation {
+ final String type;
+ final String documentId;
+ final String documentFieldsJson;
+
+ Operation(String type, String id, String fields) {
+ this.type = type;
+ this.documentId = id;
+ this.documentFieldsJson = fields;
+ }
+ }
+
+ private final static Logger log = Logger.getLogger(JsonStreamFeederExample.class.getName());
+
+ private final BlockingQueue<Operation> operations;
+ private final FeedClient feedClient;
+ private final AtomicBoolean drain = new AtomicBoolean(false);
+ private final CountDownLatch finishedDraining = new CountDownLatch(1);
+ private final AtomicInteger resultCounter = new AtomicInteger();
+
+ /**
+ * Constructor
+ * @param operations The shared blocking queue where other threads can put document operations to.
+ * @param endpoint The endpoint to feed to
+ */
+ JsonStreamFeederExample(BlockingQueue<JsonStreamFeederExample.Operation> operations, URI endpoint) {
+ this.operations = operations;
+ this.feedClient = FeedClientBuilder.create(endpoint).build();
+ }
+
+ /**
+ * Shutdown this feeder, waits until operations on queue is drained
+ */
+ @Override
+ public void close() {
+ log.info("Shutdown initiated, awaiting operations queue to be drained. Queue size is " + operations.size());
+ drain.set(true);
+ try {
+ finishedDraining.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ public void run() {
+ while (!drain.get() || !operations.isEmpty()) {
+ try {
+ JsonStreamFeederExample.Operation op = operations.poll(1, TimeUnit.SECONDS);
+ if(op == null) // no operations available
+ continue;
+ log.info("Put document " + op.documentId);
+ CompletableFuture<Result> promise;
+ DocumentId docId = DocumentId.of(op.documentId);
+ OperationParameters params = OperationParameters.empty();
+ String json = op.documentFieldsJson;
+ switch (op.type) {
+ case "put":
+ promise = feedClient.put(docId, json, params);
+ break;
+ case "remove":
+ promise = feedClient.remove(docId, params);
+ break;
+ case "update":
+ promise = feedClient.update(docId, json, params);
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid operation: " + op.type);
+ }
+ promise.whenComplete((result, throwable) -> {
+ if (resultCounter.getAndIncrement() % 10 == 0) {
+ System.err.println(feedClient.stats());
+ }
+ if (throwable != null) {
+ System.err.printf("Failure for '%s': %s", docId, throwable);
+ throwable.printStackTrace();
+ } else if (result.type() == Result.Type.failure) {
+ System.err.printf("Failure for '%s': %s", docId, result.resultMessage().orElse("<no messsage>"));
+ }
+ });
+ } catch (InterruptedException e) {
+ log.log(Level.SEVERE, "Got interrupt exception.", e);
+ break;
+ }
+ }
+ log.info("Shutting down feeding thread");
+ this.feedClient.close();
+ finishedDraining.countDown();
+ }
+
+} \ No newline at end of file
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java
new file mode 100644
index 00000000000..5ece9051e41
--- /dev/null
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java
@@ -0,0 +1,34 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client.examples;
+
+import ai.vespa.feed.client.DocumentId;
+import ai.vespa.feed.client.FeedClient;
+import ai.vespa.feed.client.FeedClientBuilder;
+import ai.vespa.feed.client.OperationParameters;
+import ai.vespa.feed.client.Result;
+
+import java.net.URI;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+
+class SimpleExample {
+
+ public static void main(String[] args) {
+ try (FeedClient client = FeedClientBuilder.create(URI.create("https://my-container-endpoint-with-http2:8080/")).build()) {
+ DocumentId id = DocumentId.of("namespace", "documenttype", "1");
+ String json = "{\"fields\": {\"title\": \"hello world\"}}";
+ OperationParameters params = OperationParameters.empty()
+ .timeout(Duration.ofSeconds(5))
+ .route("myvesparoute");
+ CompletableFuture<Result> promise = client.put(id, json, params);
+ promise.whenComplete(((result, throwable) -> {
+ if (throwable != null) {
+ throwable.printStackTrace();
+ } else {
+ System.out.printf("'%s' for document '%s': %s%n", result.type(), result.documentId(), result.resultMessage());
+ }
+ }));
+ }
+ }
+
+}