diff options
Diffstat (limited to 'vespa-feed-client')
34 files changed, 2861 insertions, 785 deletions
diff --git a/vespa-feed-client/abi-spec.json b/vespa-feed-client/abi-spec.json new file mode 100644 index 00000000000..70cb4c3f09f --- /dev/null +++ b/vespa-feed-client/abi-spec.json @@ -0,0 +1,386 @@ +{ + "ai.vespa.feed.client.BenchmarkingCluster": { + "superClass": "java.lang.Object", + "interfaces": [ + "ai.vespa.feed.client.Cluster" + ], + "attributes": [ + "public" + ], + "methods": [ + "public void <init>(ai.vespa.feed.client.Cluster)", + "public void dispatch(ai.vespa.feed.client.HttpRequest, java.util.concurrent.CompletableFuture)", + "public ai.vespa.feed.client.OperationStats stats()", + "public void close()" + ], + "fields": [] + }, + "ai.vespa.feed.client.DocumentId": { + "superClass": "java.lang.Object", + "interfaces": [], + "attributes": [ + "public" + ], + "methods": [ + "public static ai.vespa.feed.client.DocumentId of(java.lang.String, java.lang.String, java.lang.String)", + "public static ai.vespa.feed.client.DocumentId of(java.lang.String, java.lang.String, long, java.lang.String)", + "public static ai.vespa.feed.client.DocumentId of(java.lang.String, java.lang.String, java.lang.String, java.lang.String)", + "public static ai.vespa.feed.client.DocumentId of(java.lang.String)", + "public java.lang.String documentType()", + "public java.lang.String namespace()", + "public java.util.OptionalLong number()", + "public java.util.Optional group()", + "public java.lang.String userSpecific()", + "public boolean equals(java.lang.Object)", + "public int hashCode()", + "public java.lang.String toString()" + ], + "fields": [] + }, + "ai.vespa.feed.client.DynamicThrottler": { + "superClass": "ai.vespa.feed.client.StaticThrottler", + "interfaces": [], + "attributes": [ + "public" + ], + "methods": [ + "public void <init>(ai.vespa.feed.client.FeedClientBuilder)", + "public void sent(long, java.util.concurrent.CompletableFuture)", + "public void success()", + "public void throttled(long)", + "public long targetInflight()" + ], + "fields": [] + }, + "ai.vespa.feed.client.FeedClient$CircuitBreaker$State": { + "superClass": "java.lang.Enum", + "interfaces": [], + "attributes": [ + "public", + "final", + "enum" + ], + "methods": [ + "public static ai.vespa.feed.client.FeedClient$CircuitBreaker$State[] values()", + "public static ai.vespa.feed.client.FeedClient$CircuitBreaker$State valueOf(java.lang.String)" + ], + "fields": [ + "public static final enum ai.vespa.feed.client.FeedClient$CircuitBreaker$State CLOSED", + "public static final enum ai.vespa.feed.client.FeedClient$CircuitBreaker$State HALF_OPEN", + "public static final enum ai.vespa.feed.client.FeedClient$CircuitBreaker$State OPEN" + ] + }, + "ai.vespa.feed.client.FeedClient$CircuitBreaker": { + "superClass": "java.lang.Object", + "interfaces": [], + "attributes": [ + "public", + "interface", + "abstract" + ], + "methods": [ + "public abstract void success()", + "public abstract void failure()", + "public abstract ai.vespa.feed.client.FeedClient$CircuitBreaker$State state()" + ], + "fields": [] + }, + "ai.vespa.feed.client.FeedClient$OperationType": { + "superClass": "java.lang.Enum", + "interfaces": [], + "attributes": [ + "public", + "final", + "enum" + ], + "methods": [ + "public static ai.vespa.feed.client.FeedClient$OperationType[] values()", + "public static ai.vespa.feed.client.FeedClient$OperationType valueOf(java.lang.String)" + ], + "fields": [ + "public static final enum ai.vespa.feed.client.FeedClient$OperationType PUT", + "public static final enum ai.vespa.feed.client.FeedClient$OperationType UPDATE", + "public static final enum ai.vespa.feed.client.FeedClient$OperationType REMOVE" + ] + }, + "ai.vespa.feed.client.FeedClient$RetryStrategy": { + "superClass": "java.lang.Object", + "interfaces": [], + "attributes": [ + "public", + "interface", + "abstract" + ], + "methods": [ + "public boolean retry(ai.vespa.feed.client.FeedClient$OperationType)", + "public int retries()" + ], + "fields": [] + }, + "ai.vespa.feed.client.FeedClient$Throttler": { + "superClass": "java.lang.Object", + "interfaces": [], + "attributes": [ + "public", + "interface", + "abstract" + ], + "methods": [ + "public abstract void sent(long, java.util.concurrent.CompletableFuture)", + "public abstract void success()", + "public abstract void throttled(long)", + "public abstract long targetInflight()" + ], + "fields": [] + }, + "ai.vespa.feed.client.FeedClient": { + "superClass": "java.lang.Object", + "interfaces": [ + "java.io.Closeable" + ], + "attributes": [ + "public", + "interface", + "abstract" + ], + "methods": [ + "public abstract java.util.concurrent.CompletableFuture put(ai.vespa.feed.client.DocumentId, java.lang.String, ai.vespa.feed.client.OperationParameters)", + "public abstract java.util.concurrent.CompletableFuture update(ai.vespa.feed.client.DocumentId, java.lang.String, ai.vespa.feed.client.OperationParameters)", + "public abstract java.util.concurrent.CompletableFuture remove(ai.vespa.feed.client.DocumentId, ai.vespa.feed.client.OperationParameters)", + "public abstract ai.vespa.feed.client.OperationStats stats()", + "public ai.vespa.feed.client.FeedClient$CircuitBreaker$State circuitBreakerState()", + "public abstract void close(boolean)", + "public void close()" + ], + "fields": [] + }, + "ai.vespa.feed.client.FeedClientBuilder": { + "superClass": "java.lang.Object", + "interfaces": [], + "attributes": [ + "public" + ], + "methods": [ + "public static ai.vespa.feed.client.FeedClientBuilder create(java.net.URI)", + "public static ai.vespa.feed.client.FeedClientBuilder create(java.util.List)", + "public ai.vespa.feed.client.FeedClientBuilder setConnectionsPerEndpoint(int)", + "public ai.vespa.feed.client.FeedClientBuilder setMaxStreamPerConnection(int)", + "public ai.vespa.feed.client.FeedClientBuilder setSslContext(javax.net.ssl.SSLContext)", + "public ai.vespa.feed.client.FeedClientBuilder setHostnameVerifier(javax.net.ssl.HostnameVerifier)", + "public ai.vespa.feed.client.FeedClientBuilder setBenchmarkOn(boolean)", + "public ai.vespa.feed.client.FeedClientBuilder addRequestHeader(java.lang.String, java.lang.String)", + "public ai.vespa.feed.client.FeedClientBuilder addRequestHeader(java.lang.String, java.util.function.Supplier)", + "public ai.vespa.feed.client.FeedClientBuilder setRetryStrategy(ai.vespa.feed.client.FeedClient$RetryStrategy)", + "public ai.vespa.feed.client.FeedClientBuilder setCircuitBreaker(ai.vespa.feed.client.FeedClient$CircuitBreaker)", + "public ai.vespa.feed.client.FeedClientBuilder setCertificate(java.nio.file.Path, java.nio.file.Path)", + "public ai.vespa.feed.client.FeedClientBuilder setCertificate(java.util.Collection, java.security.PrivateKey)", + "public ai.vespa.feed.client.FeedClientBuilder setCertificate(java.security.cert.X509Certificate, java.security.PrivateKey)", + "public ai.vespa.feed.client.FeedClientBuilder setCaCertificatesFile(java.nio.file.Path)", + "public ai.vespa.feed.client.FeedClientBuilder setCaCertificates(java.util.Collection)", + "public ai.vespa.feed.client.FeedClient build()" + ], + "fields": [] + }, + "ai.vespa.feed.client.FeedException": { + "superClass": "java.lang.RuntimeException", + "interfaces": [], + "attributes": [ + "public" + ], + "methods": [ + "public void <init>(java.lang.String)", + "public void <init>(ai.vespa.feed.client.DocumentId, java.lang.String)", + "public void <init>(java.lang.String, java.lang.Throwable)", + "public void <init>(java.lang.Throwable)", + "public void <init>(ai.vespa.feed.client.DocumentId, java.lang.Throwable)", + "public void <init>(ai.vespa.feed.client.DocumentId, java.lang.String, java.lang.Throwable)", + "public java.util.Optional documentId()" + ], + "fields": [] + }, + "ai.vespa.feed.client.GracePeriodCircuitBreaker": { + "superClass": "java.lang.Object", + "interfaces": [ + "ai.vespa.feed.client.FeedClient$CircuitBreaker" + ], + "attributes": [ + "public" + ], + "methods": [ + "public void <init>(java.time.Duration, java.time.Duration)", + "public void success()", + "public void failure()", + "public ai.vespa.feed.client.FeedClient$CircuitBreaker$State state()" + ], + "fields": [] + }, + "ai.vespa.feed.client.JsonFeeder$Builder": { + "superClass": "java.lang.Object", + "interfaces": [], + "attributes": [ + "public" + ], + "methods": [ + "public ai.vespa.feed.client.JsonFeeder$Builder withTimeout(java.time.Duration)", + "public ai.vespa.feed.client.JsonFeeder$Builder withRoute(java.lang.String)", + "public ai.vespa.feed.client.JsonFeeder$Builder withTracelevel(int)", + "public ai.vespa.feed.client.JsonFeeder build()" + ], + "fields": [] + }, + "ai.vespa.feed.client.JsonFeeder$ResultCallback": { + "superClass": "java.lang.Object", + "interfaces": [], + "attributes": [ + "public", + "interface", + "abstract" + ], + "methods": [ + "public void onNextResult(ai.vespa.feed.client.Result, ai.vespa.feed.client.FeedException)", + "public void onError(ai.vespa.feed.client.FeedException)", + "public void onComplete()" + ], + "fields": [] + }, + "ai.vespa.feed.client.JsonFeeder": { + "superClass": "java.lang.Object", + "interfaces": [ + "java.io.Closeable" + ], + "attributes": [ + "public" + ], + "methods": [ + "public static ai.vespa.feed.client.JsonFeeder$Builder builder(ai.vespa.feed.client.FeedClient)", + "public java.util.concurrent.CompletableFuture feedSingle(java.lang.String)", + "public java.util.concurrent.CompletableFuture feedMany(java.io.InputStream, ai.vespa.feed.client.JsonFeeder$ResultCallback)", + "public java.util.concurrent.CompletableFuture feedMany(java.io.InputStream)", + "public void close()" + ], + "fields": [] + }, + "ai.vespa.feed.client.OperationParameters": { + "superClass": "java.lang.Object", + "interfaces": [], + "attributes": [ + "public" + ], + "methods": [ + "public static ai.vespa.feed.client.OperationParameters empty()", + "public ai.vespa.feed.client.OperationParameters createIfNonExistent(boolean)", + "public ai.vespa.feed.client.OperationParameters testAndSetCondition(java.lang.String)", + "public ai.vespa.feed.client.OperationParameters timeout(java.time.Duration)", + "public ai.vespa.feed.client.OperationParameters route(java.lang.String)", + "public ai.vespa.feed.client.OperationParameters tracelevel(int)", + "public boolean createIfNonExistent()", + "public java.util.Optional testAndSetCondition()", + "public java.util.Optional timeout()", + "public java.util.Optional route()", + "public java.util.OptionalInt tracelevel()", + "public boolean equals(java.lang.Object)", + "public int hashCode()", + "public java.lang.String toString()" + ], + "fields": [] + }, + "ai.vespa.feed.client.OperationParseException": { + "superClass": "ai.vespa.feed.client.FeedException", + "interfaces": [], + "attributes": [ + "public" + ], + "methods": [ + "public void <init>(java.lang.String)", + "public void <init>(java.lang.String, java.lang.Throwable)" + ], + "fields": [] + }, + "ai.vespa.feed.client.OperationStats": { + "superClass": "java.lang.Object", + "interfaces": [], + "attributes": [ + "public" + ], + "methods": [ + "public void <init>(long, java.util.Map, long, long, long, long, long, long, long)", + "public long requests()", + "public long responses()", + "public long successes()", + "public java.util.Map responsesByCode()", + "public long exceptions()", + "public long inflight()", + "public long averageLatencyMillis()", + "public long minLatencyMillis()", + "public long maxLatencyMillis()", + "public long bytesSent()", + "public long bytesReceived()", + "public java.lang.String toString()" + ], + "fields": [] + }, + "ai.vespa.feed.client.Result$Type": { + "superClass": "java.lang.Enum", + "interfaces": [], + "attributes": [ + "public", + "final", + "enum" + ], + "methods": [ + "public static ai.vespa.feed.client.Result$Type[] values()", + "public static ai.vespa.feed.client.Result$Type valueOf(java.lang.String)" + ], + "fields": [ + "public static final enum ai.vespa.feed.client.Result$Type success", + "public static final enum ai.vespa.feed.client.Result$Type conditionNotMet", + "public static final enum ai.vespa.feed.client.Result$Type failure" + ] + }, + "ai.vespa.feed.client.Result": { + "superClass": "java.lang.Object", + "interfaces": [], + "attributes": [ + "public" + ], + "methods": [ + "public ai.vespa.feed.client.Result$Type type()", + "public ai.vespa.feed.client.DocumentId documentId()", + "public java.util.Optional resultMessage()", + "public java.util.Optional traceMessage()" + ], + "fields": [] + }, + "ai.vespa.feed.client.ResultParseException": { + "superClass": "ai.vespa.feed.client.FeedException", + "interfaces": [], + "attributes": [ + "public" + ], + "methods": [ + "public void <init>(ai.vespa.feed.client.DocumentId, java.lang.String)", + "public void <init>(ai.vespa.feed.client.DocumentId, java.lang.Throwable)" + ], + "fields": [] + }, + "ai.vespa.feed.client.StaticThrottler": { + "superClass": "java.lang.Object", + "interfaces": [ + "ai.vespa.feed.client.FeedClient$Throttler" + ], + "attributes": [ + "public" + ], + "methods": [ + "public void <init>(ai.vespa.feed.client.FeedClientBuilder)", + "public void sent(long, java.util.concurrent.CompletableFuture)", + "public void success()", + "public void throttled(long)", + "public long targetInflight()" + ], + "fields": [ + "protected final long maxInflight", + "protected final long minInflight" + ] + } +}
\ No newline at end of file diff --git a/vespa-feed-client/pom.xml b/vespa-feed-client/pom.xml index 7759e9d2308..7d4938c6fb0 100644 --- a/vespa-feed-client/pom.xml +++ b/vespa-feed-client/pom.xml @@ -20,6 +20,11 @@ <dependencies> <!-- compile scope --> <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>annotations</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.bouncycastle</groupId> <artifactId>bcpkix-jdk15on</artifactId> <scope>compile</scope> @@ -83,6 +88,10 @@ </execution> </executions> </plugin> + <plugin> + <groupId>com.yahoo.vespa</groupId> + <artifactId>abi-check-plugin</artifactId> + </plugin> </plugins> </build> </project> diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java new file mode 100644 index 00000000000..e5d45a2f211 --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java @@ -0,0 +1,165 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.H2AsyncClientBuilder; +import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.message.BasicHeader; +import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.net.URIAuthority; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.util.Timeout; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeH2Blacklisted; +import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeWeak; + +/** + * @author jonmv + */ +class ApacheCluster implements Cluster { + + private final List<Endpoint> endpoints = new ArrayList<>(); + + ApacheCluster(FeedClientBuilder builder) throws IOException { + for (URI endpoint : builder.endpoints) + for (int i = 0; i < builder.connectionsPerEndpoint; i++) + endpoints.add(new Endpoint(createHttpClient(builder), endpoint)); + } + + @Override + public void dispatch(HttpRequest wrapped, CompletableFuture<HttpResponse> vessel) { + SimpleHttpRequest request = new SimpleHttpRequest(wrapped.method(), wrapped.path()); + wrapped.headers().forEach((name, value) -> request.setHeader(name, value.get())); + if (wrapped.body() != null) + request.setBody(wrapped.body(), ContentType.APPLICATION_JSON); + + int index = 0; + int min = Integer.MAX_VALUE; + for (int i = 0; i < endpoints.size(); i++) + if (endpoints.get(i).inflight.get() < min) { + index = i; + min = endpoints.get(i).inflight.get(); + } + + Endpoint endpoint = endpoints.get(index); + endpoint.inflight.incrementAndGet(); + try { + request.setScheme(endpoint.url.getScheme()); + request.setAuthority(new URIAuthority(endpoint.url.getHost(), endpoint.url.getPort())); + endpoint.client.execute(request, + new FutureCallback<SimpleHttpResponse>() { + @Override public void completed(SimpleHttpResponse response) { vessel.complete(new ApacheHttpResponse(response)); } + @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); } + @Override public void cancelled() { vessel.cancel(false); } + }); + } + catch (Throwable thrown) { + vessel.completeExceptionally(thrown); + } + vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet()); + } + + @Override + public void close() { + Throwable thrown = null; + for (Endpoint endpoint : endpoints) + try { + endpoint.client.close(); + } + catch (Throwable t) { + if (thrown == null) thrown = t; + else thrown.addSuppressed(t); + } + if (thrown != null) throw new RuntimeException(thrown); + } + + + private static class Endpoint { + + private final CloseableHttpAsyncClient client; + private final AtomicInteger inflight = new AtomicInteger(0); + private final URI url; + + private Endpoint(CloseableHttpAsyncClient client, URI url) { + this.client = client; + this.url = url; + + this.client.start(); + } + + } + + private static CloseableHttpAsyncClient createHttpClient(FeedClientBuilder builder) throws IOException { + H2AsyncClientBuilder httpClientBuilder = H2AsyncClientBuilder.create() + .setUserAgent(String.format("vespa-feed-client/%s", Vespa.VERSION)) + .setDefaultHeaders(Collections.singletonList(new BasicHeader("Vespa-Client-Version", Vespa.VERSION))) + .disableCookieManagement() + .disableRedirectHandling() + .disableAutomaticRetries() + .setIOReactorConfig(IOReactorConfig.custom() + .setIoThreadCount(2) + .setTcpNoDelay(true) + .setSoTimeout(Timeout.ofSeconds(10)) + .build()) + .setDefaultRequestConfig(RequestConfig.custom() + .setConnectTimeout(Timeout.ofSeconds(10)) + .setConnectionRequestTimeout(Timeout.DISABLED) + .setResponseTimeout(Timeout.ofMinutes(5)) + .build()) + .setH2Config(H2Config.custom() + .setMaxConcurrentStreams(builder.maxStreamsPerConnection) + .setCompressionEnabled(true) + .setPushEnabled(false) + .setInitialWindowSize(Integer.MAX_VALUE) + .build()); + + SSLContext sslContext = builder.constructSslContext(); + String[] allowedCiphers = excludeH2Blacklisted(excludeWeak(sslContext.getSupportedSSLParameters().getCipherSuites())); + if (allowedCiphers.length == 0) + throw new IllegalStateException("No adequate SSL cipher suites supported by the JVM"); + + ClientTlsStrategyBuilder tlsStrategyBuilder = ClientTlsStrategyBuilder.create() + .setCiphers(allowedCiphers) + .setSslContext(sslContext); + if (builder.hostnameVerifier != null) { + tlsStrategyBuilder.setHostnameVerifier(builder.hostnameVerifier); + } + return httpClientBuilder.setTlsStrategy(tlsStrategyBuilder.build()) + .build(); + } + + private static class ApacheHttpResponse implements HttpResponse { + + private final SimpleHttpResponse wrapped; + + private ApacheHttpResponse(SimpleHttpResponse wrapped) { + this.wrapped = wrapped; + } + + @Override + public int code() { + return wrapped.getCode(); + } + + @Override + public byte[] body() { + return wrapped.getBodyBytes(); + } + + } + +} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java new file mode 100644 index 00000000000..840219a6bf1 --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java @@ -0,0 +1,102 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static java.util.Objects.requireNonNull; + +public class BenchmarkingCluster implements Cluster { + + private final Cluster delegate; + private final ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> { + Thread thread = new Thread(runnable, "cluster-stats-collector"); + thread.setDaemon(true); + return thread; + }); + + private final AtomicLong requests = new AtomicLong(); + private long results = 0; + private long responses = 0; + private final long[] responsesByCode = new long[600]; + private long exceptions = 0; + private long totalLatencyMillis = 0; + private long minLatencyMillis = Long.MAX_VALUE; + private long maxLatencyMillis = 0; + private long bytesSent = 0; + private long bytesReceived = 0; + + public BenchmarkingCluster(Cluster delegate) { + this.delegate = requireNonNull(delegate); + } + + @Override + public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) { + requests.incrementAndGet(); + long startNanos = System.nanoTime(); + delegate.dispatch(request, vessel); + vessel.whenCompleteAsync((response, thrown) -> { + results++; + if (thrown == null) { + responses++; + responsesByCode[response.code()]++; + long latency = (System.nanoTime() - startNanos) / 1_000_000; + totalLatencyMillis += latency; + minLatencyMillis = Math.min(minLatencyMillis, latency); + maxLatencyMillis = Math.max(maxLatencyMillis, latency); + bytesSent += request.body() == null ? 0 : request.body().length; + bytesReceived += response.body() == null ? 0 : response.body().length; + } + else + exceptions++; + }, + executor); + } + + @Override + public OperationStats stats() { + try { + try { + return executor.submit(this::getStats).get(); + } + catch (RejectedExecutionException ignored) { + executor.awaitTermination(10, TimeUnit.SECONDS); + return getStats(); + } + } + catch (InterruptedException | ExecutionException ignored) { + throw new RuntimeException(ignored); + } + } + + private OperationStats getStats() { + Map<Integer, Long> responses = new HashMap<>(); + for (int code = 0; code < responsesByCode.length; code++) + if (responsesByCode[code] > 0) + responses.put(code, responsesByCode[code]); + + return new OperationStats(requests.get(), + responses, + exceptions, + requests.get() - results, + this.responses == 0 ? 0 : totalLatencyMillis / this.responses, + minLatencyMillis, + maxLatencyMillis, + bytesSent, + bytesReceived); + } + + @Override + public void close() { + delegate.close(); + executor.shutdown(); + } + +} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java new file mode 100644 index 00000000000..f428fb567e6 --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java @@ -0,0 +1,21 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import java.io.Closeable; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; + +/** + * Allows dispatch to a Vespa cluster. + */ +interface Cluster extends Closeable { + + /** Dispatch the request to the cluster, causing the response vessel to complete at a later time. May not throw. */ + void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel); + + @Override + default void close() { } + + default OperationStats stats() { return new OperationStats(0, Collections.emptyMap(), 0, 0, 0, 0, 0, 0, 0); } + +} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DocumentId.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/DocumentId.java index 21513a5dac2..39fc9fb28e0 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DocumentId.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/DocumentId.java @@ -8,6 +8,8 @@ import java.util.OptionalLong; import static java.util.Objects.requireNonNull; /** + * Represents a Vespa document id + * * @author jonmv */ public class DocumentId { diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java new file mode 100644 index 00000000000..6f4e4e752f0 --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java @@ -0,0 +1,86 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import static java.lang.Math.log; +import static java.lang.Math.max; +import static java.lang.Math.min; +import static java.lang.Math.pow; +import static java.lang.Math.random; + +/** + * Samples latency as a function of inflight requests, and regularly adjusts to the optimal value. + * + * @author jonmv + */ +public class DynamicThrottler extends StaticThrottler { + + private final AtomicLong targetInflight; + private long updateNanos = 0; + private final List<AtomicLong> latencies = new ArrayList<>(); + private final double weight = 0.9; // Higher weight favours higher (own) throughput, at the cost of (shared) latency. + + public DynamicThrottler(FeedClientBuilder builder) { + super(builder); + this.targetInflight = new AtomicLong(128L * builder.connectionsPerEndpoint * builder.endpoints.size()); + for (int i = 0; i < 128; i++) + latencies.add(new AtomicLong(-1)); + } + + @Override + public void sent(long inflight, CompletableFuture<HttpResponse> vessel) { + long startNanos = System.nanoTime(); + if (updateNanos == 0) updateNanos = System.nanoTime(); + boolean update = startNanos - updateNanos >= 1e8; // Ship ten updates per second. + if (update) updateNanos = startNanos; + + vessel.whenComplete((response, thrown) -> { + // Use buckets for latency measurements, with inflight along a log scale, + // and with minInflight and maxInflight at the ends. + int index = (int) (latencies.size() * log(max(1, (double) inflight / minInflight)) + / log(256)); // 4096 (server max streams per connection) / 16 (our min per connection) + long nowNanos = System.nanoTime(); + long latencyNanos = nowNanos - startNanos; + latencies.get(index).set(latencyNanos); + if ( ! update) + return; + + // Loop over latency measurements and pick the one which optimises throughput and latency. + double choice = -1; + double max = -1; + for (int i = latencies.size(); i-- > 0; ) { + double latency = latencies.get(i).get(); + if (latency < 0) continue; // Skip unknown values. + double target = minInflight * pow(256, (i + 0.5) / latencies.size()); + double objective = pow(target, weight) / latency; // Optimise throughput (weight), but also latency (1 - weight). + if (objective > max) { + max = objective; + choice = target; + } + } + long target = (long) ((random() * 0.25 + 0.90) * choice); // Random walk, skewed towards increase. + targetInflight.set(max(minInflight, min(maxInflight, target))); + }); + } + + @Override + public void success() { + super.success(); + } + + @Override + public void throttled(long inflight) { + super.throttled(inflight); + } + + @Override + public long targetInflight() { + return min(super.targetInflight(), targetInflight.get()); + } + +} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java index 455a79060ee..f39b56ad50f 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java @@ -5,15 +5,43 @@ import java.io.Closeable; import java.util.concurrent.CompletableFuture; /** + * Asynchronous feed client accepting document operations as JSON + * * @author bjorncs * @author jonmv */ public interface FeedClient extends Closeable { + /** + * Send a document put with the given parameters, returning a future with the result of the operation. + * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes. + * */ CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params); + + /** + * Send a document update with the given parameters, returning a future with the result of the operation. + * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes. + * */ CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params); + + /** Send a document remove with the given parameters, returning a future with the result of the operation. + * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes. + * */ CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params); + /** Returns a snapshot of the stats for this feed client, such as requests made, and responses by status. */ + OperationStats stats(); + + /** Current state of the circuit breaker. */ + default CircuitBreaker.State circuitBreakerState() { return CircuitBreaker.State.CLOSED; } + + /** Shut down, and reject new operations. Operations in flight are allowed to complete normally if graceful. */ + void close(boolean graceful); + + /** Initiates graceful shutdown. See {@link #close(boolean)}. */ + default void close() { close(true); } + + /** Controls what to retry, and how many times. */ interface RetryStrategy { /** Whether to retry operations of the given type. */ @@ -24,10 +52,62 @@ public interface FeedClient extends Closeable { } + /** Allows slowing down or halting completely operations against the configured endpoint on high failure rates. */ + interface CircuitBreaker { + + /** Called by the client whenever a successful response is obtained. */ + void success(); + + /** Called by the client whenever a transient or fatal error occurs. */ + void failure(); + + /** The current state of the circuit breaker. */ + State state(); + + enum State { + + /** Circuit is closed: business as usual. */ + CLOSED, + + /** Circuit is half-open: something is wrong, perhaps it recovers? */ + HALF_OPEN, + + /** Circuit is open: we have given up. */ + OPEN; + + } + + } + enum OperationType { - put, - update, - remove; + + /** A document put operation. This is idempotent. */ + PUT, + + /** A document update operation. This is idempotent if all its contained updates are. */ + UPDATE, + + /** A document remove operation. This is idempotent. */ + REMOVE; + + } + + + /** Determines the number of requests to have inflight at any point. */ + interface Throttler { + + /** A request was just sent with {@code vessel}, with {@code inflight} total in flight. */ + void sent(long inflight, CompletableFuture<HttpResponse> vessel); + + /** A successful response was obtained. */ + void success(); + + /** A throttle signal was obtained from the server. */ + void throttled(long inflight); + + /** The target inflight operations right now. */ + long targetInflight(); + } } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java index eaf84c67ac4..0f685ec5b7f 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java @@ -7,8 +7,14 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.net.URI; import java.nio.file.Path; -import java.time.Clock; +import java.security.PrivateKey; +import java.security.cert.X509Certificate; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.function.Supplier; @@ -22,38 +28,51 @@ import static java.util.Objects.requireNonNull; */ public class FeedClientBuilder { - FeedClient.RetryStrategy defaultRetryStrategy = new FeedClient.RetryStrategy() { }; + static final FeedClient.RetryStrategy defaultRetryStrategy = new FeedClient.RetryStrategy() { }; - final URI endpoint; + final List<URI> endpoints; final Map<String, Supplier<String>> requestHeaders = new HashMap<>(); SSLContext sslContext; HostnameVerifier hostnameVerifier; - int maxConnections = 4; - int maxStreamsPerConnection = 1024; + int connectionsPerEndpoint = 4; + int maxStreamsPerConnection = 4096; FeedClient.RetryStrategy retryStrategy = defaultRetryStrategy; - Path certificate; - Path privateKey; - Path caCertificates; - Clock clock; + FeedClient.CircuitBreaker circuitBreaker = new GracePeriodCircuitBreaker(Duration.ofSeconds(1), Duration.ofMinutes(10)); + Path certificateFile; + Path privateKeyFile; + Path caCertificatesFile; + Collection<X509Certificate> certificate; + PrivateKey privateKey; + Collection<X509Certificate> caCertificates; + boolean benchmark; - public static FeedClientBuilder create(URI endpoint) { return new FeedClientBuilder(endpoint); } + /** Creates a builder for a single container endpoint **/ + public static FeedClientBuilder create(URI endpoint) { return new FeedClientBuilder(Collections.singletonList(endpoint)); } - private FeedClientBuilder(URI endpoint) { - requireNonNull(endpoint.getHost()); - this.endpoint = endpoint; + /** Creates a builder for multiple container endpoints **/ + public static FeedClientBuilder create(List<URI> endpoints) { return new FeedClientBuilder(endpoints); } + + private FeedClientBuilder(List<URI> endpoints) { + if (endpoints.isEmpty()) + throw new IllegalArgumentException("At least one endpoint must be provided"); + + for (URI endpoint : endpoints) + requireNonNull(endpoint.getHost()); + + this.endpoints = new ArrayList<>(endpoints); } /** - * Sets the maximum number of connections this client will use. + * Sets the number of connections this client will use per endpoint. * * A reasonable value here is a small multiple of the numbers of containers in the * cluster to feed, so load can be balanced across these. * In general, this value should be kept as low as possible, but poor connectivity * between feeder and cluster may also warrant a higher number of connections. */ - public FeedClientBuilder setMaxConnections(int max) { + public FeedClientBuilder setConnectionsPerEndpoint(int max) { if (max < 1) throw new IllegalArgumentException("Max connections must be at least 1, but was " + max); - this.maxConnections = max; + this.connectionsPerEndpoint = max; return this; } @@ -70,52 +89,137 @@ public class FeedClientBuilder { return this; } + /** Sets {@link SSLContext} instance. */ public FeedClientBuilder setSslContext(SSLContext context) { - if (certificate != null || caCertificates != null || privateKey != null) { - throw new IllegalArgumentException("Cannot set both SSLContext and certificate / CA certificates"); - } this.sslContext = requireNonNull(context); return this; } + /** Sets {@link HostnameVerifier} instance (e.g for disabling default SSL hostname verification). */ public FeedClientBuilder setHostnameVerifier(HostnameVerifier verifier) { this.hostnameVerifier = requireNonNull(verifier); return this; } + /** Turns on/off benchmarking, aggregated in {@link FeedClient#stats()}. */ + public FeedClientBuilder setBenchmarkOn(boolean on) { + this.benchmark = on; + return this; + } + + /** Adds HTTP request header to all client requests. */ public FeedClientBuilder addRequestHeader(String name, String value) { return addRequestHeader(name, () -> requireNonNull(value)); } + /** + * Adds HTTP request header to all client requests. Value {@link Supplier} is invoked for each HTTP request, + * i.e. value can be dynamically updated during a feed. + */ public FeedClientBuilder addRequestHeader(String name, Supplier<String> valueSupplier) { this.requestHeaders.put(requireNonNull(name), requireNonNull(valueSupplier)); return this; } + /** + * Overrides default retry strategy. + * @see FeedClient.RetryStrategy + */ public FeedClientBuilder setRetryStrategy(FeedClient.RetryStrategy strategy) { this.retryStrategy = requireNonNull(strategy); return this; } + /** + * Overrides default circuit breaker. + * @see FeedClient.CircuitBreaker + */ + public FeedClientBuilder setCircuitBreaker(FeedClient.CircuitBreaker breaker) { + this.circuitBreaker = requireNonNull(breaker); + return this; + } + + /** Sets path to client SSL certificate/key PEM files */ public FeedClientBuilder setCertificate(Path certificatePemFile, Path privateKeyPemFile) { - if (sslContext != null) throw new IllegalArgumentException("Cannot set both SSLContext and certificate"); - this.certificate = certificatePemFile; - this.privateKey = privateKeyPemFile; + this.certificateFile = certificatePemFile; + this.privateKeyFile = privateKeyPemFile; + return this; + } + + /** Sets client SSL certificates/key */ + public FeedClientBuilder setCertificate(Collection<X509Certificate> certificate, PrivateKey privateKey) { + this.certificate = certificate; + this.privateKey = privateKey; return this; } - public FeedClientBuilder setCaCertificates(Path caCertificatesFile) { - if (sslContext != null) throw new IllegalArgumentException("Cannot set both SSLContext and CA certificate"); - this.caCertificates = caCertificatesFile; + /** Sets client SSL certificate/key */ + public FeedClientBuilder setCertificate(X509Certificate certificate, PrivateKey privateKey) { + return setCertificate(Collections.singletonList(certificate), privateKey); + } + + /** + * Overrides JVM default SSL truststore + * @param caCertificatesFile Path to PEM encoded file containing trusted certificates + */ + public FeedClientBuilder setCaCertificatesFile(Path caCertificatesFile) { + this.caCertificatesFile = caCertificatesFile; + return this; + } + + /** Overrides JVM default SSL truststore */ + public FeedClientBuilder setCaCertificates(Collection<X509Certificate> caCertificates) { + this.caCertificates = caCertificates; return this; } + /** Constructs instance of {@link ai.vespa.feed.client.FeedClient} from builder configuration */ public FeedClient build() { try { + validateConfiguration(); return new HttpFeedClient(this); } catch (IOException e) { throw new UncheckedIOException(e); } } + SSLContext constructSslContext() throws IOException { + if (sslContext != null) return sslContext; + SslContextBuilder sslContextBuilder = new SslContextBuilder(); + if (certificateFile != null && privateKeyFile != null) { + sslContextBuilder.withCertificateAndKey(certificateFile, privateKeyFile); + } else if (certificate != null && privateKey != null) { + sslContextBuilder.withCertificateAndKey(certificate, privateKey); + } + if (caCertificatesFile != null) { + sslContextBuilder.withCaCertificates(caCertificatesFile); + } else if (caCertificates != null) { + sslContextBuilder.withCaCertificates(caCertificates); + } + return sslContextBuilder.build(); + } + + private void validateConfiguration() { + if (sslContext != null && ( + certificateFile != null || caCertificatesFile != null || privateKeyFile != null || + certificate != null || caCertificates != null || privateKey != null)) { + throw new IllegalArgumentException("Cannot set both SSLContext and certificate / CA certificates"); + } + if (certificate != null && certificateFile != null) { + throw new IllegalArgumentException("Cannot set both certificate directly and as file"); + } + if (privateKey != null && privateKeyFile != null) { + throw new IllegalArgumentException("Cannot set both private key directly and as file"); + } + if (caCertificates != null && caCertificatesFile != null) { + throw new IllegalArgumentException("Cannot set both CA certificates directly and as file"); + } + if (certificate != null && certificate.isEmpty()) { + throw new IllegalArgumentException("Certificate cannot be empty"); + } + if (caCertificates != null && caCertificates.isEmpty()) { + throw new IllegalArgumentException("CA certificates cannot be empty"); + } + } + } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java index eb31d1aa808..54e11d3a185 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java @@ -1,8 +1,47 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.feed.client; +import java.util.Optional; + /** + * Signals that an error occurred during feeding + * * @author bjorncs */ public class FeedException extends RuntimeException { + + private final DocumentId documentId; + + public FeedException(String message) { + super(message); + this.documentId = null; + } + + public FeedException(DocumentId documentId, String message) { + super(message); + this.documentId = documentId; + } + + public FeedException(String message, Throwable cause) { + super(message, cause); + this.documentId = null; + } + + public FeedException(Throwable cause) { + super(cause); + this.documentId = null; + } + + public FeedException(DocumentId documentId, Throwable cause) { + super(cause); + this.documentId = documentId; + } + + public FeedException(DocumentId documentId, String message, Throwable cause) { + super(message, cause); + this.documentId = documentId; + } + + public Optional<DocumentId> documentId() { return Optional.ofNullable(documentId); } + } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java new file mode 100644 index 00000000000..2c5c2dccf19 --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java @@ -0,0 +1,71 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.LongSupplier; +import java.util.logging.Logger; + +import static java.util.Objects.requireNonNull; +import static java.util.logging.Level.INFO; +import static java.util.logging.Level.WARNING; + +/** + * Breaks the circuit when no successes have been recorded for a specified time. + * + * @author jonmv + */ +public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker { + + private static final Logger log = Logger.getLogger(GracePeriodCircuitBreaker.class.getName()); + private static final long NEVER = 1L << 60; + + private final AtomicLong failingSinceMillis = new AtomicLong(NEVER); + private final AtomicBoolean halfOpen = new AtomicBoolean(false); + private final AtomicBoolean open = new AtomicBoolean(false); + private final LongSupplier clock; + private final long graceMillis; + private final long doomMillis; + + public GracePeriodCircuitBreaker(Duration grace, Duration doom) { + this(System::currentTimeMillis, grace, doom); + } + + GracePeriodCircuitBreaker(LongSupplier clock, Duration grace, Duration doom) { + if (grace.isNegative()) + throw new IllegalArgumentException("Grace delay must be non-negative"); + + if (doom.isNegative()) + throw new IllegalArgumentException("Doom delay must be non-negative"); + + this.clock = requireNonNull(clock); + this.graceMillis = grace.toMillis(); + this.doomMillis = doom.toMillis(); + } + + @Override + public void success() { + failingSinceMillis.set(NEVER); + if ( ! open.get() && halfOpen.compareAndSet(true, false)) + log.log(INFO, "Circuit breaker is now closed"); + } + + @Override + public void failure() { + failingSinceMillis.compareAndSet(NEVER, clock.getAsLong()); + } + + @Override + public State state() { + long failingMillis = clock.getAsLong() - failingSinceMillis.get(); + if (failingMillis > graceMillis && halfOpen.compareAndSet(false, true)) + log.log(INFO, "Circuit breaker is now half-open"); + + if (failingMillis > doomMillis && open.compareAndSet(false, true)) + log.log(WARNING, "Circuit breaker is now open"); + + return open.get() ? State.OPEN : halfOpen.get() ? State.HALF_OPEN : State.CLOSED; + } + +} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java index 8a38e859ca4..2269c56cde4 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java @@ -1,40 +1,22 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.feed.client; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; -import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; -import org.apache.hc.client5.http.config.RequestConfig; -import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; -import org.apache.hc.client5.http.impl.async.H2AsyncClientBuilder; -import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; -import org.apache.hc.core5.concurrent.FutureCallback; -import org.apache.hc.core5.http.ContentType; -import org.apache.hc.core5.http.message.BasicHeader; -import org.apache.hc.core5.http2.config.H2Config; -import org.apache.hc.core5.net.URIBuilder; -import org.apache.hc.core5.reactor.IOReactorConfig; -import org.apache.hc.core5.util.Timeout; - -import javax.net.ssl.SSLContext; -import java.io.ByteArrayOutputStream; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; + import java.io.IOException; -import java.io.PrintStream; -import java.net.URI; -import java.net.URISyntaxException; -import java.time.Clock; -import java.util.ArrayList; -import java.util.Collections; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.StringJoiner; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; -import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeH2Blacklisted; -import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeWeak; /** * HTTP implementation of {@link FeedClient} @@ -44,73 +26,19 @@ import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeWeak; */ class HttpFeedClient implements FeedClient { - private final URI endpoint; + private static final JsonFactory factory = new JsonFactory(); + private final Map<String, Supplier<String>> requestHeaders; private final RequestStrategy requestStrategy; - private final List<CloseableHttpAsyncClient> httpClients = new ArrayList<>(); - private final List<AtomicInteger> inflight = new ArrayList<>(); private final AtomicBoolean closed = new AtomicBoolean(); HttpFeedClient(FeedClientBuilder builder) throws IOException { - this.endpoint = builder.endpoint; - this.requestHeaders = new HashMap<>(builder.requestHeaders); - this.requestStrategy = new HttpRequestStrategy(builder, Clock.systemUTC()); - - for (int i = 0; i < builder.maxConnections; i++) { - CloseableHttpAsyncClient client = createHttpClient(builder); - client.start(); - httpClients.add(client); - inflight.add(new AtomicInteger()); - } + this(builder, new HttpRequestStrategy(builder)); } - private static CloseableHttpAsyncClient createHttpClient(FeedClientBuilder builder) throws IOException { - H2AsyncClientBuilder httpClientBuilder = H2AsyncClientBuilder.create() - .setUserAgent(String.format("vespa-feed-client/%s", Vespa.VERSION)) - .setDefaultHeaders(Collections.singletonList(new BasicHeader("Vespa-Client-Version", Vespa.VERSION))) - .disableCookieManagement() - .disableRedirectHandling() - .disableAutomaticRetries() - .setIOReactorConfig(IOReactorConfig.custom() - .setSoTimeout(Timeout.ofSeconds(10)) - .build()) - .setDefaultRequestConfig( - RequestConfig.custom() - .setConnectTimeout(Timeout.ofSeconds(10)) - .setConnectionRequestTimeout(Timeout.DISABLED) - .setResponseTimeout(Timeout.ofMinutes(5)) - .build()) - .setH2Config(H2Config.initial() - .setMaxConcurrentStreams(builder.maxStreamsPerConnection) - .setCompressionEnabled(true) - .setPushEnabled(false) - .build()); - - SSLContext sslContext = constructSslContext(builder); - String[] allowedCiphers = excludeH2Blacklisted(excludeWeak(sslContext.getSupportedSSLParameters().getCipherSuites())); - if (allowedCiphers.length == 0) - throw new IllegalStateException("No adequate SSL cipher suites supported by the JVM"); - - ClientTlsStrategyBuilder tlsStrategyBuilder = ClientTlsStrategyBuilder.create() - .setCiphers(allowedCiphers) - .setSslContext(sslContext); - if (builder.hostnameVerifier != null) { - tlsStrategyBuilder.setHostnameVerifier(builder.hostnameVerifier); - } - return httpClientBuilder.setTlsStrategy(tlsStrategyBuilder.build()) - .build(); - } - - private static SSLContext constructSslContext(FeedClientBuilder builder) throws IOException { - if (builder.sslContext != null) return builder.sslContext; - SslContextBuilder sslContextBuilder = new SslContextBuilder(); - if (builder.certificate != null && builder.privateKey != null) { - sslContextBuilder.withCertificateAndKey(builder.certificate, builder.privateKey); - } - if (builder.caCertificates != null) { - sslContextBuilder.withCaCertificates(builder.caCertificates); - } - return sslContextBuilder.build(); + HttpFeedClient(FeedClientBuilder builder, RequestStrategy requestStrategy) { + this.requestHeaders = new HashMap<>(builder.requestHeaders); + this.requestStrategy = requestStrategy; } @Override @@ -129,107 +57,122 @@ class HttpFeedClient implements FeedClient { } @Override - public void close() throws IOException { - if ( ! closed.getAndSet(true)) - for (CloseableHttpAsyncClient hc : httpClients) - hc.close(); + public OperationStats stats() { + return requestStrategy.stats(); } - private CompletableFuture<Result> send(String method, DocumentId documentId, String operationJson, OperationParameters params) { - SimpleHttpRequest request = new SimpleHttpRequest(method, operationUrl(endpoint, documentId, params)); - requestHeaders.forEach((name, value) -> request.setHeader(name, value.get())); - if (operationJson != null) - request.setBody(operationJson, ContentType.APPLICATION_JSON); - - return requestStrategy.enqueue(documentId, request, this::send) - .handle((response, thrown) -> { - if (thrown != null) { - if (requestStrategy.hasFailed()) { - try { close(); } - catch (IOException exception) { thrown.addSuppressed(exception); } - } - ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - thrown.printStackTrace(new PrintStream(buffer)); - return new Result(Result.Type.failure, documentId, buffer.toString(), null); - } - return toResult(response, documentId); - }); + @Override + public CircuitBreaker.State circuitBreakerState() { + return requestStrategy.circuitBreakerState(); } - /** Sends the given request to the client with the least current inflight requests, completing the given vessel when done. */ - private void send(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel) { - int index = 0; - int min = Integer.MAX_VALUE; - for (int i = 0; i < httpClients.size(); i++) - if (inflight.get(i).get() < min) { - min = inflight.get(i).get(); - index = i; - } + @Override + public void close(boolean graceful) { + closed.set(true); + if (graceful) + requestStrategy.await(); - inflight.get(index).incrementAndGet(); - try { - httpClients.get(index).execute(request, - new FutureCallback<SimpleHttpResponse>() { - @Override public void completed(SimpleHttpResponse response) { vessel.complete(response); } - @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); } - @Override public void cancelled() { vessel.cancel(false); } - }); - } - catch (Throwable thrown) { - vessel.completeExceptionally(thrown); - } - vessel.thenRun(inflight.get(index)::decrementAndGet); + requestStrategy.destroy(); + } + + private CompletableFuture<Result> send(String method, DocumentId documentId, String operationJson, OperationParameters params) { + HttpRequest request = new HttpRequest(method, + getPath(documentId) + getQuery(params), + requestHeaders, + operationJson.getBytes(UTF_8)); // TODO: make it bytes all the way? + + return requestStrategy.enqueue(documentId, request) + .thenApply(response -> toResult(request, response, documentId)); } - static Result toResult(SimpleHttpResponse response, DocumentId documentId) { + static Result toResult(HttpRequest request, HttpResponse response, DocumentId documentId) { Result.Type type; - switch (response.getCode()) { + switch (response.code()) { case 200: type = Result.Type.success; break; case 412: type = Result.Type.conditionNotMet; break; - default: type = Result.Type.failure; + case 502: + case 504: + case 507: type = Result.Type.failure; break; + default: type = null; } - Map<String, String> responseJson = null; // TODO: parse JSON on error. - return new Result(type, documentId, response.getBodyText(), "trace"); + + String message = null; + String trace = null; + try { + JsonParser parser = factory.createParser(response.body()); + if (parser.nextToken() != JsonToken.START_OBJECT) + throw new ResultParseException( + documentId, + "Expected '" + JsonToken.START_OBJECT + "', but found '" + parser.currentToken() + "' in: " + + new String(response.body(), UTF_8)); + + String name; + while ((name = parser.nextFieldName()) != null) { + switch (name) { + case "message": message = parser.nextTextValue(); break; + case "trace": trace = parser.nextTextValue(); break; + default: parser.nextToken(); + } + } + + if (parser.currentToken() != JsonToken.END_OBJECT) + throw new ResultParseException( + documentId, + "Expected '" + JsonToken.END_OBJECT + "', but found '" + parser.currentToken() + "' in: " + + new String(response.body(), UTF_8)); + } + catch (IOException e) { + throw new ResultParseException(documentId, e); + } + + if (type == null) // Not a Vespa response, but a failure in the HTTP layer. + throw new ResultParseException( + documentId, + "Status " + response.code() + " executing '" + request + "': " + + (message == null ? new String(response.body(), UTF_8) : message)); + + return new Result(type, documentId, message, trace); } - static List<String> toPath(DocumentId documentId) { - List<String> path = new ArrayList<>(); + static String getPath(DocumentId documentId) { + StringJoiner path = new StringJoiner("/", "/", ""); path.add("document"); path.add("v1"); - path.add(documentId.namespace()); - path.add(documentId.documentType()); + path.add(encode(documentId.namespace())); + path.add(encode(documentId.documentType())); if (documentId.number().isPresent()) { path.add("number"); path.add(Long.toUnsignedString(documentId.number().getAsLong())); } else if (documentId.group().isPresent()) { path.add("group"); - path.add(documentId.group().get()); + path.add(encode(documentId.group().get())); } else { path.add("docid"); } - path.add(documentId.userSpecific()); + path.add(encode(documentId.userSpecific())); - return path; + return path.toString(); } - static URI operationUrl(URI endpoint, DocumentId documentId, OperationParameters params) { - URIBuilder url = new URIBuilder(endpoint); - url.setPathSegments(toPath(documentId)); - - if (params.createIfNonExistent()) url.addParameter("create", "true"); - params.testAndSetCondition().ifPresent(condition -> url.addParameter("condition", condition)); - params.timeout().ifPresent(timeout -> url.addParameter("timeout", timeout.toMillis() + "ms")); - params.route().ifPresent(route -> url.addParameter("route", route)); - params.tracelevel().ifPresent(tracelevel -> url.addParameter("tracelevel", Integer.toString(tracelevel))); - + static String encode(String raw) { try { - return url.build(); + return URLEncoder.encode(raw, UTF_8.name()); } - catch (URISyntaxException e) { + catch (UnsupportedEncodingException e) { throw new IllegalStateException(e); } } + static String getQuery(OperationParameters params) { + StringJoiner query = new StringJoiner("&", "?", "").setEmptyValue(""); + if (params.createIfNonExistent()) query.add("create=true"); + params.testAndSetCondition().ifPresent(condition -> query.add("condition=" + encode(condition))); + params.timeout().ifPresent(timeout -> query.add("timeout=" + timeout.toMillis() + "ms")); + params.route().ifPresent(route -> query.add("route=" + encode(route))); + params.tracelevel().ifPresent(tracelevel -> query.add("tracelevel=" + tracelevel)); + return query.toString(); + } + } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java new file mode 100644 index 00000000000..8da2f46def2 --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java @@ -0,0 +1,42 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import java.util.Map; +import java.util.function.Supplier; + +class HttpRequest { + + private final String method; + private final String path; + private final Map<String, Supplier<String>> headers; + private final byte[] body; + + public HttpRequest(String method, String path, Map<String, Supplier<String>> headers, byte[] body) { + this.method = method; + this.path = path; + this.headers = headers; + this.body = body; + } + + public String method() { + return method; + } + + public String path() { + return path; + } + + public Map<String, Supplier<String>> headers() { + return headers; + } + + public byte[] body() { + return body; + } + + @Override + public String toString() { + return method + " " + path; + } + +} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java index d0d67d65446..e9cd0baba5b 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java @@ -1,26 +1,31 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.feed.client; +import ai.vespa.feed.client.FeedClient.CircuitBreaker; import ai.vespa.feed.client.FeedClient.RetryStrategy; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; -import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; import java.io.IOException; -import java.time.Clock; -import java.time.Instant; -import java.util.HashMap; +import java.nio.channels.CancelledKeyException; import java.util.Map; -import java.util.concurrent.BlockingQueue; +import java.util.Queue; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.function.BiConsumer; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; -import static java.lang.Math.max; -import static java.lang.Math.min; +import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.CLOSED; +import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN; +import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.logging.Level.FINE; -import static java.util.logging.Level.INFO; +import static java.util.logging.Level.WARNING; +// TODO: update doc /** * Controls request execution and retries: * <ul> @@ -31,58 +36,94 @@ import static java.util.logging.Level.INFO; * * @author jonmv */ -class HttpRequestStrategy implements RequestStrategy, AutoCloseable { +class HttpRequestStrategy implements RequestStrategy { private static final Logger log = Logger.getLogger(HttpRequestStrategy.class.getName()); - private final Map<DocumentId, CompletableFuture<Void>> inflightById = new HashMap<>(); - private final Object monitor = new Object(); - private final Clock clock; - private final RetryStrategy wrapped; - private final Thread delayer = new Thread(this::drainDelayed, "feed-client-retry-delayer"); - private final BlockingQueue<CompletableFuture<Void>> delayed = new LinkedBlockingQueue<>(); - private final long maxInflight; - private final long minInflight; - private double targetInflight; - private long inflight = 0; - private long consecutiveSuccesses = 0; - private Instant lastSuccess; - private boolean failed = false; - private boolean closed = false; - - HttpRequestStrategy(FeedClientBuilder builder, Clock clock) { - this.wrapped = builder.retryStrategy; - this.maxInflight = builder.maxConnections * (long) builder.maxStreamsPerConnection; - this.minInflight = builder.maxConnections * (long) min(16, builder.maxStreamsPerConnection); - this.targetInflight = Math.sqrt(maxInflight) * (Math.sqrt(minInflight)); - this.clock = clock; - this.lastSuccess = clock.instant(); - this.delayer.start(); - } - - private void drainDelayed() { - try { - while (true) { - do delayed.take().complete(null); - while ( ! hasFailed()); + private final Cluster cluster; + private final Map<DocumentId, CompletableFuture<?>> inflightById = new ConcurrentHashMap<>(); + private final RetryStrategy strategy; + private final CircuitBreaker breaker; + final FeedClient.Throttler throttler; + private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>(); + private final AtomicLong inflight = new AtomicLong(0); + private final AtomicBoolean destroyed = new AtomicBoolean(false); + private final AtomicLong delayedCount = new AtomicLong(0); + private final ExecutorService resultExecutor = Executors.newSingleThreadExecutor(runnable -> { + Thread thread = new Thread(runnable, "feed-client-result-executor"); + thread.setDaemon(true); + return thread; + }); + + HttpRequestStrategy(FeedClientBuilder builder) throws IOException { + this(builder, new ApacheCluster(builder)); + } + + HttpRequestStrategy(FeedClientBuilder builder, Cluster cluster) { + this.cluster = builder.benchmark ? new BenchmarkingCluster(cluster) : cluster; + this.strategy = builder.retryStrategy; + this.breaker = builder.circuitBreaker; + this.throttler = new DynamicThrottler(builder); + + Thread dispatcher = new Thread(this::dispatch, "feed-client-dispatcher"); + dispatcher.setDaemon(true); + dispatcher.start(); + } + + @Override + public OperationStats stats() { + return cluster.stats(); + } + + @Override + public CircuitBreaker.State circuitBreakerState() { + return breaker.state(); + } - Thread.sleep(1000); + private void dispatch() { + try { + while (breaker.state() != OPEN && ! destroyed.get()) { + while ( ! isInExcess() && poll() && breaker.state() == CLOSED); + // Sleep when circuit is half-open, nap when queue is empty, or we are throttled. + Thread.sleep(breaker.state() == HALF_OPEN ? 1000 : 10); // TODO: Reduce throughput when turning half-open? } } catch (InterruptedException e) { - delayed.forEach(action -> action.cancel(true)); + Thread.currentThread().interrupt(); + log.log(WARNING, "Dispatch thread interrupted; shutting down"); } + destroy(); + } + + private void offer(HttpRequest request, CompletableFuture<HttpResponse> vessel) { + delayedCount.incrementAndGet(); + queue.offer(() -> { + cluster.dispatch(request, vessel); + }); + } + + private boolean poll() { + Runnable task = queue.poll(); + if (task == null) return false; + delayedCount.decrementAndGet(); + task.run(); + return true; } - private boolean retry(SimpleHttpRequest request, int attempt) { - if (attempt >= wrapped.retries()) + + private boolean isInExcess() { + return inflight.get() - delayedCount.get() > throttler.targetInflight(); + } + + private boolean retry(HttpRequest request, int attempt) { + if (attempt > strategy.retries()) return false; - switch (request.getMethod().toUpperCase()) { - case "POST": return wrapped.retry(FeedClient.OperationType.put); - case "PUT": return wrapped.retry(FeedClient.OperationType.update); - case "DELETE": return wrapped.retry(FeedClient.OperationType.remove); - default: throw new IllegalStateException("Unexpected HTTP method: " + request.getMethod()); + switch (request.method().toUpperCase()) { + case "POST": return strategy.retry(FeedClient.OperationType.PUT); + case "PUT": return strategy.retry(FeedClient.OperationType.UPDATE); + case "DELETE": return strategy.retry(FeedClient.OperationType.REMOVE); + default: throw new IllegalStateException("Unexpected HTTP method: " + request.method()); } } @@ -90,158 +131,126 @@ class HttpRequestStrategy implements RequestStrategy, AutoCloseable { * Retries all IOExceptions, unless error rate has converged to a value higher than the threshold, * or the user has turned off retries for this type of operation. */ - private boolean retry(SimpleHttpRequest request, Throwable thrown, int attempt) { - failure(); - log.log(INFO, thrown, () -> "Failed attempt " + attempt + " at " + request + ", " + consecutiveSuccesses + " successes since last error"); - - if ( ! (thrown instanceof IOException)) - return false; + private boolean retry(HttpRequest request, Throwable thrown, int attempt) { + breaker.failure(); + log.log(FINE, thrown, () -> "Failed attempt " + attempt + " at " + request); - return retry(request, attempt); - } - - void success() { - Instant now = clock.instant(); - synchronized (monitor) { - ++consecutiveSuccesses; - lastSuccess = now; - targetInflight = min(targetInflight + 0.1, maxInflight); - } - } + if ( (thrown instanceof IOException) // General IO problems. + || (thrown instanceof CancellationException) // TLS session disconnect. + || (thrown instanceof CancelledKeyException)) // Selection cancelled. + return retry(request, attempt); - void failure() { - Instant threshold = clock.instant().minusSeconds(300); - synchronized (monitor) { - consecutiveSuccesses = 0; - if (lastSuccess.isBefore(threshold)) - failed = true; - } + return false; } /** Retries throttled requests (429, 503), adjusting the target inflight count, and server errors (500, 502). */ - private boolean retry(SimpleHttpRequest request, SimpleHttpResponse response, int attempt) { - if (response.getCode() / 100 == 2) { - success(); + private boolean retry(HttpRequest request, HttpResponse response, int attempt) { + if (response.code() / 100 == 2) { + breaker.success(); + throttler.success(); return false; } - if (response.getCode() == 429 || response.getCode() == 503) { // Throttling; reduce target inflight. - synchronized (monitor) { - targetInflight = max(inflight * 0.9, minInflight); - } - log.log(FINE, () -> "Status code " + response.getCode() + " (" + response.getBodyText() + ") on attempt " + attempt + - " at " + request + ", " + consecutiveSuccesses + " successes since last error"); + log.log(FINE, () -> "Status code " + response.code() + " (" + new String(response.body(), UTF_8) + + ") on attempt " + attempt + " at " + request); + if (response.code() == 429 || response.code() == 503) { // Throttling; reduce target inflight. + throttler.throttled((inflight.get() - delayedCount.get())); return true; } - log.log(INFO, () -> "Status code " + response.getCode() + " (" + response.getBodyText() + ") on attempt " + attempt + - " at " + request + ", " + consecutiveSuccesses + " successes since last error"); + breaker.failure(); + if (response.code() == 500 || response.code() == 502 || response.code() == 504) // Hopefully temporary errors. + return retry(request, attempt); - failure(); - if (response.getCode() != 500 && response.getCode() != 502) - return false; - - return retry(request, attempt); // Hopefully temporary errors. + return false; } - // Must hold lock. private void acquireSlot() { try { - while (inflight >= targetInflight) - monitor.wait(); + while (inflight.get() >= throttler.targetInflight()) + Thread.sleep(1); - ++inflight; + inflight.incrementAndGet(); } catch (InterruptedException e) { throw new RuntimeException(e); } } - // Must hold lock. private void releaseSlot() { - for (long i = --inflight; i < targetInflight; i++) - monitor.notify(); + inflight.decrementAndGet(); } - @Override - public boolean hasFailed() { - synchronized (monitor) { - return failed; + public void await() { + try { + while (inflight.get() > 0) + Thread.sleep(10); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } @Override - public CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, SimpleHttpRequest request, - BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>> dispatch) { - CompletableFuture<SimpleHttpResponse> result = new CompletableFuture<>(); // Carries the aggregate result of the operation, including retries. - CompletableFuture<SimpleHttpResponse> vessel = new CompletableFuture<>(); // Holds the computation of a single dispatch to the HTTP client. - CompletableFuture<Void> blocker = new CompletableFuture<>(); // Blocks the next operation with same doc-id, then triggers it when complete. - - // Get the previous inflight operation for this doc-id, or acquire a send slot. - CompletableFuture<Void> previous; - synchronized (monitor) { - previous = inflightById.put(documentId, blocker); - if (previous == null) - acquireSlot(); + public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { + CompletableFuture<HttpResponse> result = new CompletableFuture<>(); // Carries the aggregate result of the operation, including retries. + CompletableFuture<HttpResponse> vessel = new CompletableFuture<>(); // Holds the computation of a single dispatch to the HTTP client. + CompletableFuture<?> previous = inflightById.put(documentId, result); + if (destroyed.get()) { + result.cancel(true); + return result; + } + + if (previous == null) { + acquireSlot(); + offer(request, vessel); + throttler.sent(inflight.get(), result); } - if (previous == null) // Send immediately if none inflight ... - dispatch.accept(request, vessel); - else // ... or send when the previous inflight is done. - previous.thenRun(() -> dispatch.accept(request, vessel)); - - handleAttempt(vessel, dispatch, request, result, 1); - - result.thenRun(() -> { - CompletableFuture<Void> current; - synchronized (monitor) { - current = inflightById.get(documentId); - if (current == blocker) { // Release slot and clear map if no other operations enqueued for this doc-id ... - releaseSlot(); - inflightById.put(documentId, null); - } + else + previous.whenComplete((__, ___) -> offer(request, vessel)); + + handleAttempt(vessel, request, result, 1); + + return result.handle((response, error) -> { + if (inflightById.compute(documentId, (____, current) -> current == result ? null : current) == null) + releaseSlot(); + + if (error != null) { + if (error instanceof FeedException) throw (FeedException) error; + throw new FeedException(documentId, error); } - if (current != blocker) // ... or trigger sending the next enqueued operation. - blocker.complete(null); + return response; }); - - return result; } /** Handles the result of one attempt at the given operation, retrying if necessary. */ - private void handleAttempt(CompletableFuture<SimpleHttpResponse> vessel, BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>> dispatch, - SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> result, int attempt) { - vessel.whenComplete((response, thrown) -> { - // Retry the operation if it failed with a transient error ... - if (thrown != null ? retry(request, thrown, attempt) - : retry(request, response, attempt)) { - CompletableFuture<SimpleHttpResponse> retry = new CompletableFuture<>(); - boolean hasFailed = hasFailed(); - if (hasFailed) - delayed.add(new CompletableFuture<>().thenRun(() -> dispatch.accept(request, retry))); - else - dispatch.accept(request, retry); - handleAttempt(retry, dispatch, request, result, attempt + (hasFailed ? 0 : 1)); - return; - } - - // ... or accept the outcome and mark the operation as complete. - if (thrown == null) result.complete(response); - else result.completeExceptionally(thrown); - }); + private void handleAttempt(CompletableFuture<HttpResponse> vessel, HttpRequest request, CompletableFuture<HttpResponse> result, int attempt) { + vessel.whenCompleteAsync((response, thrown) -> { + // Retry the operation if it failed with a transient error ... + if (thrown != null ? retry(request, thrown, attempt) + : retry(request, response, attempt)) { + CircuitBreaker.State state = breaker.state(); + CompletableFuture<HttpResponse> retry = new CompletableFuture<>(); + offer(request, retry); + handleAttempt(retry, request, result, attempt + (state == HALF_OPEN ? 0 : 1)); + } + // ... or accept the outcome and mark the operation as complete. + else { + if (thrown == null) result.complete(response); + else result.completeExceptionally(thrown); + } + }, + resultExecutor); } @Override - public void close() { - synchronized (monitor) { - if (closed) - return; - - closed = true; + public void destroy() { + if ( ! destroyed.getAndSet(true)) { + inflightById.values().forEach(result -> result.cancel(true)); + cluster.close(); + resultExecutor.shutdown(); } - delayer.interrupt(); - try { delayer.join(); } - catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java new file mode 100644 index 00000000000..b1dd54240eb --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java @@ -0,0 +1,16 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +interface HttpResponse { + + int code(); + byte[] body(); + + static HttpResponse of(int code, byte[] body) { + return new HttpResponse() { + @Override public int code() { return code; } + @Override public byte[] body() { return body; } + }; + } + +} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java new file mode 100644 index 00000000000..0ba373eef18 --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java @@ -0,0 +1,484 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import ai.vespa.feed.client.FeedClient.OperationType; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static ai.vespa.feed.client.FeedClient.OperationType.PUT; +import static ai.vespa.feed.client.FeedClient.OperationType.REMOVE; +import static ai.vespa.feed.client.FeedClient.OperationType.UPDATE; +import static com.fasterxml.jackson.core.JsonToken.END_ARRAY; +import static com.fasterxml.jackson.core.JsonToken.START_ARRAY; +import static com.fasterxml.jackson.core.JsonToken.START_OBJECT; +import static com.fasterxml.jackson.core.JsonToken.VALUE_FALSE; +import static com.fasterxml.jackson.core.JsonToken.VALUE_STRING; +import static com.fasterxml.jackson.core.JsonToken.VALUE_TRUE; +import static java.lang.Math.min; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Objects.requireNonNull; + +/** + * @author jonmv + * @author bjorncs + */ +public class JsonFeeder implements Closeable { + + private final ExecutorService resultExecutor = Executors.newSingleThreadExecutor(r -> { + Thread t = new Thread(r, "json-feeder-result-executor"); + t.setDaemon(true); + return t; + }); + private final FeedClient client; + private final OperationParameters protoParameters; + + private JsonFeeder(FeedClient client, OperationParameters protoParameters) { + this.client = client; + this.protoParameters = protoParameters; + } + + public interface ResultCallback { + /** + * Invoked after each operation has either completed successfully or failed + * + * @param result Non-null if operation completed successfully + * @param error Non-null if operation failed + */ + default void onNextResult(Result result, FeedException error) { } + + /** + * Invoked if an unrecoverable error occurred during feed processing, + * after which no other {@link ResultCallback} methods are invoked. + */ + default void onError(FeedException error) { } + + /** + * Invoked when all feed operations are either completed successfully or failed. + */ + default void onComplete() { } + } + + public static Builder builder(FeedClient client) { return new Builder(client); } + + /** Feeds single JSON feed operations on the form + * <pre> + * { + * "id": "id:ns:type::boo", + * "fields": { ... document fields ... } + * } + * </pre> + * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes. + */ + public CompletableFuture<Result> feedSingle(String json) { + CompletableFuture<Result> result = new CompletableFuture<>(); + try { + SingleOperationParserAndExecutor parser = new SingleOperationParserAndExecutor(json.getBytes(UTF_8)); + parser.next().whenCompleteAsync((operationResult, error) -> { + if (error != null) { + result.completeExceptionally(error); + } else { + result.complete(operationResult); + } + }, resultExecutor); + } catch (Exception e) { + resultExecutor.execute(() -> result.completeExceptionally(wrapException(e))); + } + return result; + } + + /** Feeds a stream containing a JSON array of feed operations on the form + * <pre> + * [ + * { + * "id": "id:ns:type::boo", + * "fields": { ... document fields ... } + * }, + * { + * "put": "id:ns:type::foo", + * "fields": { ... document fields ... } + * }, + * { + * "update": "id:ns:type:n=4:bar", + * "create": true, + * "fields": { ... partial update fields ... } + * }, + * { + * "remove": "id:ns:type:g=foo:bar", + * "condition": "type.baz = \"bax\"" + * }, + * ... + * ] + * </pre> + * Note that {@code "id"} is an alias for the document put operation. + * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes. + */ + public CompletableFuture<Void> feedMany(InputStream jsonStream, ResultCallback resultCallback) { + return feedMany(jsonStream, 1 << 26, resultCallback); + } + + /** + * Same as {@link #feedMany(InputStream, ResultCallback)}, but without a provided {@link ResultCallback} instance. + * @see JsonFeeder#feedMany(InputStream, ResultCallback) for details. + */ + public CompletableFuture<Void> feedMany(InputStream jsonStream) { + return feedMany(jsonStream, new ResultCallback() { }); + } + + CompletableFuture<Void> feedMany(InputStream jsonStream, int size, ResultCallback resultCallback) { + CompletableFuture<Void> overallResult = new CompletableFuture<>(); + CompletableFuture<Result> result; + AtomicInteger pending = new AtomicInteger(1); // The below dispatch loop itself is counted as a single pending operation + AtomicBoolean finalCallbackInvoked = new AtomicBoolean(); + try { + RingBufferStream buffer = new RingBufferStream(jsonStream, size); + while ((result = buffer.next()) != null) { + pending.incrementAndGet(); + result.whenCompleteAsync((r, t) -> { + if (!finalCallbackInvoked.get()) { + resultCallback.onNextResult(r, (FeedException) t); + } + if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { + resultCallback.onComplete(); + overallResult.complete(null); + } + }, resultExecutor); + } + if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { + resultExecutor.execute(() -> { + resultCallback.onComplete(); + overallResult.complete(null); + }); + } + } catch (Exception e) { + if (finalCallbackInvoked.compareAndSet(false, true)) { + resultExecutor.execute(() -> { + FeedException wrapped = wrapException(e); + resultCallback.onError(wrapped); + overallResult.completeExceptionally(wrapped); + }); + } + } + return overallResult; + } + + private static final JsonFactory factory = new JsonFactory(); + + @Override public void close() throws IOException { + client.close(); + resultExecutor.shutdown(); + try { + if (!resultExecutor.awaitTermination(30, TimeUnit.SECONDS)) { + throw new IOException("Failed to close client in time"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + private FeedException wrapException(Exception e) { + if (e instanceof FeedException) return (FeedException) e; + if (e instanceof IOException) { + return new OperationParseException("Failed to parse document JSON: " + e.getMessage(), e); + } + return new FeedException(e); + } + + private class RingBufferStream extends InputStream { + + private final byte[] b = new byte[1]; + private final InputStream in; + private final byte[] data; + private final int size; + private final Object lock = new Object(); + private IOException thrown = null; + private long tail = 0; + private long pos = 0; + private long head = 0; + private boolean done = false; + private final OperationParserAndExecutor parserAndExecutor; + + RingBufferStream(InputStream in, int size) throws IOException { + this.in = in; + this.data = new byte[size]; + this.size = size; + + new Thread(this::fill, "feed-reader").start(); + + this.parserAndExecutor = new RingBufferBackedOperationParserAndExecutor(factory.createParser(this)); + } + + @Override + public int read() throws IOException { + return read(b, 0, 1) == -1 ? -1 : b[0]; + } + + @Override + public int read(byte[] buffer, int off, int len) throws IOException { + try { + int ready; + synchronized (lock) { + while ((ready = (int) (head - pos)) == 0 && ! done) + lock.wait(); + } + if (thrown != null) throw thrown; + if (ready == 0) return -1; + + ready = min(ready, len); + int offset = (int) (pos % size); + int length = min(ready, size - offset); + System.arraycopy(data, offset, buffer, off, length); + if (length < ready) + System.arraycopy(data, 0, buffer, off + length, ready - length); + + pos += ready; + return ready; + } + catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted waiting for data: " + e.getMessage()); + } + } + + public CompletableFuture<Result> next() throws IOException { + return parserAndExecutor.next(); + } + + private final byte[] prefix = "{\"fields\":".getBytes(UTF_8); + private byte[] copy(long start, long end) { + int length = (int) (end - start); + byte[] buffer = new byte[prefix.length + length + 1]; + System.arraycopy(prefix, 0, buffer, 0, prefix.length); + + int offset = (int) (start % size); + int toWrite = min(length, size - offset); + System.arraycopy(data, offset, buffer, prefix.length, toWrite); + if (toWrite < length) + System.arraycopy(data, 0, buffer, prefix.length + toWrite, length - toWrite); + + buffer[buffer.length - 1] = '}'; + return buffer; + } + + + @Override + public void close() throws IOException { + synchronized (lock) { + done = true; + lock.notifyAll(); + } + in.close(); + } + + private void fill() { + try { + while (true) { + int free; + synchronized (lock) { + while ((free = (int) (tail + size - head)) <= 0 && !done) + lock.wait(); + } + if (done) break; + + int off = (int) (head % size); + int len = min(min(free, size - off), 1 << 13); + int read = in.read(data, off, len); + + synchronized (lock) { + if (read < 0) done = true; + else head += read; + lock.notify(); + } + } + } catch (InterruptedException e) { + synchronized (lock) { + done = true; + thrown = new InterruptedIOException("Interrupted reading data: " + e.getMessage()); + } + } catch (IOException e) { + synchronized (lock) { + done = true; + thrown = e; + } + } + } + + private class RingBufferBackedOperationParserAndExecutor extends OperationParserAndExecutor { + + RingBufferBackedOperationParserAndExecutor(JsonParser parser) { super(parser, true); } + + @Override + String getDocumentJson(long start, long end) { + String payload = new String(copy(start, end), UTF_8); + synchronized (lock) { + tail = end; + lock.notify(); + } + return payload; + } + } + } + + private class SingleOperationParserAndExecutor extends OperationParserAndExecutor { + + private final byte[] json; + + SingleOperationParserAndExecutor(byte[] json) throws IOException { + super(factory.createParser(json), false); + this.json = json; + } + + @Override + String getDocumentJson(long start, long end) { + return new String(json, (int) start, (int) (end - start), UTF_8); + } + } + + private abstract class OperationParserAndExecutor { + + private final JsonParser parser; + private final boolean multipleOperations; + private boolean arrayPrefixParsed; + + protected OperationParserAndExecutor(JsonParser parser, boolean multipleOperations) { + this.parser = parser; + this.multipleOperations = multipleOperations; + } + + abstract String getDocumentJson(long start, long end); + + CompletableFuture<Result> next() throws IOException { + if (multipleOperations && !arrayPrefixParsed){ + expect(START_ARRAY); + arrayPrefixParsed = true; + } + + JsonToken token = parser.nextToken(); + if (token == END_ARRAY && multipleOperations) return null; + else if (token == null && !multipleOperations) return null; + else if (token == START_OBJECT); + else throw new OperationParseException("Unexpected token '" + parser.currentToken() + "' at offset " + parser.getTokenLocation().getByteOffset()); + long start = 0, end = -1; + OperationType type = null; + DocumentId id = null; + OperationParameters parameters = protoParameters; + loop: while (true) { + switch (parser.nextToken()) { + case FIELD_NAME: + switch (parser.getText()) { + case "id": + case "put": type = PUT; id = readId(); break; + case "update": type = UPDATE; id = readId(); break; + case "remove": type = REMOVE; id = readId(); break; + case "condition": parameters = parameters.testAndSetCondition(readString()); break; + case "create": parameters = parameters.createIfNonExistent(readBoolean()); break; + case "fields": { + expect(START_OBJECT); + start = parser.getTokenLocation().getByteOffset(); + int depth = 1; + while (depth > 0) switch (parser.nextToken()) { + case START_OBJECT: ++depth; break; + case END_OBJECT: --depth; break; + } + end = parser.getTokenLocation().getByteOffset() + 1; + break; + } + default: throw new OperationParseException("Unexpected field name '" + parser.getText() + "' at offset " + + parser.getTokenLocation().getByteOffset()); + } + break; + + case END_OBJECT: + break loop; + + default: + throw new OperationParseException("Unexpected token '" + parser.currentToken() + "' at offset " + + parser.getTokenLocation().getByteOffset()); + } + } + if (id == null) + throw new OperationParseException("No document id for document at offset " + start); + + if (end < start) + throw new OperationParseException("No 'fields' object for document at offset " + parser.getTokenLocation().getByteOffset()); + String payload = getDocumentJson(start, end); + switch (type) { + case PUT: return client.put (id, payload, parameters); + case UPDATE: return client.update(id, payload, parameters); + case REMOVE: return client.remove(id, parameters); + default: throw new OperationParseException("Unexpected operation type '" + type + "'"); + } + } + + private void expect(JsonToken token) throws IOException { + if (parser.nextToken() != token) + throw new OperationParseException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() + + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); + } + + private String readString() throws IOException { + String value = parser.nextTextValue(); + if (value == null) + throw new OperationParseException("Expected '" + VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() + + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); + + return value; + } + + private boolean readBoolean() throws IOException { + Boolean value = parser.nextBooleanValue(); + if (value == null) + throw new OperationParseException("Expected '" + VALUE_FALSE + "' or '" + VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() + + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); + + return value; + + } + + private DocumentId readId() throws IOException { + return DocumentId.of(readString()); + } + + } + + public static class Builder { + + final FeedClient client; + OperationParameters parameters = OperationParameters.empty(); + + private Builder(FeedClient client) { + this.client = requireNonNull(client); + } + + public Builder withTimeout(Duration timeout) { + parameters = parameters.timeout(timeout); + return this; + } + + public Builder withRoute(String route) { + parameters = parameters.route(route); + return this; + } + + public Builder withTracelevel(int tracelevel) { + parameters = parameters.tracelevel(tracelevel); + return this; + } + + public JsonFeeder build() { + return new JsonFeeder(client, parameters); + } + + } +} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java deleted file mode 100644 index 17162f19d3f..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java +++ /dev/null @@ -1,364 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -import ai.vespa.feed.client.FeedClient.OperationType; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; - -import java.io.Closeable; -import java.io.IOException; -import java.io.InputStream; -import java.io.InterruptedIOException; -import java.io.UncheckedIOException; -import java.time.Duration; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import static ai.vespa.feed.client.FeedClient.OperationType.put; -import static ai.vespa.feed.client.FeedClient.OperationType.remove; -import static ai.vespa.feed.client.FeedClient.OperationType.update; -import static com.fasterxml.jackson.core.JsonToken.START_OBJECT; -import static com.fasterxml.jackson.core.JsonToken.VALUE_FALSE; -import static com.fasterxml.jackson.core.JsonToken.VALUE_STRING; -import static com.fasterxml.jackson.core.JsonToken.VALUE_TRUE; -import static java.lang.Math.min; -import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.Objects.requireNonNull; - -/** - * @author jonmv - */ -public class JsonStreamFeeder implements Closeable { - - private final FeedClient client; - private final OperationParameters protoParameters; - - private JsonStreamFeeder(FeedClient client, OperationParameters protoParameters) { - this.client = client; - this.protoParameters = protoParameters; - } - - public static Builder builder(FeedClient client) { return new Builder(client); } - - /** Feeds a stream containing a JSON array of feed operations on the form - * <pre> - * [ - * { - * "id": "id:ns:type::boo", - * "fields": { ... document fields ... } - * }, - * { - * "put": "id:ns:type::foo", - * "fields": { ... document fields ... } - * }, - * { - * "update": "id:ns:type:n=4:bar", - * "create": true, - * "fields": { ... partial update fields ... } - * }, - * { - * "remove": "id:ns:type:g=foo:bar", - * "condition": "type.baz = \"bax\"" - * }, - * ... - * ] - * </pre> - * Note that {@code "id"} is an alias for the document put operation. - */ - public void feed(InputStream jsonStream) throws IOException { - feed(jsonStream, 1 << 26, false); - } - - BenchmarkResult benchmark(InputStream jsonStream) throws IOException { - return feed(jsonStream, 1 << 26, true).get(); - } - - Optional<BenchmarkResult> feed(InputStream jsonStream, int size, boolean benchmark) throws IOException { - RingBufferStream buffer = new RingBufferStream(jsonStream, size); - buffer.expect(JsonToken.START_ARRAY); - AtomicInteger okCount = new AtomicInteger(); - AtomicInteger failedCount = new AtomicInteger(); - long startTime = System.nanoTime(); - CompletableFuture<Result> result; - AtomicReference<Throwable> thrown = new AtomicReference<>(); - while ((result = buffer.next()) != null) { - result.whenComplete((r, t) -> { - if (t != null) { - failedCount.incrementAndGet(); - if (!benchmark) thrown.set(t); - } else - okCount.incrementAndGet(); - }); - if (thrown.get() != null) - sneakyThrow(thrown.get()); - } - if (!benchmark) return Optional.empty(); - Duration duration = Duration.ofNanos(System.nanoTime() - startTime); - double throughPut = (double)okCount.get() / duration.toMillis() * 1000D; - return Optional.of(new BenchmarkResult(okCount.get(), failedCount.get(), duration, throughPut)); - } - - @SuppressWarnings("unchecked") - static <T extends Throwable> void sneakyThrow(Throwable thrown) throws T { throw (T) thrown; } - - private static final JsonFactory factory = new JsonFactory(); - - @Override public void close() throws IOException { client.close(); } - - private class RingBufferStream extends InputStream { - - private final byte[] b = new byte[1]; - private final InputStream in; - private final byte[] data; - private final int size; - private final Object lock = new Object(); - private final JsonParser parser; - private Throwable thrown = null; - private long tail = 0; - private long pos = 0; - private long head = 0; - private boolean done = false; - - RingBufferStream(InputStream in, int size) { - this.in = in; - this.data = new byte[size]; - this.size = size; - - new Thread(this::fill, "feed-reader").start(); - - try { this.parser = factory.createParser(this); } - catch (IOException e) { throw new UncheckedIOException(e); } - } - - @Override - public int read() throws IOException { - return read(b, 0, 1) == -1 ? -1 : b[0]; - } - - @Override - public int read(byte[] buffer, int off, int len) throws IOException { - try { - int ready; - synchronized (lock) { - while ((ready = (int) (head - pos)) == 0 && ! done) - lock.wait(); - } - if (thrown != null) throw new RuntimeException("Error reading input", thrown); - if (ready == 0) return -1; - - ready = min(ready, len); - int offset = (int) (pos % size); - int length = min(ready, size - offset); - System.arraycopy(data, offset, buffer, off, length); - if (length < ready) - System.arraycopy(data, 0, buffer, off + length, ready - length); - - pos += ready; - return ready; - } - catch (InterruptedException e) { - throw new InterruptedIOException("Interrupted waiting for data: " + e.getMessage()); - } - } - - void expect(JsonToken token) throws IOException { - if (parser.nextToken() != token) - throw new IllegalArgumentException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() + - ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); - } - - public CompletableFuture<Result> next() throws IOException { - long start = 0, end = -1; - OperationType type = null; - DocumentId id = null; - OperationParameters parameters = protoParameters; - switch (parser.nextToken()) { - case END_ARRAY: return null; - case START_OBJECT: break; - default: throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " + - parser.getTokenLocation().getByteOffset()); - } - - loop: while (true) { - switch (parser.nextToken()) { - case FIELD_NAME: - switch (parser.getText()) { - case "id": - case "put": type = put; id = readId(); break; - case "update": type = update; id = readId(); break; - case "remove": type = remove; id = readId(); break; - case "condition": parameters = parameters.testAndSetCondition(readString()); break; - case "create": parameters = parameters.createIfNonExistent(readBoolean()); break; - case "fields": { - expect(START_OBJECT); - start = parser.getTokenLocation().getByteOffset(); - int depth = 1; - while (depth > 0) switch (parser.nextToken()) { - case START_OBJECT: ++depth; break; - case END_OBJECT: --depth; break; - } - end = parser.getTokenLocation().getByteOffset() + 1; - break; - } - default: throw new IllegalArgumentException("Unexpected field name '" + parser.getText() + "' at offset " + - parser.getTokenLocation().getByteOffset()); - } - break; - - case END_OBJECT: - break loop; - - default: - throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " + - parser.getTokenLocation().getByteOffset()); - } - } - - if (id == null) - throw new IllegalArgumentException("No document id for document at offset " + start); - - if (end < start) - throw new IllegalArgumentException("No 'fields' object for document at offset " + parser.getTokenLocation().getByteOffset()); - - String payload = new String(copy(start, end), UTF_8); - synchronized (lock) { - tail = end; - lock.notify(); - } - - switch (type) { - case put: return client.put (id, payload, parameters); - case update: return client.update(id, payload, parameters); - case remove: return client.remove(id, parameters); - default: throw new IllegalStateException("Unexpected operation type '" + type + "'"); - } - } - - private final byte[] prefix = "{\"fields\":".getBytes(UTF_8); - private byte[] copy(long start, long end) { - int length = (int) (end - start); - byte[] buffer = new byte[prefix.length + length + 1]; - System.arraycopy(prefix, 0, buffer, 0, prefix.length); - - int offset = (int) (start % size); - int toWrite = min(length, size - offset); - System.arraycopy(data, offset, buffer, prefix.length, toWrite); - if (toWrite < length) - System.arraycopy(data, 0, buffer, prefix.length + toWrite, length - toWrite); - - buffer[buffer.length - 1] = '}'; - return buffer; - } - - private String readString() throws IOException { - String value = parser.nextTextValue(); - if (value == null) - throw new IllegalArgumentException("Expected '" + VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() + - ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); - - return value; - } - - private boolean readBoolean() throws IOException { - Boolean value = parser.nextBooleanValue(); - if (value == null) - throw new IllegalArgumentException("Expected '" + VALUE_FALSE + "' or '" + VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() + - ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); - - return value; - - } - - private DocumentId readId() throws IOException { - return DocumentId.of(readString()); - } - - @Override - public void close() throws IOException { - synchronized (lock) { - done = true; - lock.notifyAll(); - } - in.close(); - } - - private void fill() { - try { - while (true) { - int free; - synchronized (lock) { - while ((free = (int) (tail + size - head)) <= 0 && ! done) - lock.wait(); - } - if (done) break; - - int off = (int) (head % size); - int len = min(min(free, size - off), 1 << 13); - int read = in.read(data, off, len); - - synchronized (lock) { - if (read < 0) done = true; - else head += read; - lock.notify(); - } - } - } - catch (Throwable t) { - synchronized (lock) { - done = true; - thrown = t; - } - } - } - - } - - - public static class Builder { - - final FeedClient client; - OperationParameters parameters = OperationParameters.empty(); - - private Builder(FeedClient client) { - this.client = requireNonNull(client); - } - - public Builder withTimeout(Duration timeout) { - parameters = parameters.timeout(timeout); - return this; - } - - public Builder withRoute(String route) { - parameters = parameters.route(route); - return this; - } - - public Builder withTracelevel(int tracelevel) { - parameters = parameters.tracelevel(tracelevel); - return this; - } - - public JsonStreamFeeder build() { - return new JsonStreamFeeder(client, parameters); - } - - } - - static class BenchmarkResult { - final int okCount; - final int errorCount; - final Duration duration; - final double throughput; - - BenchmarkResult(int okCount, int errorCount, Duration duration, double throughput) { - this.okCount = okCount; - this.errorCount = errorCount; - this.duration = duration; - this.throughput = throughput; - } - } - -} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParameters.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParameters.java index 22546f89ccb..8c20a37d224 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParameters.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParameters.java @@ -7,6 +7,8 @@ import java.util.Optional; import java.util.OptionalInt; /** + * Per-operation feed parameters + * * @author bjorncs * @author jonmv */ diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParseException.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParseException.java new file mode 100644 index 00000000000..15ba024bb4e --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParseException.java @@ -0,0 +1,15 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +/** + * Signals that supplied JSON for a document/operation is invalid + * + * @author bjorncs + */ +public class OperationParseException extends FeedException { + + public OperationParseException(String message) { super(message); } + + public OperationParseException(String message, Throwable cause) { super(message, cause); } + +} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java new file mode 100644 index 00000000000..d36475a51fb --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java @@ -0,0 +1,96 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import java.util.Map; + +/** + * Statistics for feed operations over HTTP against a Vespa cluster. + * + * @author jonmv + */ +public class OperationStats { + + private final long requests; + private final Map<Integer, Long> responsesByCode; + private final long inflight; + private final long exceptions; + private final long averageLatencyMillis; + private final long minLatencyMillis; + private final long maxLatencyMillis; + private final long bytesSent; + private final long bytesReceived; + + public OperationStats(long requests, Map<Integer, Long> responsesByCode, long exceptions, long inflight, + long averageLatencyMillis, long minLatencyMillis, long maxLatencyMillis, + long bytesSent, long bytesReceived) { + this.requests = requests; + this.responsesByCode = responsesByCode; + this.exceptions = exceptions; + this.inflight = inflight; + this.averageLatencyMillis = averageLatencyMillis; + this.minLatencyMillis = minLatencyMillis; + this.maxLatencyMillis = maxLatencyMillis; + this.bytesSent = bytesSent; + this.bytesReceived = bytesReceived; + } + + public long requests() { + return requests; + } + + public long responses() { + return requests - inflight; + } + + public long successes() { + return responsesByCode.getOrDefault(200, 0L); + } + + public Map<Integer, Long> responsesByCode() { + return responsesByCode; + } + + public long exceptions() { + return exceptions; + } + + public long inflight() { + return inflight; + } + + public long averageLatencyMillis() { + return averageLatencyMillis; + } + + public long minLatencyMillis() { + return minLatencyMillis; + } + + public long maxLatencyMillis() { + return maxLatencyMillis; + } + + public long bytesSent() { + return bytesSent; + } + + public long bytesReceived() { + return bytesReceived; + } + + @Override + public String toString() { + return "Stats{" + + "requests=" + requests + + ", responsesByCode=" + responsesByCode + + ", exceptions=" + exceptions + + ", inflight=" + inflight + + ", averageLatencyMillis=" + averageLatencyMillis + + ", minLatencyMillis=" + minLatencyMillis + + ", maxLatencyMillis=" + maxLatencyMillis + + ", bytesSent=" + bytesSent + + ", bytesReceived=" + bytesReceived + + '}'; + } + +} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java index 1787d8d65c6..a1101eb0ebb 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java @@ -1,24 +1,30 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.feed.client; -import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; -import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import ai.vespa.feed.client.FeedClient.CircuitBreaker.State; import java.util.concurrent.CompletableFuture; -import java.util.function.BiConsumer; /** * Controls execution of feed operations. * * @author jonmv */ -public interface RequestStrategy { +interface RequestStrategy { - /** Whether this has failed fatally, and we should cease sending further operations. */ - boolean hasFailed(); + /** Stats for operations sent through this. */ + OperationStats stats(); + + /** State of the circuit breaker. */ + State circuitBreakerState(); + + /** Forcibly terminates this, causing all inflight operations to complete immediately. */ + void destroy(); + + /** Wait for all inflight requests to complete. */ + void await(); /** Enqueue the given operation, returning its future result. This may block if the client send queue is full. */ - CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, SimpleHttpRequest request, - BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>> dispatch); + CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request); } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Result.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Result.java index 31a6cf6e893..b29d65e193b 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Result.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Result.java @@ -4,6 +4,8 @@ package ai.vespa.feed.client; import java.util.Optional; /** + * Result for a document operation + * * @author bjorncs * @author jonmv */ diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ResultParseException.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ResultParseException.java new file mode 100644 index 00000000000..3fd5143e2f4 --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ResultParseException.java @@ -0,0 +1,14 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +/** + * Signals that the client was unable to parse the result/response from container + * + * @author bjorncs + */ +public class ResultParseException extends FeedException { + + public ResultParseException(DocumentId documentId, String message) { super(documentId, message); } + + public ResultParseException(DocumentId documentId, Throwable cause) { super(documentId, cause); } +} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java index 7200d5fd943..9114e22f4a6 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java @@ -20,11 +20,14 @@ import java.nio.file.Path; import java.security.GeneralSecurityException; import java.security.KeyFactory; import java.security.KeyStore; +import java.security.KeyStoreException; import java.security.PrivateKey; import java.security.cert.Certificate; import java.security.cert.X509Certificate; import java.security.spec.PKCS8EncodedKeySpec; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.List; /** @@ -39,6 +42,9 @@ class SslContextBuilder { private Path certificateFile; private Path privateKeyFile; private Path caCertificatesFile; + private Collection<X509Certificate> certificate; + private PrivateKey privateKey; + private Collection<X509Certificate> caCertificates; SslContextBuilder withCertificateAndKey(Path certificate, Path privateKey) { this.certificateFile = certificate; @@ -46,20 +52,35 @@ class SslContextBuilder { return this; } + SslContextBuilder withCertificateAndKey(Collection<X509Certificate> certificate, PrivateKey privateKey) { + this.certificate = certificate; + this.privateKey = privateKey; + return this; + } + SslContextBuilder withCaCertificates(Path caCertificates) { this.caCertificatesFile = caCertificates; return this; } + SslContextBuilder withCaCertificates(Collection<X509Certificate> caCertificates) { + this.caCertificates = caCertificates; + return this; + } + SSLContext build() throws IOException { try { KeyStore keystore = KeyStore.getInstance("PKCS12"); keystore.load(null); if (certificateFile != null && privateKeyFile != null) { keystore.setKeyEntry("cert", privateKey(privateKeyFile), new char[0], certificates(certificateFile)); + } else if (certificate != null && privateKey != null) { + keystore.setKeyEntry("cert", privateKey, new char[0], certificate.toArray(new Certificate[0])); } if (caCertificatesFile != null) { - keystore.setCertificateEntry("ca-cert", certificates(caCertificatesFile)[0]); + addCaCertificates(keystore, Arrays.asList(certificates(caCertificatesFile))); + } else if (caCertificates != null) { + addCaCertificates(keystore, caCertificates); } KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); kmf.init(keystore, new char[0]); @@ -73,6 +94,13 @@ class SslContextBuilder { } } + private static void addCaCertificates(KeyStore keystore, Collection<? extends Certificate> certificates) throws KeyStoreException { + int i = 0; + for (Certificate cert : certificates) { + keystore.setCertificateEntry("ca-cert-" + ++i, cert); + } + } + private static Certificate[] certificates(Path file) throws IOException, GeneralSecurityException { try (PEMParser parser = new PEMParser(Files.newBufferedReader(file))) { List<X509Certificate> result = new ArrayList<>(); diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java new file mode 100644 index 00000000000..4e0c4fe90f0 --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java @@ -0,0 +1,45 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; + +import static java.lang.Math.max; +import static java.lang.Math.min; + +/** + * Reduces max throughput whenever throttled; increases it slowly whenever successful responses are obtained. + * + * @author jonmv + */ +public class StaticThrottler implements FeedClient.Throttler { + + protected final long maxInflight; + protected final long minInflight; + private final AtomicLong targetX10; + + public StaticThrottler(FeedClientBuilder builder) { + this.maxInflight = builder.connectionsPerEndpoint * (long) builder.maxStreamsPerConnection; + this.minInflight = builder.connectionsPerEndpoint * (long) min(16, builder.maxStreamsPerConnection); + this.targetX10 = new AtomicLong(10 * maxInflight); // 10x the actual value to allow for smaller updates. + } + + @Override + public void sent(long inflight, CompletableFuture<HttpResponse> vessel) { } + + @Override + public void success() { + targetX10.incrementAndGet(); + } + + @Override + public void throttled(long inflight) { + targetX10.set(max(inflight * 5, minInflight * 10)); + } + + @Override + public long targetInflight() { + return min(maxInflight, targetX10.get() / 10); + } + +} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/package-info.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/package-info.java new file mode 100644 index 00000000000..e058b9b921e --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/package-info.java @@ -0,0 +1,9 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * @author bjorncs + */ + +@PublicApi +package ai.vespa.feed.client; + +import com.yahoo.api.annotations.PublicApi;
\ No newline at end of file diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/GracePeriodCircuitBreakerTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/GracePeriodCircuitBreakerTest.java new file mode 100644 index 00000000000..9b30ebfd0aa --- /dev/null +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/GracePeriodCircuitBreakerTest.java @@ -0,0 +1,60 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import ai.vespa.feed.client.FeedClient.CircuitBreaker; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicLong; + +import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.CLOSED; +import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN; +import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * @author jonmv + */ +class GracePeriodCircuitBreakerTest { + + @Test + void testCircuitBreaker() { + AtomicLong now = new AtomicLong(0); + long SECOND = 1000; + CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(1)); + + assertEquals(CLOSED, breaker.state(), "Initial state is closed"); + + now.addAndGet(100 * SECOND); + assertEquals(CLOSED, breaker.state(), "State is closed after some time without activity"); + + breaker.success(); + assertEquals(CLOSED, breaker.state(), "State is closed after a success"); + + now.addAndGet(100 * SECOND); + assertEquals(CLOSED, breaker.state(), "State is closed some time after a success"); + + breaker.failure(); + assertEquals(CLOSED, breaker.state(), "State is closed right after a failure"); + + now.addAndGet(SECOND); + assertEquals(CLOSED, breaker.state(), "State is closed until grace period has passed"); + + now.addAndGet(1); + assertEquals(HALF_OPEN, breaker.state(), "State is half-open when grace period has passed"); + + breaker.success(); + assertEquals(CLOSED, breaker.state(), "State is closed after a new success"); + + breaker.failure(); + now.addAndGet(60 * SECOND); + assertEquals(HALF_OPEN, breaker.state(), "State is half-open until doom period has passed"); + + now.addAndGet(1); + assertEquals(OPEN, breaker.state(), "State is open when doom period has passed"); + + breaker.success(); + assertEquals(OPEN, breaker.state(), "State remains open in spite of new successes"); + } + +} diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java new file mode 100644 index 00000000000..d8090549420 --- /dev/null +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java @@ -0,0 +1,101 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import org.junit.jupiter.api.Test; + +import java.net.URI; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * @author jonmv + */ +class HttpFeedClientTest { + + @Test + void testFeeding() throws ExecutionException, InterruptedException { + DocumentId id = DocumentId.of("ns", "type", "0"); + AtomicReference<BiFunction<DocumentId, HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>(); + class MockRequestStrategy implements RequestStrategy { + @Override public OperationStats stats() { throw new UnsupportedOperationException(); } + @Override public FeedClient.CircuitBreaker.State circuitBreakerState() { return FeedClient.CircuitBreaker.State.CLOSED; } + @Override public void destroy() { throw new UnsupportedOperationException(); } + @Override public void await() { throw new UnsupportedOperationException(); } + @Override public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { return dispatch.get().apply(documentId, request); } + } + FeedClient client = new HttpFeedClient(FeedClientBuilder.create(URI.create("https://dummy:123")), new MockRequestStrategy()); + + // Vespa error is an error result. + dispatch.set((documentId, request) -> { + try { + assertEquals(id, documentId); + assertEquals("/document/v1/ns/type/docid/0?create=true&condition=false&timeout=5000ms&route=route", + request.path()); + assertEquals("json", new String(request.body(), UTF_8)); + + HttpResponse response = HttpResponse.of(502, + ("{\n" + + " \"pathId\": \"/document/v1/ns/type/docid/0\",\n" + + " \"id\": \"id:ns:type::0\",\n" + + " \"message\": \"Ooops! ... I did it again.\",\n" + + " \"trace\": \"I played with your heart. Got lost in the game.\"\n" + + "}").getBytes(UTF_8)); + return CompletableFuture.completedFuture(response); + } + catch (Throwable thrown) { + CompletableFuture<HttpResponse> failed = new CompletableFuture<>(); + failed.completeExceptionally(thrown); + return failed; + } + }); + Result result = client.put(id, + "json", + OperationParameters.empty() + .createIfNonExistent(true) + .testAndSetCondition("false") + .route("route") + .timeout(Duration.ofSeconds(5))) + .get(); + assertEquals("Ooops! ... I did it again.", result.resultMessage().get()); + assertEquals("I played with your heart. Got lost in the game.", result.traceMessage().get()); + + + // Handler error is a FeedException. + dispatch.set((documentId, request) -> { + try { + assertEquals(id, documentId); + assertEquals("/document/v1/ns/type/docid/0", + request.path()); + assertEquals("json", new String(request.body(), UTF_8)); + + HttpResponse response = HttpResponse.of(500, + ("{\n" + + " \"pathId\": \"/document/v1/ns/type/docid/0\",\n" + + " \"id\": \"id:ns:type::0\",\n" + + " \"message\": \"Alla ska i jorden.\",\n" + + " \"trace\": \"Din tid den kom, och senn så for den. \"\n" + + "}").getBytes(UTF_8)); + return CompletableFuture.completedFuture(response); + } + catch (Throwable thrown) { + CompletableFuture<HttpResponse> failed = new CompletableFuture<>(); + failed.completeExceptionally(thrown); + return failed; + } + }); + ExecutionException expected = assertThrows(ExecutionException.class, + () -> client.put(id, + "json", + OperationParameters.empty()) + .get()); + assertEquals("Status 500 executing 'POST /document/v1/ns/type/docid/0': Alla ska i jorden.", expected.getCause().getMessage()); + } + +} diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java new file mode 100644 index 00000000000..21ab6889e6e --- /dev/null +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java @@ -0,0 +1,203 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import ai.vespa.feed.client.FeedClient.CircuitBreaker; +import org.apache.hc.core5.http.ContentType; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.net.URI; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; + +import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.CLOSED; +import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN; +import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class HttpRequestStrategyTest { + + @Test + void testConcurrency() { + int documents = 1 << 16; + HttpRequest request = new HttpRequest("PUT", "/", null, null); + HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8)); + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + Cluster cluster = new BenchmarkingCluster((__, vessel) -> executor.schedule(() -> vessel.complete(response), (int) (Math.random() * 2 * 10), TimeUnit.MILLISECONDS)); + + HttpRequestStrategy strategy = new HttpRequestStrategy(FeedClientBuilder.create(URI.create("https://dummy.com:123")) + .setConnectionsPerEndpoint(1 << 10) + .setMaxStreamPerConnection(1 << 12), + cluster); + CountDownLatch latch = new CountDownLatch(1); + new Thread(() -> { + try { + while ( ! latch.await(1, TimeUnit.SECONDS)) { + System.err.println(cluster.stats().inflight()); + System.err.println(strategy.throttler.targetInflight()); + System.err.println(); + } + } + catch (InterruptedException ignored) { } + }).start(); + long startNanos = System.nanoTime(); + for (int i = 0; i < documents; i++) + strategy.enqueue(DocumentId.of("ns", "type", Integer.toString(i)), request); + + strategy.await(); + latch.countDown(); + executor.shutdown(); + cluster.close(); + OperationStats stats = cluster.stats(); + long successes = stats.responsesByCode().get(200); + System.err.println(successes + " successes in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds"); + System.err.println(stats); + + assertEquals(documents, stats.requests()); + assertEquals(documents, stats.responses()); + assertEquals(documents, stats.responsesByCode().get(200)); + assertEquals(0, stats.inflight()); + assertEquals(0, stats.exceptions()); + assertEquals(0, stats.bytesSent()); + assertEquals(2 * documents, stats.bytesReceived()); + } + + @Test + void testLogic() throws ExecutionException, InterruptedException { + int minStreams = 16; // Hard limit for minimum number of streams per connection. + MockCluster cluster = new MockCluster(); + AtomicLong now = new AtomicLong(0); + CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10)); + HttpRequestStrategy strategy = new HttpRequestStrategy(FeedClientBuilder.create(URI.create("https://dummy.com:123")) + .setRetryStrategy(new FeedClient.RetryStrategy() { + @Override public boolean retry(FeedClient.OperationType type) { return type == FeedClient.OperationType.PUT; } + @Override public int retries() { return 1; } + }) + .setCircuitBreaker(breaker) + .setConnectionsPerEndpoint(1) + .setMaxStreamPerConnection(minStreams), + new BenchmarkingCluster(cluster)); + + DocumentId id1 = DocumentId.of("ns", "type", "1"); + DocumentId id2 = DocumentId.of("ns", "type", "2"); + HttpRequest request = new HttpRequest("POST", "/", null, null); + + // Runtime exception is not retried. + cluster.expect((__, vessel) -> vessel.completeExceptionally(new FeedException("boom"))); + ExecutionException expected = assertThrows(ExecutionException.class, + () -> strategy.enqueue(id1, request).get()); + assertEquals("boom", expected.getCause().getMessage()); + assertEquals(1, strategy.stats().requests()); + + // IOException is retried. + cluster.expect((__, vessel) -> vessel.completeExceptionally(new IOException("retry me"))); + expected = assertThrows(ExecutionException.class, + () -> strategy.enqueue(id1, request).get()); + assertEquals("retry me", expected.getCause().getCause().getMessage()); + assertEquals(3, strategy.stats().requests()); + + // Successful response is returned + HttpResponse success = HttpResponse.of(200, null); + cluster.expect((__, vessel) -> vessel.complete(success)); + assertEquals(success, strategy.enqueue(id1, request).get()); + assertEquals(4, strategy.stats().requests()); + + // Throttled requests are retried. Concurrent operations to same ID (only) are serialised. + now.set(2000); + HttpResponse throttled = HttpResponse.of(429, null); + AtomicInteger count = new AtomicInteger(3); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference<CompletableFuture<HttpResponse>> completion = new AtomicReference<>(); + cluster.expect((req, vessel) -> { + if (req == request) { + if (count.decrementAndGet() > 0) + vessel.complete(throttled); + else { + completion.set(vessel); + latch.countDown(); + } + } + else vessel.complete(success); + }); + CompletableFuture<HttpResponse> delayed = strategy.enqueue(id1, request); + CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null)); + assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null)).get()); + latch.await(); + assertEquals(8, strategy.stats().requests()); // 3 attempts at throttled and one at id2. + now.set(4000); + assertEquals(CLOSED, breaker.state()); // Circuit not broken due to throttled requests. + completion.get().complete(success); + assertEquals(success, delayed.get()); + assertEquals(success, serialised.get()); + + // Some error responses are retried. + HttpResponse serverError = HttpResponse.of(500, null); + cluster.expect((__, vessel) -> vessel.complete(serverError)); + assertEquals(serverError, strategy.enqueue(id1, request).get()); + assertEquals(11, strategy.stats().requests()); + assertEquals(CLOSED, breaker.state()); // Circuit not broken due to throttled requests. + + // Error responses are not retried when not of appropriate type. + cluster.expect((__, vessel) -> vessel.complete(serverError)); + assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null)).get()); + assertEquals(12, strategy.stats().requests()); + + // Some error responses are not retried. + HttpResponse badRequest = HttpResponse.of(400, null); + cluster.expect((__, vessel) -> vessel.complete(badRequest)); + assertEquals(badRequest, strategy.enqueue(id1, request).get()); + assertEquals(13, strategy.stats().requests()); + + // Circuit breaker opens some time after starting to fail. + now.set(6000); + assertEquals(HALF_OPEN, breaker.state()); // Circuit broken due to failed requests. + now.set(605000); + assertEquals(OPEN, breaker.state()); // Circuit broken due to failed requests. + + Map<Integer, Long> codes = new HashMap<>(); + codes.put(200, 4L); + codes.put(400, 1L); + codes.put(429, 2L); + codes.put(500, 3L); + assertEquals(codes, strategy.stats().responsesByCode()); + assertEquals(3, strategy.stats().exceptions()); + } + + static class MockCluster implements Cluster { + + final AtomicReference<BiConsumer<HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>(); + + void expect(BiConsumer<HttpRequest, CompletableFuture<HttpResponse>> expected) { + dispatch.set(expected); + } + + @Override + public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) { + dispatch.get().accept(request, vessel); + } + + @Override + public void close() { } + + @Override + public OperationStats stats() { + return null; + } + + } + +} diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java new file mode 100644 index 00000000000..3e0f886a40a --- /dev/null +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java @@ -0,0 +1,124 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.stream.Collectors.joining; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class JsonFeederTest { + + @Test + void test() throws IOException { + int docs = 1 << 14; + String json = "[\n" + + + IntStream.range(0, docs).mapToObj(i -> + " {\n" + + " \"id\": \"id:ns:type::abc" + i + "\",\n" + + " \"fields\": {\n" + + " \"lul\":\"lal\"\n" + + " }\n" + + " },\n" + ).collect(joining()) + + + " {\n" + + " \"id\": \"id:ns:type::abc" + docs + "\",\n" + + " \"fields\": {\n" + + " \"lul\":\"lal\"\n" + + " }\n" + + " }\n" + + "]"; + AtomicReference<FeedException> exceptionThrow = new AtomicReference<>(); + Path tmpFile = Files.createTempFile(null, null); + Files.write(tmpFile, json.getBytes(UTF_8)); + try (InputStream in = Files.newInputStream(tmpFile, StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE)) { + AtomicInteger resultsReceived = new AtomicInteger(); + AtomicBoolean completedSuccessfully = new AtomicBoolean(); + long startNanos = System.nanoTime(); + SimpleClient feedClient = new SimpleClient(); + JsonFeeder.builder(feedClient).build() + .feedMany(in, 1 << 7, + new JsonFeeder.ResultCallback() { // TODO: hangs when buffer is smaller than largest document + @Override + public void onNextResult(Result result, FeedException error) { resultsReceived.incrementAndGet(); } + + @Override + public void onError(FeedException error) { exceptionThrow.set(error); } + + @Override + public void onComplete() { completedSuccessfully.set(true); } + }) + .join(); + + System.err.println((json.length() / 1048576.0) + " MB in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds"); + assertEquals(docs + 1, feedClient.ids.size()); + assertEquals(docs + 1, resultsReceived.get()); + assertTrue(completedSuccessfully.get()); + assertNull(exceptionThrow.get()); + } + } + + @Test + public void singleJsonOperationIsDispatchedToFeedClient() throws IOException, ExecutionException, InterruptedException { + try (JsonFeeder feeder = JsonFeeder.builder(new SimpleClient()).build()) { + String json = "{\"put\": \"id:ns:type::abc1\",\n" + + " \"fields\": {\n" + + " \"lul\":\"lal\"\n" + + " }\n" + + " }\n"; + Result result = feeder.feedSingle(json).get(); + assertEquals(DocumentId.of("id:ns:type::abc1"), result.documentId()); + assertEquals(Result.Type.success, result.type()); + assertEquals("success", result.resultMessage().get()); + } + } + + private static class SimpleClient implements FeedClient { + final Set<String> ids = new HashSet<>(); + + @Override + public CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params) { + ids.add(documentId.userSpecific()); + return createSuccessResult(documentId); + } + + @Override + public CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params) { + return createSuccessResult(documentId); + } + + @Override + public CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params) { + return createSuccessResult(documentId); + } + + @Override + public OperationStats stats() { return null; } + + @Override + public void close(boolean graceful) { } + + private CompletableFuture<Result> createSuccessResult(DocumentId documentId) { + return CompletableFuture.completedFuture(new Result(Result.Type.success, documentId, "success", null)); + } + } + +} diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java deleted file mode 100644 index 8ef8ae57f5e..00000000000 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -import org.junit.jupiter.api.Test; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.junit.jupiter.api.Assertions.assertEquals; - -class JsonStreamFeederTest { - - @Test - void test() throws IOException { - int docs = 1 << 10; - String json = "[\n" + - - IntStream.range(0, docs).mapToObj(i -> - " {\n" + - " \"id\": \"id:ns:type::abc" + i + "\",\n" + - " \"fields\": {\n" + - " \"lul\":\"lal\"\n" + - " }\n" + - " },\n" - ).collect(Collectors.joining()) + - - " {\n" + - " \"id\": \"id:ns:type::abc" + docs + "\",\n" + - " \"fields\": {\n" + - " \"lul\":\"lal\"\n" + - " }\n" + - " }\n" + - "]"; - ByteArrayInputStream in = new ByteArrayInputStream(json.getBytes(UTF_8)); - Set<String> ids = new ConcurrentSkipListSet<>(); - JsonStreamFeeder.builder(new FeedClient() { - @Override - public CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params) { - ids.add(documentId.userSpecific()); - return new CompletableFuture<>(); - } - - @Override - public CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params) { - return new CompletableFuture<>(); - } - - @Override - public CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params) { - return new CompletableFuture<>(); - } - - @Override - public void close() throws IOException { - - } - }).build().feed(in, 1 << 7, false); // TODO: hangs on 1 << 6. - assertEquals(docs + 1, ids.size()); - } - -} diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java new file mode 100644 index 00000000000..1e616f2625a --- /dev/null +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java @@ -0,0 +1,92 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client.examples; + +import ai.vespa.feed.client.DocumentId; +import ai.vespa.feed.client.FeedClient; +import ai.vespa.feed.client.FeedClientBuilder; +import ai.vespa.feed.client.FeedException; +import ai.vespa.feed.client.JsonFeeder; +import ai.vespa.feed.client.Result; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Logger; + +/** + * Sample feeder demonstrating how to programmatically feed to a Vespa cluster. + */ +class JsonFileFeederExample implements Closeable { + + private final static Logger log = Logger.getLogger(JsonFileFeederExample.class.getName()); + + private final JsonFeeder jsonFeeder; + private final URI endpoint; + + static class ResultCallBack implements JsonFeeder.ResultCallback { + + final AtomicInteger resultsReceived = new AtomicInteger(0); + final AtomicInteger errorsReceived = new AtomicInteger(0); + final long startTimeMillis = System.currentTimeMillis();; + + @Override + public void onNextResult(Result result, FeedException error) { + resultsReceived.incrementAndGet(); + if (error != null) { + log.warning("Problems with feeding document " + + error.documentId().map(DocumentId::toString).orElse("<unknown>")); + errorsReceived.incrementAndGet(); + } else if (result.type() == Result.Type.failure) { + log.warning("Problems with docID " + result.documentId() + ":" + error); + errorsReceived.incrementAndGet(); + } + } + + @Override + public void onError(FeedException error) { + log.severe("Feeding failed for d: " + error.getMessage()); + } + + @Override + public void onComplete() { + log.info("Feeding completed"); + } + + void dumpStatsToLog() { + log.info("Received in total " + resultsReceived.get() + ", " + errorsReceived.get() + " errors."); + log.info("Time spent receiving is " + (System.currentTimeMillis() - startTimeMillis) + " ms."); + } + + } + + JsonFileFeederExample(URI endpoint) { + this.endpoint = endpoint; + FeedClient feedClient = FeedClientBuilder.create(endpoint) + .build(); + this.jsonFeeder = JsonFeeder.builder(feedClient) + .withTimeout(Duration.ofSeconds(30)) + .build(); + } + + /** + * Feed all operations from a stream. + * + * @param stream The input stream to read operations from (JSON array containing one or more document operations). + */ + void batchFeed(InputStream stream, String batchId) { + ResultCallBack callback = new ResultCallBack(); + log.info("Starting feed to " + endpoint + " for batch '" + batchId + "'"); + CompletableFuture<Void> promise = jsonFeeder.feedMany(stream, callback); + promise.join(); // wait for feeding to complete + callback.dumpStatsToLog(); + } + + @Override + public void close() throws IOException { + jsonFeeder.close(); + } +} diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java new file mode 100644 index 00000000000..5cee776b244 --- /dev/null +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java @@ -0,0 +1,117 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client.examples; + +import ai.vespa.feed.client.DocumentId; +import ai.vespa.feed.client.FeedClient; +import ai.vespa.feed.client.FeedClientBuilder; +import ai.vespa.feed.client.OperationParameters; +import ai.vespa.feed.client.Result; + +import java.net.URI; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Simple Streaming feeder implementation which will send operations to a Vespa endpoint. + * Other threads communicate with the feeder by adding new operations on the BlockingQueue + */ + +class JsonStreamFeederExample extends Thread implements AutoCloseable { + + static class Operation { + final String type; + final String documentId; + final String documentFieldsJson; + + Operation(String type, String id, String fields) { + this.type = type; + this.documentId = id; + this.documentFieldsJson = fields; + } + } + + private final static Logger log = Logger.getLogger(JsonStreamFeederExample.class.getName()); + + private final BlockingQueue<Operation> operations; + private final FeedClient feedClient; + private final AtomicBoolean drain = new AtomicBoolean(false); + private final CountDownLatch finishedDraining = new CountDownLatch(1); + private final AtomicInteger resultCounter = new AtomicInteger(); + + /** + * Constructor + * @param operations The shared blocking queue where other threads can put document operations to. + * @param endpoint The endpoint to feed to + */ + JsonStreamFeederExample(BlockingQueue<JsonStreamFeederExample.Operation> operations, URI endpoint) { + this.operations = operations; + this.feedClient = FeedClientBuilder.create(endpoint).build(); + } + + /** + * Shutdown this feeder, waits until operations on queue is drained + */ + @Override + public void close() { + log.info("Shutdown initiated, awaiting operations queue to be drained. Queue size is " + operations.size()); + drain.set(true); + try { + finishedDraining.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + @Override + public void run() { + while (!drain.get() || !operations.isEmpty()) { + try { + JsonStreamFeederExample.Operation op = operations.poll(1, TimeUnit.SECONDS); + if(op == null) // no operations available + continue; + log.info("Put document " + op.documentId); + CompletableFuture<Result> promise; + DocumentId docId = DocumentId.of(op.documentId); + OperationParameters params = OperationParameters.empty(); + String json = op.documentFieldsJson; + switch (op.type) { + case "put": + promise = feedClient.put(docId, json, params); + break; + case "remove": + promise = feedClient.remove(docId, params); + break; + case "update": + promise = feedClient.update(docId, json, params); + break; + default: + throw new IllegalArgumentException("Invalid operation: " + op.type); + } + promise.whenComplete((result, throwable) -> { + if (resultCounter.getAndIncrement() % 10 == 0) { + System.err.println(feedClient.stats()); + } + if (throwable != null) { + System.err.printf("Failure for '%s': %s", docId, throwable); + throwable.printStackTrace(); + } else if (result.type() == Result.Type.failure) { + System.err.printf("Failure for '%s': %s", docId, result.resultMessage().orElse("<no messsage>")); + } + }); + } catch (InterruptedException e) { + log.log(Level.SEVERE, "Got interrupt exception.", e); + break; + } + } + log.info("Shutting down feeding thread"); + this.feedClient.close(); + finishedDraining.countDown(); + } + +}
\ No newline at end of file diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java new file mode 100644 index 00000000000..5ece9051e41 --- /dev/null +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java @@ -0,0 +1,34 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client.examples; + +import ai.vespa.feed.client.DocumentId; +import ai.vespa.feed.client.FeedClient; +import ai.vespa.feed.client.FeedClientBuilder; +import ai.vespa.feed.client.OperationParameters; +import ai.vespa.feed.client.Result; + +import java.net.URI; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; + +class SimpleExample { + + public static void main(String[] args) { + try (FeedClient client = FeedClientBuilder.create(URI.create("https://my-container-endpoint-with-http2:8080/")).build()) { + DocumentId id = DocumentId.of("namespace", "documenttype", "1"); + String json = "{\"fields\": {\"title\": \"hello world\"}}"; + OperationParameters params = OperationParameters.empty() + .timeout(Duration.ofSeconds(5)) + .route("myvesparoute"); + CompletableFuture<Result> promise = client.put(id, json, params); + promise.whenComplete(((result, throwable) -> { + if (throwable != null) { + throwable.printStackTrace(); + } else { + System.out.printf("'%s' for document '%s': %s%n", result.type(), result.documentId(), result.resultMessage()); + } + })); + } + } + +} |