aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-06-01 12:37:49 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-06-01 12:37:49 +0200
commit15d754c4e19ed171d3989904095fd9554849eb43 (patch)
tree827c5e97fff4a938ade4c21fd5c208421ea2886d /vespa-feed-client
parent9d19abe4f78e8139517588aba4ee827d5e0e8227 (diff)
Higher level retries
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java63
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java215
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java10
3 files changed, 174 insertions, 114 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
index 2f379bd0778..370cd326f10 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
@@ -56,9 +56,9 @@ class HttpFeedClient implements FeedClient {
this.requestStrategy = new HttpRequestStrategy(builder);
for (int i = 0; i < builder.maxConnections; i++) {
- CloseableHttpAsyncClient hc = createHttpClient(builder, requestStrategy);
- hc.start();
- httpClients.add(hc);
+ CloseableHttpAsyncClient client = createHttpClient(builder, requestStrategy);
+ client.start();
+ httpClients.add(client);
inflight.add(new AtomicInteger());
}
}
@@ -69,7 +69,7 @@ class HttpFeedClient implements FeedClient {
.setDefaultHeaders(Collections.singletonList(new BasicHeader("Vespa-Client-Version", Vespa.VERSION)))
.disableCookieManagement()
.disableRedirectHandling()
- .setRetryStrategy(retryStrategy)
+ .disableAutomaticRetries()
.setIOReactorConfig(IOReactorConfig.custom()
.setSoTimeout(Timeout.ofSeconds(10))
.build())
@@ -140,6 +140,23 @@ class HttpFeedClient implements FeedClient {
if (operationJson != null)
request.setBody(operationJson, ContentType.APPLICATION_JSON);
+ return requestStrategy.enqueue(documentId, request, this::send)
+ .handle((response, thrown) -> {
+ if (thrown != null) {
+ if (requestStrategy.hasFailed()) {
+ try { close(); }
+ catch (IOException exception) { thrown.addSuppressed(exception); }
+ }
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+ thrown.printStackTrace(new PrintStream(buffer));
+ return new Result(Result.Type.failure, documentId, buffer.toString(), null);
+ }
+ return toResult(response, documentId);
+ });
+ }
+
+ /** Sends the given request to the client with the least current inflight requests, completing the given vessel when done. */
+ private void send(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel) {
int index = 0;
int min = Integer.MAX_VALUE;
for (int i = 0; i < httpClients.size(); i++)
@@ -148,29 +165,19 @@ class HttpFeedClient implements FeedClient {
index = i;
}
- CloseableHttpAsyncClient client = httpClients.get(index);
- AtomicInteger counter = inflight.get(index);
- counter.incrementAndGet();
- return requestStrategy.enqueue(documentId, future -> {
- client.execute(request,
- new FutureCallback<SimpleHttpResponse>() {
- @Override public void completed(SimpleHttpResponse response) { future.complete(response); }
- @Override public void failed(Exception ex) { future.completeExceptionally(ex); }
- @Override public void cancelled() { future.cancel(false); }
- });
- }).handle((response, thrown) -> {
- counter.decrementAndGet();
- if (thrown != null) {
- if (requestStrategy.hasFailed()) {
- try { close(); }
- catch (IOException exception) { thrown.addSuppressed(exception); }
- }
- ByteArrayOutputStream buffer = new ByteArrayOutputStream();
- thrown.printStackTrace(new PrintStream(buffer));
- return new Result(Result.Type.failure, documentId, buffer.toString(), null);
- }
- return toResult(response, documentId);
- });
+ inflight.get(index).incrementAndGet();
+ try {
+ httpClients.get(index).execute(request,
+ new FutureCallback<SimpleHttpResponse>() {
+ @Override public void completed(SimpleHttpResponse response) { vessel.complete(response); }
+ @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); }
+ @Override public void cancelled() { vessel.cancel(false); }
+ });
+ }
+ catch (Throwable thrown) {
+ vessel.completeExceptionally(thrown);
+ }
+ vessel.thenRun(inflight.get(index)::decrementAndGet);
}
static Result toResult(SimpleHttpResponse response, DocumentId documentId) {
@@ -180,7 +187,7 @@ class HttpFeedClient implements FeedClient {
case 412: type = Result.Type.conditionNotMet; break;
default: type = Result.Type.failure;
}
- Map<String, String> responseJson = null; // TODO: parse JSON.
+ Map<String, String> responseJson = null; // TODO: parse JSON on error.
return new Result(type, documentId, response.getBodyText(), "trace");
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
index 0512d6a64c9..23569af3cdc 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
@@ -1,69 +1,72 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.feed.client;
-import org.apache.hc.client5.http.HttpRequestRetryStrategy;
+import ai.vespa.feed.client.FeedClient.RetryStrategy;
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
-import org.apache.hc.core5.http.HttpRequest;
-import org.apache.hc.core5.http.HttpResponse;
-import org.apache.hc.core5.http.protocol.HttpContext;
-import org.apache.hc.core5.util.TimeValue;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
+import java.util.logging.Logger;
+
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import static java.util.logging.Level.INFO;
/**
* Controls request execution and retries:
* <ul>
* <li>Retry all IO exceptions; however</li>
* <li>abort everything if more than 10% of requests result in an exception for some time.</li>
- * <li>Whenever throttled, limit inflight to 99% of current; and</li>
+ * <li>Whenever throttled, limit inflight to one less than the current; and</li>
* <li>on every successful response, increase inflight limit by 0.1.</li>
* </ul>
*
* @author jonmv
*/
-class HttpRequestStrategy implements RequestStrategy<SimpleHttpResponse>, HttpRequestRetryStrategy {
+class HttpRequestStrategy implements RequestStrategy {
+
+ private static Logger log = Logger.getLogger(HttpRequestStrategy.class.getName());
+ private static final double errorThreshold = 0.1;
- private final Map<DocumentId, CompletableFuture<SimpleHttpResponse>> byId = new ConcurrentHashMap<>();
- private final FeedClient.RetryStrategy wrapped;
+ private final Map<DocumentId, CompletableFuture<Void>> inflightById = new HashMap<>();
+ private final Object lock = new Object();
+ private final RetryStrategy wrapped;
private final long maxInflight;
+ private final long minInflight;
private double targetInflight;
- private long inflight;
- private final AtomicReference<Double> errorRate;
- private final double errorThreshold;
- private final Lock lock;
- private final Condition available;
+ private long inflight = 0;
+ private double errorRate = 0;
+ private long consecutiveSuccesses = 0;
+ private boolean failed = false;
HttpRequestStrategy(FeedClientBuilder builder) {
this.wrapped = builder.retryStrategy;
this.maxInflight = builder.maxConnections * (long) builder.maxStreamsPerConnection;
- this.targetInflight = maxInflight;
- this.inflight = 0;
- this.errorRate = new AtomicReference<>(0.0);
- this.errorThreshold = 0.1;
- this.lock = new ReentrantLock(true);
- this.available = lock.newCondition();
+ this.minInflight = builder.maxConnections * (long) Math.min(16, builder.maxStreamsPerConnection);
+ this.targetInflight = Math.sqrt(maxInflight) * (Math.sqrt(minInflight));
}
- private double cycle() {
- return targetInflight; // TODO: tune this--could start way too high if limit is set too high.
- }
+ /**
+ * Retries all IOExceptions, unless error rate has converged to a value higher than the threshold,
+ * or the user has turned off retries for this type of operation.
+ */
+ private boolean retry(SimpleHttpRequest request, Throwable thrown, int attempt) {
+ failure();
+ log.log(INFO, thrown, () -> "Failed attempt " + attempt + " at " + request +
+ ", error rate is " + errorRate + ", " + consecutiveSuccesses + " successes since last error");
- @Override
- public boolean retryRequest(HttpRequest request, IOException exception, int execCount, HttpContext context) {
- if (errorRate.updateAndGet(rate -> rate + (1 - rate) / cycle()) > errorThreshold)
+ if ( ! (thrown instanceof IOException))
return false;
- if (execCount > wrapped.retries())
+ if (attempt > wrapped.retries())
return false;
+
switch (request.getMethod().toUpperCase()) {
case "POST": return wrapped.retry(FeedClient.OperationType.put);
case "PUT": return wrapped.retry(FeedClient.OperationType.update);
@@ -73,83 +76,129 @@ class HttpRequestStrategy implements RequestStrategy<SimpleHttpResponse>, HttpRe
}
/**
- * Called when a response is successfully obtained.
+ * Called when a response is successfully obtained. In conjunction with IOException reports, this makes the
+ * error rate converge towards the true error rate, at a speed inversely proportional to the target number
+ * of inflight requests, per reported success/error, i.e., hopefully at a rate independent of transport width.
*/
void success() {
- errorRate.updateAndGet(rate -> rate - rate / cycle());
- lock.lock();
- targetInflight = Math.min(targetInflight + 0.1, maxInflight);
- lock.unlock();
+ synchronized (lock) {
+ errorRate -= errorRate / targetInflight; // Converges towards true error rate, in conjunction with failure updates.
+ targetInflight = min(targetInflight + 0.1, maxInflight);
+ ++consecutiveSuccesses;
+ }
}
- @Override
- public boolean retryRequest(HttpResponse response, int execCount, HttpContext context) {
- if (response.getCode() == 429 || response.getCode() == 503) {
- lock.lock();
- targetInflight = Math.max(100, 99 * inflight / 100);
- lock.unlock();
- return true;
+ /**
+ * Called whenever a failure to get a successful response is recorded.
+ */
+ void failure() {
+ synchronized (lock) {
+ errorRate += (1 - errorRate) / targetInflight; // Converges towards true error rate, in conjunction with success updates.
+ if (errorRate > errorThreshold)
+ failed = true;
+
+ consecutiveSuccesses = 0;
}
- return false;
}
- @Override
- public TimeValue getRetryInterval(HttpResponse response, int execCount, HttpContext context) {
- return TimeValue.ofMilliseconds(100);
+ /** Retries throttled requests (429, 503), adjusting the target inflight count, and server errors (500, 502). */
+ private boolean retry(SimpleHttpRequest request, SimpleHttpResponse response, int attempt) {
+ if (response.getCode() / 100 == 2) {
+ success();
+ return false;
+ }
+
+ log.log(INFO, () -> "Status code " + response.getCode() + " (" + response.getBodyText() + ") on attempt " + attempt +
+ " at " + request + ", " + consecutiveSuccesses + " successes since last error");
+
+ if (response.getCode() == 429 || response.getCode() == 503) { // Throttling; reduce target inflight.
+ synchronized (lock) {
+ targetInflight = max(inflight * 0.9, minInflight);
+ }
+ return true;
+ }
+
+ failure();
+ return attempt <= wrapped.retries() && (response.getCode() == 500 || response.getCode() == 502); // Hopefully temporary errors.
}
- void acquireSlot() {
- lock.lock();
+ // Must hold lock.
+ private void acquireSlot() {
try {
while (inflight >= targetInflight)
- available.awaitUninterruptibly();
+ lock.wait();
++inflight;
}
- finally {
- lock.unlock();
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
}
}
- void releaseSlot() {
- lock.lock();
- try {
- --inflight;
-
- if (inflight < targetInflight)
- available.signal();
- }
- finally {
- lock.unlock();
- }
+ // Must hold lock.
+ private void releaseSlot() {
+ for (long i = --inflight; i < targetInflight; i++)
+ lock.notify();
}
@Override
public boolean hasFailed() {
- return errorRate.get() > errorThreshold;
+ synchronized (lock) {
+ return failed;
+ }
}
@Override
- public CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, Consumer<CompletableFuture<SimpleHttpResponse>> dispatch) {
- acquireSlot();
-
- Consumer<CompletableFuture<SimpleHttpResponse>> safeDispatch = vessel -> {
- try { dispatch.accept(vessel); }
- catch (Throwable t) { vessel.completeExceptionally(t); }
- };
- CompletableFuture<SimpleHttpResponse> vessel = new CompletableFuture<>();
- byId.compute(documentId, (id, previous) -> {
- if (previous == null) safeDispatch.accept(vessel);
- else previous.whenComplete((__, ___) -> safeDispatch.accept(vessel));
- return vessel;
- });
+ public CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, SimpleHttpRequest request,
+ BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>> dispatch) {
+ CompletableFuture<SimpleHttpResponse> result = new CompletableFuture<>(); // Carries the aggregate result of the operation, including retries.
+ CompletableFuture<SimpleHttpResponse> vessel = new CompletableFuture<>(); // Holds the computation of a single dispatch to the HTTP client.
+ CompletableFuture<Void> blocker = new CompletableFuture<>(); // Blocks the next operation with same doc-id, then triggers it when complete.
+
+ // Get the previous inflight operation for this doc-id, or acquire a send slot.
+ CompletableFuture<Void> previous;
+ synchronized (lock) {
+ previous = inflightById.put(documentId, blocker);
+ if (previous == null)
+ acquireSlot();
+ }
+ if (previous == null) // Send immediately if none inflight ...
+ dispatch.accept(request, vessel);
+ else // ... or send when the previous inflight is done.
+ previous.thenRun(() -> dispatch.accept(request, vessel));
- return vessel.whenComplete((__, thrown) -> {
- releaseSlot();
- if (thrown == null)
- success();
+ handleAttempt(vessel, dispatch, blocker, request, result, documentId, 1);
+ return result;
+ }
- byId.compute(documentId, (id, current) -> current == vessel ? null : current);
+ /** Handles the result of one attempt at the given operation, retrying if necessary. */
+ private void handleAttempt(CompletableFuture<SimpleHttpResponse> vessel, BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>> dispatch,
+ CompletableFuture<Void> blocker, SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> result,
+ DocumentId documentId, int attempt) {
+ vessel.whenComplete((response, thrown) -> {
+ // Retry the operation if it failed with a transient error ...
+ if ( ! failed && (thrown != null ? retry(request, thrown, attempt)
+ : retry(request, response, attempt))) {
+ CompletableFuture<SimpleHttpResponse> retry = new CompletableFuture<>();
+ dispatch.accept(request, retry);
+ handleAttempt(retry, dispatch, blocker, request, result, documentId, attempt + 1);
+ return;
+ }
+
+ // ... or accept the outcome and mark the operation as complete.
+ CompletableFuture<Void> current;
+ synchronized (lock) {
+ current = inflightById.get(documentId);
+ if (current == blocker) { // Release slot and clear map if no other operations enqueued for this doc-id ...
+ releaseSlot();
+ inflightById.put(documentId, null);
+ }
+ }
+ if (current != blocker) // ... or trigger sending the next enqueued operation.
+ blocker.complete(null);
+
+ if (thrown == null) result.complete(response);
+ else result.completeExceptionally(thrown);
});
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
index e5eb956114e..466c4f9a0ab 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
@@ -1,20 +1,24 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.feed.client;
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+
import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
/**
* Controls execution of feed operations.
*
* @author jonmv
*/
-public interface RequestStrategy<T> {
+public interface RequestStrategy {
/** Whether this has failed, and we should stop. */
boolean hasFailed();
/** Enqueue the given operation, which is dispatched to a vessel future when ready. */
- CompletableFuture<T> enqueue(DocumentId documentId, Consumer<CompletableFuture<T>> dispatch);
+ CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, SimpleHttpRequest request,
+ BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>> dispatch);
}