aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java')
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java97
1 files changed, 61 insertions, 36 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
index ddfc509738f..2480c9a7367 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
@@ -16,6 +16,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -24,18 +25,15 @@ import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN;
import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.logging.Level.FINE;
+import static java.util.logging.Level.FINER;
import static java.util.logging.Level.FINEST;
-import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
-// TODO: update doc
/**
- * Controls request execution and retries:
- * <ul>
- * <li>Whenever throttled (429, 503), set target inflight to 0.9 * current, and retry over a different connection;</li>
- * <li>retry other transient errors (500, 502 and IOException) a specified number of times, for specified operation types;</li>
- * <li>and on every successful response, increase target inflight by 0.1.</li>
- * </ul>
+ * Controls request execution and retries.
+ *
+ * This class has all control flow for throttling and dispatching HTTP requests to an injected
+ * HTTP cluster, including error handling and retries, and a circuit breaker mechanism.
*
* @author jonmv
*/
@@ -44,10 +42,10 @@ class HttpRequestStrategy implements RequestStrategy {
private static final Logger log = Logger.getLogger(HttpRequestStrategy.class.getName());
private final Cluster cluster;
- private final Map<DocumentId, CompletableFuture<?>> inflightById = new ConcurrentHashMap<>();
+ private final Map<DocumentId, RetriableFuture<HttpResponse>> inflightById = new ConcurrentHashMap<>();
private final RetryStrategy strategy;
private final CircuitBreaker breaker;
- final FeedClient.Throttler throttler;
+ private final FeedClient.Throttler throttler;
private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
private final AtomicLong inflight = new AtomicLong(0);
private final AtomicBoolean destroyed = new AtomicBoolean(false);
@@ -88,21 +86,50 @@ class HttpRequestStrategy implements RequestStrategy {
while (breaker.state() != OPEN && ! destroyed.get()) {
while ( ! isInExcess() && poll() && breaker.state() == CLOSED);
// Sleep when circuit is half-open, nap when queue is empty, or we are throttled.
- Thread.sleep(breaker.state() == HALF_OPEN ? 1000 : 10); // TODO: Reduce throughput when turning half-open?
+ Thread.sleep(breaker.state() == HALF_OPEN ? 1000 : 10);
}
}
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- log.log(WARNING, "Dispatch thread interrupted; shutting down");
+ catch (Throwable t) {
+ log.log(WARNING, "Dispatch thread threw; shutting down", t);
}
destroy();
}
private void offer(HttpRequest request, CompletableFuture<HttpResponse> vessel) {
delayedCount.incrementAndGet();
- queue.offer(() -> {
- cluster.dispatch(request, vessel);
- });
+ queue.offer(() -> cluster.dispatch(request, vessel));
+ }
+
+
+ /** A completable future which stores a temporary failure result to return upon abortion. */
+ private static class RetriableFuture<T> extends CompletableFuture<T> {
+
+ private final AtomicReference<Runnable> completion = new AtomicReference<>();
+ private final AtomicReference<RetriableFuture<T>> dependency = new AtomicReference<>();
+
+ private RetriableFuture() {
+ completion.set(() -> completeExceptionally(new FeedException("Operation aborted")));
+ }
+
+ /** Complete now with the last result or error. */
+ void complete() {
+ completion.get().run();
+ RetriableFuture<T> toComplete = dependency.getAndSet(null);
+ if (toComplete != null) toComplete.complete();
+ }
+
+ /** Ensures the dependency is completed whenever this is. */
+ void dependOn(RetriableFuture<T> dependency) {
+ this.dependency.set(dependency);
+ if (isDone()) dependency.complete();
+ }
+
+ /** Set the result of the last attempt at completing the computation represented by this. */
+ void set(T result, Throwable thrown) {
+ completion.set(thrown != null ? () -> completeExceptionally(thrown)
+ : () -> complete(result));
+ }
+
}
private boolean poll() {
@@ -139,11 +166,11 @@ class HttpRequestStrategy implements RequestStrategy {
if ( (thrown instanceof IOException) // General IO problems.
|| (thrown instanceof CancellationException) // TLS session disconnect.
|| (thrown instanceof CancelledKeyException)) { // Selection cancelled.
- log.log(INFO, thrown, () -> "Failed attempt " + attempt + " at " + request);
+ log.log(FINER, thrown, () -> "Failed attempt " + attempt + " at " + request);
return retry(request, attempt);
}
- log.log(WARNING, thrown, () -> "Failed attempt " + attempt + " at " + request);
+ log.log(FINE, thrown, () -> "Failed attempt " + attempt + " at " + request);
return false;
}
@@ -158,18 +185,17 @@ class HttpRequestStrategy implements RequestStrategy {
if (response.code() == 429 || response.code() == 503) { // Throttling; reduce target inflight.
- logResponse(FINE, response, request, attempt);
+ logResponse(FINER, response, request, attempt);
throttler.throttled((inflight.get() - delayedCount.get()));
return true;
}
breaker.failure();
+ logResponse(FINE, response, request, attempt);
if (response.code() == 500 || response.code() == 502 || response.code() == 504) { // Hopefully temporary errors.
- logResponse(INFO, response, request, attempt);
return retry(request, attempt);
}
- logResponse(WARNING, response, request, attempt);
return false;
}
@@ -208,11 +234,11 @@ class HttpRequestStrategy implements RequestStrategy {
@Override
public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) {
- CompletableFuture<HttpResponse> result = new CompletableFuture<>(); // Carries the aggregate result of the operation, including retries.
+ RetriableFuture<HttpResponse> result = new RetriableFuture<>(); // Carries the aggregate result of the operation, including retries.
CompletableFuture<HttpResponse> vessel = new CompletableFuture<>(); // Holds the computation of a single dispatch to the HTTP client.
- CompletableFuture<?> previous = inflightById.put(documentId, result);
+ RetriableFuture<HttpResponse> previous = inflightById.put(documentId, result);
if (destroyed.get()) {
- result.cancel(true);
+ result.complete();
return result;
}
@@ -221,13 +247,15 @@ class HttpRequestStrategy implements RequestStrategy {
offer(request, vessel);
throttler.sent(inflight.get(), result);
}
- else
+ else {
+ result.dependOn(previous); // In case result is aborted, also abort the previous if still inflight.
previous.whenComplete((__, ___) -> offer(request, vessel));
+ }
handleAttempt(vessel, request, result, 1);
return result.handle((response, error) -> {
- if (inflightById.compute(documentId, (____, current) -> current == result ? null : current) == null)
+ if (inflightById.compute(documentId, (__, current) -> current == result ? null : current) == null)
releaseSlot();
if (error != null) {
@@ -239,29 +267,26 @@ class HttpRequestStrategy implements RequestStrategy {
}
/** Handles the result of one attempt at the given operation, retrying if necessary. */
- private void handleAttempt(CompletableFuture<HttpResponse> vessel, HttpRequest request, CompletableFuture<HttpResponse> result, int attempt) {
+ private void handleAttempt(CompletableFuture<HttpResponse> vessel, HttpRequest request, RetriableFuture<HttpResponse> result, int attempt) {
vessel.whenCompleteAsync((response, thrown) -> {
+ result.set(response, thrown);
// Retry the operation if it failed with a transient error ...
if (thrown != null ? retry(request, thrown, attempt)
: retry(request, response, attempt)) {
- CircuitBreaker.State state = breaker.state();
CompletableFuture<HttpResponse> retry = new CompletableFuture<>();
offer(request, retry);
- handleAttempt(retry, request, result, attempt + (state == HALF_OPEN ? 0 : 1));
+ handleAttempt(retry, request, result, attempt + (breaker.state() == HALF_OPEN ? 0 : 1));
}
// ... or accept the outcome and mark the operation as complete.
- else {
- if (thrown == null) result.complete(response);
- else result.completeExceptionally(thrown);
- }
+ else result.complete();
},
resultExecutor);
}
@Override
public void destroy() {
- if ( ! destroyed.getAndSet(true)) {
- inflightById.values().forEach(result -> result.cancel(true)); // TODO: More informative exception.
+ if (destroyed.compareAndSet(false, true)) {
+ inflightById.values().forEach(RetriableFuture::complete);
cluster.close();
resultExecutor.shutdown();
}