aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorJon Marius Venstad <jonmv@users.noreply.github.com>2021-06-25 16:06:30 +0200
committerGitHub <noreply@github.com>2021-06-25 16:06:30 +0200
commit59ecd78377b8dcd5390d1221b3f683fa65c96ed4 (patch)
tree6de4b935af3a6631f5f96c2a5b5c093053cf67c0 /vespa-feed-client
parent73537c51eb67ad450110371d9a456c20ac950f38 (diff)
parentf2001ec54146d5f325cbf248e000dbc8eacf41f6 (diff)
Merge pull request #18412 from vespa-engine/jonmv/vespa-feed-client
Move stuff around, change some doc, default benchmark on
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/abi-spec.json30
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java18
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java8
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java39
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java19
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java74
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java2
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/Throttler.java33
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java8
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java3
10 files changed, 125 insertions, 109 deletions
diff --git a/vespa-feed-client/abi-spec.json b/vespa-feed-client/abi-spec.json
index 5a473e05508..dfeef28cc96 100644
--- a/vespa-feed-client/abi-spec.json
+++ b/vespa-feed-client/abi-spec.json
@@ -79,11 +79,13 @@
"abstract"
],
"methods": [
- "public abstract void success()",
- "public abstract void failure()",
+ "public void success()",
+ "public void failure()",
"public abstract ai.vespa.feed.client.FeedClient$CircuitBreaker$State state()"
],
- "fields": []
+ "fields": [
+ "public static final ai.vespa.feed.client.FeedClient$CircuitBreaker FUSED"
+ ]
},
"ai.vespa.feed.client.FeedClient$OperationType": {
"superClass": "java.lang.Enum",
@@ -117,22 +119,6 @@
],
"fields": []
},
- "ai.vespa.feed.client.FeedClient$Throttler": {
- "superClass": "java.lang.Object",
- "interfaces": [],
- "attributes": [
- "public",
- "interface",
- "abstract"
- ],
- "methods": [
- "public abstract void sent(long, java.util.concurrent.CompletableFuture)",
- "public abstract void success()",
- "public abstract void throttled(long)",
- "public abstract long targetInflight()"
- ],
- "fields": []
- },
"ai.vespa.feed.client.FeedClient": {
"superClass": "java.lang.Object",
"interfaces": [
@@ -148,7 +134,7 @@
"public abstract java.util.concurrent.CompletableFuture update(ai.vespa.feed.client.DocumentId, java.lang.String, ai.vespa.feed.client.OperationParameters)",
"public abstract java.util.concurrent.CompletableFuture remove(ai.vespa.feed.client.DocumentId, ai.vespa.feed.client.OperationParameters)",
"public abstract ai.vespa.feed.client.OperationStats stats()",
- "public ai.vespa.feed.client.FeedClient$CircuitBreaker$State circuitBreakerState()",
+ "public abstract ai.vespa.feed.client.FeedClient$CircuitBreaker$State circuitBreakerState()",
"public abstract void close(boolean)",
"public void close()"
],
@@ -167,7 +153,7 @@
"public ai.vespa.feed.client.FeedClientBuilder setMaxStreamPerConnection(int)",
"public ai.vespa.feed.client.FeedClientBuilder setSslContext(javax.net.ssl.SSLContext)",
"public ai.vespa.feed.client.FeedClientBuilder setHostnameVerifier(javax.net.ssl.HostnameVerifier)",
- "public ai.vespa.feed.client.FeedClientBuilder setBenchmarkOn(boolean)",
+ "public ai.vespa.feed.client.FeedClientBuilder noBenchmarking()",
"public ai.vespa.feed.client.FeedClientBuilder addRequestHeader(java.lang.String, java.lang.String)",
"public ai.vespa.feed.client.FeedClientBuilder addRequestHeader(java.lang.String, java.util.function.Supplier)",
"public ai.vespa.feed.client.FeedClientBuilder setRetryStrategy(ai.vespa.feed.client.FeedClient$RetryStrategy)",
@@ -378,7 +364,7 @@
"ai.vespa.feed.client.StaticThrottler": {
"superClass": "java.lang.Object",
"interfaces": [
- "ai.vespa.feed.client.FeedClient$Throttler"
+ "ai.vespa.feed.client.Throttler"
],
"attributes": [
"public"
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java
index b7244d40a72..bf407c60075 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java
@@ -49,13 +49,6 @@ class ApacheCluster implements Cluster {
@Override
public void dispatch(HttpRequest wrapped, CompletableFuture<HttpResponse> vessel) {
- SimpleHttpRequest request = new SimpleHttpRequest(wrapped.method(), wrapped.path());
- defaultHeaders.forEach(request::setHeader);
- request.setConfig(defaultConfig);
- wrapped.headers().forEach((name, value) -> request.setHeader(name, value.get()));
- if (wrapped.body() != null)
- request.setBody(wrapped.body(), ContentType.APPLICATION_JSON);
-
int index = 0;
int min = Integer.MAX_VALUE;
for (int i = 0; i < endpoints.size(); i++)
@@ -63,12 +56,19 @@ class ApacheCluster implements Cluster {
index = i;
min = endpoints.get(i).inflight.get();
}
-
Endpoint endpoint = endpoints.get(index);
- endpoint.inflight.incrementAndGet();
+
try {
+ SimpleHttpRequest request = new SimpleHttpRequest(wrapped.method(), wrapped.path());
request.setScheme(endpoint.url.getScheme());
request.setAuthority(new URIAuthority(endpoint.url.getHost(), portOf(endpoint.url)));
+ request.setConfig(defaultConfig);
+ defaultHeaders.forEach(request::setHeader);
+ wrapped.headers().forEach((name, value) -> request.setHeader(name, value.get()));
+ if (wrapped.body() != null)
+ request.setBody(wrapped.body(), ContentType.APPLICATION_JSON);
+
+ endpoint.inflight.incrementAndGet();
endpoint.client.execute(request,
new FutureCallback<SimpleHttpResponse>() {
@Override public void completed(SimpleHttpResponse response) { vessel.complete(new ApacheHttpResponse(response)); }
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java
index f428fb567e6..ed45f25b2c7 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java
@@ -6,16 +6,18 @@ import java.util.Collections;
import java.util.concurrent.CompletableFuture;
/**
- * Allows dispatch to a Vespa cluster.
+ * Allows dispatch of HTTP requests to a remote Vespa cluster.
+ *
+ * @author jonmv
*/
interface Cluster extends Closeable {
- /** Dispatch the request to the cluster, causing the response vessel to complete at a later time. May not throw. */
+ /** Dispatch the request to the cluster, causing the response vessel to complete at a later time. May not throw! */
void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel);
@Override
default void close() { }
- default OperationStats stats() { return new OperationStats(0, Collections.emptyMap(), 0, 0, 0, 0, 0, 0, 0); }
+ default OperationStats stats() { throw new UnsupportedOperationException("Benchmarking has been disabled"); }
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java
index f39b56ad50f..65ce8efe107 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java
@@ -5,7 +5,14 @@ import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
/**
- * Asynchronous feed client accepting document operations as JSON
+ * Asynchronous feed client accepting document operations as JSON. The payload should be
+ * the same as the HTTP payload required by the /document/v1 HTTP API, i.e., <pre>
+ * {
+ * "fields": {
+ * ...
+ * }
+ * }
+ * </pre>
*
* @author bjorncs
* @author jonmv
@@ -33,7 +40,7 @@ public interface FeedClient extends Closeable {
OperationStats stats();
/** Current state of the circuit breaker. */
- default CircuitBreaker.State circuitBreakerState() { return CircuitBreaker.State.CLOSED; }
+ CircuitBreaker.State circuitBreakerState();
/** Shut down, and reject new operations. Operations in flight are allowed to complete normally if graceful. */
void close(boolean graceful);
@@ -47,19 +54,22 @@ public interface FeedClient extends Closeable {
/** Whether to retry operations of the given type. */
default boolean retry(OperationType type) { return true; }
- /** Number of retries per operation for non-backpressure problems. */
- default int retries() { return 32; }
+ /** Number of retries per operation for assumed transient, non-backpressure problems. */
+ default int retries() { return 10; }
}
/** Allows slowing down or halting completely operations against the configured endpoint on high failure rates. */
interface CircuitBreaker {
+ /** A circuit breaker which is always closed. */
+ CircuitBreaker FUSED = () -> State.CLOSED;
+
/** Called by the client whenever a successful response is obtained. */
- void success();
+ default void success() { }
/** Called by the client whenever a transient or fatal error occurs. */
- void failure();
+ default void failure() { }
/** The current state of the circuit breaker. */
State state();
@@ -93,21 +103,4 @@ public interface FeedClient extends Closeable {
}
- /** Determines the number of requests to have inflight at any point. */
- interface Throttler {
-
- /** A request was just sent with {@code vessel}, with {@code inflight} total in flight. */
- void sent(long inflight, CompletableFuture<HttpResponse> vessel);
-
- /** A successful response was obtained. */
- void success();
-
- /** A throttle signal was obtained from the server. */
- void throttled(long inflight);
-
- /** The target inflight operations right now. */
- long targetInflight();
-
- }
-
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java
index 57aaf67c2d9..d85fd7bfa2b 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java
@@ -44,8 +44,8 @@ public class FeedClientBuilder {
Collection<X509Certificate> certificate;
PrivateKey privateKey;
Collection<X509Certificate> caCertificates;
- boolean benchmark;
- boolean dryrun;
+ boolean benchmark = true;
+ boolean dryrun = false;
/** Creates a builder for a single container endpoint **/
public static FeedClientBuilder create(URI endpoint) { return new FeedClientBuilder(Collections.singletonList(endpoint)); }
@@ -66,8 +66,9 @@ public class FeedClientBuilder {
/**
* Sets the number of connections this client will use per endpoint.
*
- * A reasonable value here is a small multiple of the numbers of containers in the
- * cluster to feed, so load can be balanced across these.
+ * A reasonable value here is a value that lets all feed clients (if more than one)
+ * collectively have a number of connections which is a small multiple of the numbers
+ * of containers in the cluster to feed, so load can be balanced across these containers.
* In general, this value should be kept as low as possible, but poor connectivity
* between feeder and cluster may also warrant a higher number of connections.
*/
@@ -82,7 +83,9 @@ public class FeedClientBuilder {
*
* This determines the maximum number of concurrent, inflight requests for this client,
* which is {@code maxConnections * maxStreamsPerConnection}. Prefer more streams over
- * more connections, when possible. The server's maximum is usually around 128-256.
+ * more connections, when possible.
+ * The feed client automatically throttles load to achieve the best throughput, and the
+ * actual number of streams per connection is usually lower than the maximum.
*/
public FeedClientBuilder setMaxStreamPerConnection(int max) {
if (max < 1) throw new IllegalArgumentException("Max streams per connection must be at least 1, but was " + max);
@@ -102,9 +105,9 @@ public class FeedClientBuilder {
return this;
}
- /** Turns on/off benchmarking, aggregated in {@link FeedClient#stats()}. */
- public FeedClientBuilder setBenchmarkOn(boolean on) {
- this.benchmark = on;
+ /** Turns off benchmarking. Attempting to get {@link FeedClient#stats()} will result in an exception. */
+ public FeedClientBuilder noBenchmarking() {
+ this.benchmark = false;
return this;
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
index 2480c9a7367..485e6f03908 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
@@ -33,7 +33,13 @@ import static java.util.logging.Level.WARNING;
* Controls request execution and retries.
*
* This class has all control flow for throttling and dispatching HTTP requests to an injected
- * HTTP cluster, including error handling and retries, and a circuit breaker mechanism.
+ * HTTP {@link Cluster}, including error handling and retries through a {@link RetryStrategy},
+ * a {@link CircuitBreaker} mechanism, and a {@link Throttler} for optimal load.
+ *
+ * Dispatch to the provided {@link Cluster} is done by a single dispatch thread. If dispatch ever throws,
+ * or the circuit breaker ever opens completely, the dispatch thread stops and all execution shuts down.
+ * This is done through {@link #destroy()}, which when called also ensures all enqueued operations are
+ * promptly completed, in addition to releasing any resources (threads, and in the provided cluster}.
*
* @author jonmv
*/
@@ -45,7 +51,7 @@ class HttpRequestStrategy implements RequestStrategy {
private final Map<DocumentId, RetriableFuture<HttpResponse>> inflightById = new ConcurrentHashMap<>();
private final RetryStrategy strategy;
private final CircuitBreaker breaker;
- private final FeedClient.Throttler throttler;
+ private final Throttler throttler;
private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
private final AtomicLong inflight = new AtomicLong(0);
private final AtomicBoolean destroyed = new AtomicBoolean(false);
@@ -100,38 +106,6 @@ class HttpRequestStrategy implements RequestStrategy {
queue.offer(() -> cluster.dispatch(request, vessel));
}
-
- /** A completable future which stores a temporary failure result to return upon abortion. */
- private static class RetriableFuture<T> extends CompletableFuture<T> {
-
- private final AtomicReference<Runnable> completion = new AtomicReference<>();
- private final AtomicReference<RetriableFuture<T>> dependency = new AtomicReference<>();
-
- private RetriableFuture() {
- completion.set(() -> completeExceptionally(new FeedException("Operation aborted")));
- }
-
- /** Complete now with the last result or error. */
- void complete() {
- completion.get().run();
- RetriableFuture<T> toComplete = dependency.getAndSet(null);
- if (toComplete != null) toComplete.complete();
- }
-
- /** Ensures the dependency is completed whenever this is. */
- void dependOn(RetriableFuture<T> dependency) {
- this.dependency.set(dependency);
- if (isDone()) dependency.complete();
- }
-
- /** Set the result of the last attempt at completing the computation represented by this. */
- void set(T result, Throwable thrown) {
- completion.set(thrown != null ? () -> completeExceptionally(thrown)
- : () -> complete(result));
- }
-
- }
-
private boolean poll() {
Runnable task = queue.poll();
if (task == null) return false;
@@ -140,7 +114,6 @@ class HttpRequestStrategy implements RequestStrategy {
return true;
}
-
private boolean isInExcess() {
return inflight.get() - delayedCount.get() > throttler.targetInflight();
}
@@ -232,6 +205,37 @@ class HttpRequestStrategy implements RequestStrategy {
}
}
+
+ /** A completable future which stores a temporary failure result to return upon abortion. */
+ private static class RetriableFuture<T> extends CompletableFuture<T> {
+
+ private final AtomicReference<Runnable> completion = new AtomicReference<>();
+ private final AtomicReference<RetriableFuture<T>> dependency = new AtomicReference<>();
+
+ private RetriableFuture() {
+ completion.set(() -> completeExceptionally(new FeedException("Operation aborted")));
+ }
+
+ /** Complete now with the last result or error. */
+ void complete() {
+ completion.get().run();
+ RetriableFuture<T> toComplete = dependency.getAndSet(null);
+ if (toComplete != null) toComplete.complete();
+ }
+
+ /** Ensures the dependency is completed whenever this is. */
+ void dependOn(RetriableFuture<T> dependency) {
+ this.dependency.set(dependency);
+ if (isDone()) dependency.complete();
+ }
+
+ /** Set the result of the last attempt at completing the computation represented by this. */
+ void set(T result, Throwable thrown) {
+ completion.set(thrown != null ? () -> completeExceptionally(thrown)
+ : () -> complete(result));
+ }
+
+ }
@Override
public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) {
RetriableFuture<HttpResponse> result = new RetriableFuture<>(); // Carries the aggregate result of the operation, including retries.
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java
index 331bc213edf..c5b36746b6d 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java
@@ -12,7 +12,7 @@ import static java.lang.Math.min;
*
* @author jonmv
*/
-public class StaticThrottler implements FeedClient.Throttler {
+public class StaticThrottler implements Throttler {
protected final long maxInflight;
protected final long minInflight;
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Throttler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Throttler.java
new file mode 100644
index 00000000000..fa1660d99e6
--- /dev/null
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Throttler.java
@@ -0,0 +1,33 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Determines the number of requests to have inflight at any point.
+ *
+ * @author jonmv
+ */
+interface Throttler {
+
+ /**
+ * A request was just sent with {@code vessel}, with {@code inflight} total in flight.
+ */
+ void sent(long inflight, CompletableFuture<HttpResponse> vessel);
+
+ /**
+ * A successful response was obtained.
+ */
+ void success();
+
+ /**
+ * A throttle signal was obtained from the server.
+ */
+ void throttled(long inflight);
+
+ /**
+ * The target inflight operations right now.
+ */
+ long targetInflight();
+
+}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java
index 02175150fed..beb231b61d4 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java
@@ -266,14 +266,6 @@ class HttpRequestStrategyTest {
dispatch.get().accept(request, vessel);
}
- @Override
- public void close() { }
-
- @Override
- public OperationStats stats() {
- return null;
- }
-
}
}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java
index f2cd775bf3d..79dd68f241a 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java
@@ -167,6 +167,9 @@ class JsonFeederTest {
public OperationStats stats() { return null; }
@Override
+ public CircuitBreaker.State circuitBreakerState() { return null; }
+
+ @Override
public void close(boolean graceful) { }
private CompletableFuture<Result> createSuccessResult(DocumentId documentId) {