diff options
Diffstat (limited to 'vespa-feed-client/src/main')
4 files changed, 86 insertions, 49 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java index 2c5c2dccf19..c319bfca252 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java @@ -8,7 +8,7 @@ import java.util.function.LongSupplier; import java.util.logging.Logger; import static java.util.Objects.requireNonNull; -import static java.util.logging.Level.INFO; +import static java.util.logging.Level.FINE; import static java.util.logging.Level.WARNING; /** @@ -48,7 +48,7 @@ public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker { public void success() { failingSinceMillis.set(NEVER); if ( ! open.get() && halfOpen.compareAndSet(true, false)) - log.log(INFO, "Circuit breaker is now closed"); + log.log(FINE, "Circuit breaker is now closed"); } @Override @@ -60,7 +60,7 @@ public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker { public State state() { long failingMillis = clock.getAsLong() - failingSinceMillis.get(); if (failingMillis > graceMillis && halfOpen.compareAndSet(false, true)) - log.log(INFO, "Circuit breaker is now half-open"); + log.log(FINE, "Circuit breaker is now half-open"); if (failingMillis > doomMillis && open.compareAndSet(false, true)) log.log(WARNING, "Circuit breaker is now open"); diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java index 2269c56cde4..90b5707c8a0 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java @@ -85,15 +85,25 @@ class HttpFeedClient implements FeedClient { .thenApply(response -> toResult(request, response, documentId)); } + private enum Outcome { success, conditionNotMet, vespaFailure, transportFailure }; + + static Result.Type toResultType(Outcome outcome) { + switch (outcome) { + case success: return Result.Type.success; + case conditionNotMet: return Result.Type.conditionNotMet; + default: throw new IllegalArgumentException("No corresponding result type for '" + outcome + "'"); + } + } + static Result toResult(HttpRequest request, HttpResponse response, DocumentId documentId) { - Result.Type type; + Outcome outcome; switch (response.code()) { - case 200: type = Result.Type.success; break; - case 412: type = Result.Type.conditionNotMet; break; + case 200: outcome = Outcome.success; break; + case 412: outcome = Outcome.conditionNotMet; break; case 502: case 504: - case 507: type = Result.Type.failure; break; - default: type = null; + case 507: outcome = Outcome.vespaFailure; break; + default: outcome = Outcome.transportFailure; } String message = null; @@ -125,13 +135,16 @@ class HttpFeedClient implements FeedClient { throw new ResultParseException(documentId, e); } - if (type == null) // Not a Vespa response, but a failure in the HTTP layer. - throw new ResultParseException( + if (outcome == Outcome.transportFailure) // Not a Vespa response, but a failure in the HTTP layer. + throw new FeedException( documentId, "Status " + response.code() + " executing '" + request + "': " + (message == null ? new String(response.body(), UTF_8) : message)); - return new Result(type, documentId, message, trace); + if (outcome == Outcome.vespaFailure) + throw new ResultException(documentId, message, trace); + + return new Result(toResultType(outcome), documentId, message, trace); } static String getPath(DocumentId documentId) { diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java index ddfc509738f..2480c9a7367 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java @@ -16,6 +16,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -24,18 +25,15 @@ import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN; import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.logging.Level.FINE; +import static java.util.logging.Level.FINER; import static java.util.logging.Level.FINEST; -import static java.util.logging.Level.INFO; import static java.util.logging.Level.WARNING; -// TODO: update doc /** - * Controls request execution and retries: - * <ul> - * <li>Whenever throttled (429, 503), set target inflight to 0.9 * current, and retry over a different connection;</li> - * <li>retry other transient errors (500, 502 and IOException) a specified number of times, for specified operation types;</li> - * <li>and on every successful response, increase target inflight by 0.1.</li> - * </ul> + * Controls request execution and retries. + * + * This class has all control flow for throttling and dispatching HTTP requests to an injected + * HTTP cluster, including error handling and retries, and a circuit breaker mechanism. * * @author jonmv */ @@ -44,10 +42,10 @@ class HttpRequestStrategy implements RequestStrategy { private static final Logger log = Logger.getLogger(HttpRequestStrategy.class.getName()); private final Cluster cluster; - private final Map<DocumentId, CompletableFuture<?>> inflightById = new ConcurrentHashMap<>(); + private final Map<DocumentId, RetriableFuture<HttpResponse>> inflightById = new ConcurrentHashMap<>(); private final RetryStrategy strategy; private final CircuitBreaker breaker; - final FeedClient.Throttler throttler; + private final FeedClient.Throttler throttler; private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>(); private final AtomicLong inflight = new AtomicLong(0); private final AtomicBoolean destroyed = new AtomicBoolean(false); @@ -88,21 +86,50 @@ class HttpRequestStrategy implements RequestStrategy { while (breaker.state() != OPEN && ! destroyed.get()) { while ( ! isInExcess() && poll() && breaker.state() == CLOSED); // Sleep when circuit is half-open, nap when queue is empty, or we are throttled. - Thread.sleep(breaker.state() == HALF_OPEN ? 1000 : 10); // TODO: Reduce throughput when turning half-open? + Thread.sleep(breaker.state() == HALF_OPEN ? 1000 : 10); } } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.log(WARNING, "Dispatch thread interrupted; shutting down"); + catch (Throwable t) { + log.log(WARNING, "Dispatch thread threw; shutting down", t); } destroy(); } private void offer(HttpRequest request, CompletableFuture<HttpResponse> vessel) { delayedCount.incrementAndGet(); - queue.offer(() -> { - cluster.dispatch(request, vessel); - }); + queue.offer(() -> cluster.dispatch(request, vessel)); + } + + + /** A completable future which stores a temporary failure result to return upon abortion. */ + private static class RetriableFuture<T> extends CompletableFuture<T> { + + private final AtomicReference<Runnable> completion = new AtomicReference<>(); + private final AtomicReference<RetriableFuture<T>> dependency = new AtomicReference<>(); + + private RetriableFuture() { + completion.set(() -> completeExceptionally(new FeedException("Operation aborted"))); + } + + /** Complete now with the last result or error. */ + void complete() { + completion.get().run(); + RetriableFuture<T> toComplete = dependency.getAndSet(null); + if (toComplete != null) toComplete.complete(); + } + + /** Ensures the dependency is completed whenever this is. */ + void dependOn(RetriableFuture<T> dependency) { + this.dependency.set(dependency); + if (isDone()) dependency.complete(); + } + + /** Set the result of the last attempt at completing the computation represented by this. */ + void set(T result, Throwable thrown) { + completion.set(thrown != null ? () -> completeExceptionally(thrown) + : () -> complete(result)); + } + } private boolean poll() { @@ -139,11 +166,11 @@ class HttpRequestStrategy implements RequestStrategy { if ( (thrown instanceof IOException) // General IO problems. || (thrown instanceof CancellationException) // TLS session disconnect. || (thrown instanceof CancelledKeyException)) { // Selection cancelled. - log.log(INFO, thrown, () -> "Failed attempt " + attempt + " at " + request); + log.log(FINER, thrown, () -> "Failed attempt " + attempt + " at " + request); return retry(request, attempt); } - log.log(WARNING, thrown, () -> "Failed attempt " + attempt + " at " + request); + log.log(FINE, thrown, () -> "Failed attempt " + attempt + " at " + request); return false; } @@ -158,18 +185,17 @@ class HttpRequestStrategy implements RequestStrategy { if (response.code() == 429 || response.code() == 503) { // Throttling; reduce target inflight. - logResponse(FINE, response, request, attempt); + logResponse(FINER, response, request, attempt); throttler.throttled((inflight.get() - delayedCount.get())); return true; } breaker.failure(); + logResponse(FINE, response, request, attempt); if (response.code() == 500 || response.code() == 502 || response.code() == 504) { // Hopefully temporary errors. - logResponse(INFO, response, request, attempt); return retry(request, attempt); } - logResponse(WARNING, response, request, attempt); return false; } @@ -208,11 +234,11 @@ class HttpRequestStrategy implements RequestStrategy { @Override public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { - CompletableFuture<HttpResponse> result = new CompletableFuture<>(); // Carries the aggregate result of the operation, including retries. + RetriableFuture<HttpResponse> result = new RetriableFuture<>(); // Carries the aggregate result of the operation, including retries. CompletableFuture<HttpResponse> vessel = new CompletableFuture<>(); // Holds the computation of a single dispatch to the HTTP client. - CompletableFuture<?> previous = inflightById.put(documentId, result); + RetriableFuture<HttpResponse> previous = inflightById.put(documentId, result); if (destroyed.get()) { - result.cancel(true); + result.complete(); return result; } @@ -221,13 +247,15 @@ class HttpRequestStrategy implements RequestStrategy { offer(request, vessel); throttler.sent(inflight.get(), result); } - else + else { + result.dependOn(previous); // In case result is aborted, also abort the previous if still inflight. previous.whenComplete((__, ___) -> offer(request, vessel)); + } handleAttempt(vessel, request, result, 1); return result.handle((response, error) -> { - if (inflightById.compute(documentId, (____, current) -> current == result ? null : current) == null) + if (inflightById.compute(documentId, (__, current) -> current == result ? null : current) == null) releaseSlot(); if (error != null) { @@ -239,29 +267,26 @@ class HttpRequestStrategy implements RequestStrategy { } /** Handles the result of one attempt at the given operation, retrying if necessary. */ - private void handleAttempt(CompletableFuture<HttpResponse> vessel, HttpRequest request, CompletableFuture<HttpResponse> result, int attempt) { + private void handleAttempt(CompletableFuture<HttpResponse> vessel, HttpRequest request, RetriableFuture<HttpResponse> result, int attempt) { vessel.whenCompleteAsync((response, thrown) -> { + result.set(response, thrown); // Retry the operation if it failed with a transient error ... if (thrown != null ? retry(request, thrown, attempt) : retry(request, response, attempt)) { - CircuitBreaker.State state = breaker.state(); CompletableFuture<HttpResponse> retry = new CompletableFuture<>(); offer(request, retry); - handleAttempt(retry, request, result, attempt + (state == HALF_OPEN ? 0 : 1)); + handleAttempt(retry, request, result, attempt + (breaker.state() == HALF_OPEN ? 0 : 1)); } // ... or accept the outcome and mark the operation as complete. - else { - if (thrown == null) result.complete(response); - else result.completeExceptionally(thrown); - } + else result.complete(); }, resultExecutor); } @Override public void destroy() { - if ( ! destroyed.getAndSet(true)) { - inflightById.values().forEach(result -> result.cancel(true)); // TODO: More informative exception. + if (destroyed.compareAndSet(false, true)) { + inflightById.values().forEach(RetriableFuture::complete); cluster.close(); resultExecutor.shutdown(); } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Result.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Result.java index 0a036c6c1b0..7be7aadc188 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Result.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Result.java @@ -25,8 +25,7 @@ public class Result { public enum Type { success, - conditionNotMet, - failure + conditionNotMet } public Type type() { return type; } |