diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-06-25 15:42:53 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-06-25 15:42:53 +0200 |
commit | 360ece7a86f3dad99240b5d2da9428c15c2c175e (patch) | |
tree | 7c277a84960eb7e434c75fad4064299d3596ada2 /vespa-feed-client | |
parent | 11a6745c68f2ae199bedd7aa06a13ea52e3dc8c1 (diff) |
Move stuff around, change some doc, default benchmark on
Diffstat (limited to 'vespa-feed-client')
9 files changed, 91 insertions, 108 deletions
diff --git a/vespa-feed-client/abi-spec.json b/vespa-feed-client/abi-spec.json index 5a473e05508..aa93472a98f 100644 --- a/vespa-feed-client/abi-spec.json +++ b/vespa-feed-client/abi-spec.json @@ -79,11 +79,13 @@ "abstract" ], "methods": [ - "public abstract void success()", - "public abstract void failure()", + "public void success()", + "public void failure()", "public abstract ai.vespa.feed.client.FeedClient$CircuitBreaker$State state()" ], - "fields": [] + "fields": [ + "public static final ai.vespa.feed.client.FeedClient$CircuitBreaker FUSED" + ] }, "ai.vespa.feed.client.FeedClient$OperationType": { "superClass": "java.lang.Enum", @@ -117,22 +119,6 @@ ], "fields": [] }, - "ai.vespa.feed.client.FeedClient$Throttler": { - "superClass": "java.lang.Object", - "interfaces": [], - "attributes": [ - "public", - "interface", - "abstract" - ], - "methods": [ - "public abstract void sent(long, java.util.concurrent.CompletableFuture)", - "public abstract void success()", - "public abstract void throttled(long)", - "public abstract long targetInflight()" - ], - "fields": [] - }, "ai.vespa.feed.client.FeedClient": { "superClass": "java.lang.Object", "interfaces": [ @@ -148,7 +134,7 @@ "public abstract java.util.concurrent.CompletableFuture update(ai.vespa.feed.client.DocumentId, java.lang.String, ai.vespa.feed.client.OperationParameters)", "public abstract java.util.concurrent.CompletableFuture remove(ai.vespa.feed.client.DocumentId, ai.vespa.feed.client.OperationParameters)", "public abstract ai.vespa.feed.client.OperationStats stats()", - "public ai.vespa.feed.client.FeedClient$CircuitBreaker$State circuitBreakerState()", + "public abstract ai.vespa.feed.client.FeedClient$CircuitBreaker$State circuitBreakerState()", "public abstract void close(boolean)", "public void close()" ], @@ -378,7 +364,7 @@ "ai.vespa.feed.client.StaticThrottler": { "superClass": "java.lang.Object", "interfaces": [ - "ai.vespa.feed.client.FeedClient$Throttler" + "ai.vespa.feed.client.Throttler" ], "attributes": [ "public" diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java index b7244d40a72..bf407c60075 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java @@ -49,13 +49,6 @@ class ApacheCluster implements Cluster { @Override public void dispatch(HttpRequest wrapped, CompletableFuture<HttpResponse> vessel) { - SimpleHttpRequest request = new SimpleHttpRequest(wrapped.method(), wrapped.path()); - defaultHeaders.forEach(request::setHeader); - request.setConfig(defaultConfig); - wrapped.headers().forEach((name, value) -> request.setHeader(name, value.get())); - if (wrapped.body() != null) - request.setBody(wrapped.body(), ContentType.APPLICATION_JSON); - int index = 0; int min = Integer.MAX_VALUE; for (int i = 0; i < endpoints.size(); i++) @@ -63,12 +56,19 @@ class ApacheCluster implements Cluster { index = i; min = endpoints.get(i).inflight.get(); } - Endpoint endpoint = endpoints.get(index); - endpoint.inflight.incrementAndGet(); + try { + SimpleHttpRequest request = new SimpleHttpRequest(wrapped.method(), wrapped.path()); request.setScheme(endpoint.url.getScheme()); request.setAuthority(new URIAuthority(endpoint.url.getHost(), portOf(endpoint.url))); + request.setConfig(defaultConfig); + defaultHeaders.forEach(request::setHeader); + wrapped.headers().forEach((name, value) -> request.setHeader(name, value.get())); + if (wrapped.body() != null) + request.setBody(wrapped.body(), ContentType.APPLICATION_JSON); + + endpoint.inflight.incrementAndGet(); endpoint.client.execute(request, new FutureCallback<SimpleHttpResponse>() { @Override public void completed(SimpleHttpResponse response) { vessel.complete(new ApacheHttpResponse(response)); } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java index f428fb567e6..ed45f25b2c7 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java @@ -6,16 +6,18 @@ import java.util.Collections; import java.util.concurrent.CompletableFuture; /** - * Allows dispatch to a Vespa cluster. + * Allows dispatch of HTTP requests to a remote Vespa cluster. + * + * @author jonmv */ interface Cluster extends Closeable { - /** Dispatch the request to the cluster, causing the response vessel to complete at a later time. May not throw. */ + /** Dispatch the request to the cluster, causing the response vessel to complete at a later time. May not throw! */ void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel); @Override default void close() { } - default OperationStats stats() { return new OperationStats(0, Collections.emptyMap(), 0, 0, 0, 0, 0, 0, 0); } + default OperationStats stats() { throw new UnsupportedOperationException("Benchmarking has been disabled"); } } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java index f39b56ad50f..65ce8efe107 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java @@ -5,7 +5,14 @@ import java.io.Closeable; import java.util.concurrent.CompletableFuture; /** - * Asynchronous feed client accepting document operations as JSON + * Asynchronous feed client accepting document operations as JSON. The payload should be + * the same as the HTTP payload required by the /document/v1 HTTP API, i.e., <pre> + * { + * "fields": { + * ... + * } + * } + * </pre> * * @author bjorncs * @author jonmv @@ -33,7 +40,7 @@ public interface FeedClient extends Closeable { OperationStats stats(); /** Current state of the circuit breaker. */ - default CircuitBreaker.State circuitBreakerState() { return CircuitBreaker.State.CLOSED; } + CircuitBreaker.State circuitBreakerState(); /** Shut down, and reject new operations. Operations in flight are allowed to complete normally if graceful. */ void close(boolean graceful); @@ -47,19 +54,22 @@ public interface FeedClient extends Closeable { /** Whether to retry operations of the given type. */ default boolean retry(OperationType type) { return true; } - /** Number of retries per operation for non-backpressure problems. */ - default int retries() { return 32; } + /** Number of retries per operation for assumed transient, non-backpressure problems. */ + default int retries() { return 10; } } /** Allows slowing down or halting completely operations against the configured endpoint on high failure rates. */ interface CircuitBreaker { + /** A circuit breaker which is always closed. */ + CircuitBreaker FUSED = () -> State.CLOSED; + /** Called by the client whenever a successful response is obtained. */ - void success(); + default void success() { } /** Called by the client whenever a transient or fatal error occurs. */ - void failure(); + default void failure() { } /** The current state of the circuit breaker. */ State state(); @@ -93,21 +103,4 @@ public interface FeedClient extends Closeable { } - /** Determines the number of requests to have inflight at any point. */ - interface Throttler { - - /** A request was just sent with {@code vessel}, with {@code inflight} total in flight. */ - void sent(long inflight, CompletableFuture<HttpResponse> vessel); - - /** A successful response was obtained. */ - void success(); - - /** A throttle signal was obtained from the server. */ - void throttled(long inflight); - - /** The target inflight operations right now. */ - long targetInflight(); - - } - } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java index 57aaf67c2d9..d85fd7bfa2b 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java @@ -44,8 +44,8 @@ public class FeedClientBuilder { Collection<X509Certificate> certificate; PrivateKey privateKey; Collection<X509Certificate> caCertificates; - boolean benchmark; - boolean dryrun; + boolean benchmark = true; + boolean dryrun = false; /** Creates a builder for a single container endpoint **/ public static FeedClientBuilder create(URI endpoint) { return new FeedClientBuilder(Collections.singletonList(endpoint)); } @@ -66,8 +66,9 @@ public class FeedClientBuilder { /** * Sets the number of connections this client will use per endpoint. * - * A reasonable value here is a small multiple of the numbers of containers in the - * cluster to feed, so load can be balanced across these. + * A reasonable value here is a value that lets all feed clients (if more than one) + * collectively have a number of connections which is a small multiple of the numbers + * of containers in the cluster to feed, so load can be balanced across these containers. * In general, this value should be kept as low as possible, but poor connectivity * between feeder and cluster may also warrant a higher number of connections. */ @@ -82,7 +83,9 @@ public class FeedClientBuilder { * * This determines the maximum number of concurrent, inflight requests for this client, * which is {@code maxConnections * maxStreamsPerConnection}. Prefer more streams over - * more connections, when possible. The server's maximum is usually around 128-256. + * more connections, when possible. + * The feed client automatically throttles load to achieve the best throughput, and the + * actual number of streams per connection is usually lower than the maximum. */ public FeedClientBuilder setMaxStreamPerConnection(int max) { if (max < 1) throw new IllegalArgumentException("Max streams per connection must be at least 1, but was " + max); @@ -102,9 +105,9 @@ public class FeedClientBuilder { return this; } - /** Turns on/off benchmarking, aggregated in {@link FeedClient#stats()}. */ - public FeedClientBuilder setBenchmarkOn(boolean on) { - this.benchmark = on; + /** Turns off benchmarking. Attempting to get {@link FeedClient#stats()} will result in an exception. */ + public FeedClientBuilder noBenchmarking() { + this.benchmark = false; return this; } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java index 2480c9a7367..485e6f03908 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java @@ -33,7 +33,13 @@ import static java.util.logging.Level.WARNING; * Controls request execution and retries. * * This class has all control flow for throttling and dispatching HTTP requests to an injected - * HTTP cluster, including error handling and retries, and a circuit breaker mechanism. + * HTTP {@link Cluster}, including error handling and retries through a {@link RetryStrategy}, + * a {@link CircuitBreaker} mechanism, and a {@link Throttler} for optimal load. + * + * Dispatch to the provided {@link Cluster} is done by a single dispatch thread. If dispatch ever throws, + * or the circuit breaker ever opens completely, the dispatch thread stops and all execution shuts down. + * This is done through {@link #destroy()}, which when called also ensures all enqueued operations are + * promptly completed, in addition to releasing any resources (threads, and in the provided cluster}. * * @author jonmv */ @@ -45,7 +51,7 @@ class HttpRequestStrategy implements RequestStrategy { private final Map<DocumentId, RetriableFuture<HttpResponse>> inflightById = new ConcurrentHashMap<>(); private final RetryStrategy strategy; private final CircuitBreaker breaker; - private final FeedClient.Throttler throttler; + private final Throttler throttler; private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>(); private final AtomicLong inflight = new AtomicLong(0); private final AtomicBoolean destroyed = new AtomicBoolean(false); @@ -100,38 +106,6 @@ class HttpRequestStrategy implements RequestStrategy { queue.offer(() -> cluster.dispatch(request, vessel)); } - - /** A completable future which stores a temporary failure result to return upon abortion. */ - private static class RetriableFuture<T> extends CompletableFuture<T> { - - private final AtomicReference<Runnable> completion = new AtomicReference<>(); - private final AtomicReference<RetriableFuture<T>> dependency = new AtomicReference<>(); - - private RetriableFuture() { - completion.set(() -> completeExceptionally(new FeedException("Operation aborted"))); - } - - /** Complete now with the last result or error. */ - void complete() { - completion.get().run(); - RetriableFuture<T> toComplete = dependency.getAndSet(null); - if (toComplete != null) toComplete.complete(); - } - - /** Ensures the dependency is completed whenever this is. */ - void dependOn(RetriableFuture<T> dependency) { - this.dependency.set(dependency); - if (isDone()) dependency.complete(); - } - - /** Set the result of the last attempt at completing the computation represented by this. */ - void set(T result, Throwable thrown) { - completion.set(thrown != null ? () -> completeExceptionally(thrown) - : () -> complete(result)); - } - - } - private boolean poll() { Runnable task = queue.poll(); if (task == null) return false; @@ -140,7 +114,6 @@ class HttpRequestStrategy implements RequestStrategy { return true; } - private boolean isInExcess() { return inflight.get() - delayedCount.get() > throttler.targetInflight(); } @@ -232,6 +205,37 @@ class HttpRequestStrategy implements RequestStrategy { } } + + /** A completable future which stores a temporary failure result to return upon abortion. */ + private static class RetriableFuture<T> extends CompletableFuture<T> { + + private final AtomicReference<Runnable> completion = new AtomicReference<>(); + private final AtomicReference<RetriableFuture<T>> dependency = new AtomicReference<>(); + + private RetriableFuture() { + completion.set(() -> completeExceptionally(new FeedException("Operation aborted"))); + } + + /** Complete now with the last result or error. */ + void complete() { + completion.get().run(); + RetriableFuture<T> toComplete = dependency.getAndSet(null); + if (toComplete != null) toComplete.complete(); + } + + /** Ensures the dependency is completed whenever this is. */ + void dependOn(RetriableFuture<T> dependency) { + this.dependency.set(dependency); + if (isDone()) dependency.complete(); + } + + /** Set the result of the last attempt at completing the computation represented by this. */ + void set(T result, Throwable thrown) { + completion.set(thrown != null ? () -> completeExceptionally(thrown) + : () -> complete(result)); + } + + } @Override public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { RetriableFuture<HttpResponse> result = new RetriableFuture<>(); // Carries the aggregate result of the operation, including retries. diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java index 331bc213edf..c5b36746b6d 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java @@ -12,7 +12,7 @@ import static java.lang.Math.min; * * @author jonmv */ -public class StaticThrottler implements FeedClient.Throttler { +public class StaticThrottler implements Throttler { protected final long maxInflight; protected final long minInflight; diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java index 02175150fed..beb231b61d4 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java @@ -266,14 +266,6 @@ class HttpRequestStrategyTest { dispatch.get().accept(request, vessel); } - @Override - public void close() { } - - @Override - public OperationStats stats() { - return null; - } - } } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java index 6c26e1d8ae8..6ad927e855d 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java @@ -158,6 +158,9 @@ class JsonFeederTest { public OperationStats stats() { return null; } @Override + public CircuitBreaker.State circuitBreakerState() { return null; } + + @Override public void close(boolean graceful) { } private CompletableFuture<Result> createSuccessResult(DocumentId documentId) { |