diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-06-01 12:37:49 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-06-01 12:37:49 +0200 |
commit | 15d754c4e19ed171d3989904095fd9554849eb43 (patch) | |
tree | 827c5e97fff4a938ade4c21fd5c208421ea2886d /vespa-feed-client/src | |
parent | 9d19abe4f78e8139517588aba4ee827d5e0e8227 (diff) |
Higher level retries
Diffstat (limited to 'vespa-feed-client/src')
3 files changed, 174 insertions, 114 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java index 2f379bd0778..370cd326f10 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java @@ -56,9 +56,9 @@ class HttpFeedClient implements FeedClient { this.requestStrategy = new HttpRequestStrategy(builder); for (int i = 0; i < builder.maxConnections; i++) { - CloseableHttpAsyncClient hc = createHttpClient(builder, requestStrategy); - hc.start(); - httpClients.add(hc); + CloseableHttpAsyncClient client = createHttpClient(builder, requestStrategy); + client.start(); + httpClients.add(client); inflight.add(new AtomicInteger()); } } @@ -69,7 +69,7 @@ class HttpFeedClient implements FeedClient { .setDefaultHeaders(Collections.singletonList(new BasicHeader("Vespa-Client-Version", Vespa.VERSION))) .disableCookieManagement() .disableRedirectHandling() - .setRetryStrategy(retryStrategy) + .disableAutomaticRetries() .setIOReactorConfig(IOReactorConfig.custom() .setSoTimeout(Timeout.ofSeconds(10)) .build()) @@ -140,6 +140,23 @@ class HttpFeedClient implements FeedClient { if (operationJson != null) request.setBody(operationJson, ContentType.APPLICATION_JSON); + return requestStrategy.enqueue(documentId, request, this::send) + .handle((response, thrown) -> { + if (thrown != null) { + if (requestStrategy.hasFailed()) { + try { close(); } + catch (IOException exception) { thrown.addSuppressed(exception); } + } + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + thrown.printStackTrace(new PrintStream(buffer)); + return new Result(Result.Type.failure, documentId, buffer.toString(), null); + } + return toResult(response, documentId); + }); + } + + /** Sends the given request to the client with the least current inflight requests, completing the given vessel when done. */ + private void send(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel) { int index = 0; int min = Integer.MAX_VALUE; for (int i = 0; i < httpClients.size(); i++) @@ -148,29 +165,19 @@ class HttpFeedClient implements FeedClient { index = i; } - CloseableHttpAsyncClient client = httpClients.get(index); - AtomicInteger counter = inflight.get(index); - counter.incrementAndGet(); - return requestStrategy.enqueue(documentId, future -> { - client.execute(request, - new FutureCallback<SimpleHttpResponse>() { - @Override public void completed(SimpleHttpResponse response) { future.complete(response); } - @Override public void failed(Exception ex) { future.completeExceptionally(ex); } - @Override public void cancelled() { future.cancel(false); } - }); - }).handle((response, thrown) -> { - counter.decrementAndGet(); - if (thrown != null) { - if (requestStrategy.hasFailed()) { - try { close(); } - catch (IOException exception) { thrown.addSuppressed(exception); } - } - ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - thrown.printStackTrace(new PrintStream(buffer)); - return new Result(Result.Type.failure, documentId, buffer.toString(), null); - } - return toResult(response, documentId); - }); + inflight.get(index).incrementAndGet(); + try { + httpClients.get(index).execute(request, + new FutureCallback<SimpleHttpResponse>() { + @Override public void completed(SimpleHttpResponse response) { vessel.complete(response); } + @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); } + @Override public void cancelled() { vessel.cancel(false); } + }); + } + catch (Throwable thrown) { + vessel.completeExceptionally(thrown); + } + vessel.thenRun(inflight.get(index)::decrementAndGet); } static Result toResult(SimpleHttpResponse response, DocumentId documentId) { @@ -180,7 +187,7 @@ class HttpFeedClient implements FeedClient { case 412: type = Result.Type.conditionNotMet; break; default: type = Result.Type.failure; } - Map<String, String> responseJson = null; // TODO: parse JSON. + Map<String, String> responseJson = null; // TODO: parse JSON on error. return new Result(type, documentId, response.getBodyText(), "trace"); } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java index 0512d6a64c9..23569af3cdc 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java @@ -1,69 +1,72 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.feed.client; -import org.apache.hc.client5.http.HttpRequestRetryStrategy; +import ai.vespa.feed.client.FeedClient.RetryStrategy; +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; -import org.apache.hc.core5.http.HttpRequest; -import org.apache.hc.core5.http.HttpResponse; -import org.apache.hc.core5.http.protocol.HttpContext; -import org.apache.hc.core5.util.TimeValue; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.logging.Logger; + +import static java.lang.Math.max; +import static java.lang.Math.min; +import static java.util.logging.Level.INFO; /** * Controls request execution and retries: * <ul> * <li>Retry all IO exceptions; however</li> * <li>abort everything if more than 10% of requests result in an exception for some time.</li> - * <li>Whenever throttled, limit inflight to 99% of current; and</li> + * <li>Whenever throttled, limit inflight to one less than the current; and</li> * <li>on every successful response, increase inflight limit by 0.1.</li> * </ul> * * @author jonmv */ -class HttpRequestStrategy implements RequestStrategy<SimpleHttpResponse>, HttpRequestRetryStrategy { +class HttpRequestStrategy implements RequestStrategy { + + private static Logger log = Logger.getLogger(HttpRequestStrategy.class.getName()); + private static final double errorThreshold = 0.1; - private final Map<DocumentId, CompletableFuture<SimpleHttpResponse>> byId = new ConcurrentHashMap<>(); - private final FeedClient.RetryStrategy wrapped; + private final Map<DocumentId, CompletableFuture<Void>> inflightById = new HashMap<>(); + private final Object lock = new Object(); + private final RetryStrategy wrapped; private final long maxInflight; + private final long minInflight; private double targetInflight; - private long inflight; - private final AtomicReference<Double> errorRate; - private final double errorThreshold; - private final Lock lock; - private final Condition available; + private long inflight = 0; + private double errorRate = 0; + private long consecutiveSuccesses = 0; + private boolean failed = false; HttpRequestStrategy(FeedClientBuilder builder) { this.wrapped = builder.retryStrategy; this.maxInflight = builder.maxConnections * (long) builder.maxStreamsPerConnection; - this.targetInflight = maxInflight; - this.inflight = 0; - this.errorRate = new AtomicReference<>(0.0); - this.errorThreshold = 0.1; - this.lock = new ReentrantLock(true); - this.available = lock.newCondition(); + this.minInflight = builder.maxConnections * (long) Math.min(16, builder.maxStreamsPerConnection); + this.targetInflight = Math.sqrt(maxInflight) * (Math.sqrt(minInflight)); } - private double cycle() { - return targetInflight; // TODO: tune this--could start way too high if limit is set too high. - } + /** + * Retries all IOExceptions, unless error rate has converged to a value higher than the threshold, + * or the user has turned off retries for this type of operation. + */ + private boolean retry(SimpleHttpRequest request, Throwable thrown, int attempt) { + failure(); + log.log(INFO, thrown, () -> "Failed attempt " + attempt + " at " + request + + ", error rate is " + errorRate + ", " + consecutiveSuccesses + " successes since last error"); - @Override - public boolean retryRequest(HttpRequest request, IOException exception, int execCount, HttpContext context) { - if (errorRate.updateAndGet(rate -> rate + (1 - rate) / cycle()) > errorThreshold) + if ( ! (thrown instanceof IOException)) return false; - if (execCount > wrapped.retries()) + if (attempt > wrapped.retries()) return false; + switch (request.getMethod().toUpperCase()) { case "POST": return wrapped.retry(FeedClient.OperationType.put); case "PUT": return wrapped.retry(FeedClient.OperationType.update); @@ -73,83 +76,129 @@ class HttpRequestStrategy implements RequestStrategy<SimpleHttpResponse>, HttpRe } /** - * Called when a response is successfully obtained. + * Called when a response is successfully obtained. In conjunction with IOException reports, this makes the + * error rate converge towards the true error rate, at a speed inversely proportional to the target number + * of inflight requests, per reported success/error, i.e., hopefully at a rate independent of transport width. */ void success() { - errorRate.updateAndGet(rate -> rate - rate / cycle()); - lock.lock(); - targetInflight = Math.min(targetInflight + 0.1, maxInflight); - lock.unlock(); + synchronized (lock) { + errorRate -= errorRate / targetInflight; // Converges towards true error rate, in conjunction with failure updates. + targetInflight = min(targetInflight + 0.1, maxInflight); + ++consecutiveSuccesses; + } } - @Override - public boolean retryRequest(HttpResponse response, int execCount, HttpContext context) { - if (response.getCode() == 429 || response.getCode() == 503) { - lock.lock(); - targetInflight = Math.max(100, 99 * inflight / 100); - lock.unlock(); - return true; + /** + * Called whenever a failure to get a successful response is recorded. + */ + void failure() { + synchronized (lock) { + errorRate += (1 - errorRate) / targetInflight; // Converges towards true error rate, in conjunction with success updates. + if (errorRate > errorThreshold) + failed = true; + + consecutiveSuccesses = 0; } - return false; } - @Override - public TimeValue getRetryInterval(HttpResponse response, int execCount, HttpContext context) { - return TimeValue.ofMilliseconds(100); + /** Retries throttled requests (429, 503), adjusting the target inflight count, and server errors (500, 502). */ + private boolean retry(SimpleHttpRequest request, SimpleHttpResponse response, int attempt) { + if (response.getCode() / 100 == 2) { + success(); + return false; + } + + log.log(INFO, () -> "Status code " + response.getCode() + " (" + response.getBodyText() + ") on attempt " + attempt + + " at " + request + ", " + consecutiveSuccesses + " successes since last error"); + + if (response.getCode() == 429 || response.getCode() == 503) { // Throttling; reduce target inflight. + synchronized (lock) { + targetInflight = max(inflight * 0.9, minInflight); + } + return true; + } + + failure(); + return attempt <= wrapped.retries() && (response.getCode() == 500 || response.getCode() == 502); // Hopefully temporary errors. } - void acquireSlot() { - lock.lock(); + // Must hold lock. + private void acquireSlot() { try { while (inflight >= targetInflight) - available.awaitUninterruptibly(); + lock.wait(); ++inflight; } - finally { - lock.unlock(); + catch (InterruptedException e) { + throw new RuntimeException(e); } } - void releaseSlot() { - lock.lock(); - try { - --inflight; - - if (inflight < targetInflight) - available.signal(); - } - finally { - lock.unlock(); - } + // Must hold lock. + private void releaseSlot() { + for (long i = --inflight; i < targetInflight; i++) + lock.notify(); } @Override public boolean hasFailed() { - return errorRate.get() > errorThreshold; + synchronized (lock) { + return failed; + } } @Override - public CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, Consumer<CompletableFuture<SimpleHttpResponse>> dispatch) { - acquireSlot(); - - Consumer<CompletableFuture<SimpleHttpResponse>> safeDispatch = vessel -> { - try { dispatch.accept(vessel); } - catch (Throwable t) { vessel.completeExceptionally(t); } - }; - CompletableFuture<SimpleHttpResponse> vessel = new CompletableFuture<>(); - byId.compute(documentId, (id, previous) -> { - if (previous == null) safeDispatch.accept(vessel); - else previous.whenComplete((__, ___) -> safeDispatch.accept(vessel)); - return vessel; - }); + public CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, SimpleHttpRequest request, + BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>> dispatch) { + CompletableFuture<SimpleHttpResponse> result = new CompletableFuture<>(); // Carries the aggregate result of the operation, including retries. + CompletableFuture<SimpleHttpResponse> vessel = new CompletableFuture<>(); // Holds the computation of a single dispatch to the HTTP client. + CompletableFuture<Void> blocker = new CompletableFuture<>(); // Blocks the next operation with same doc-id, then triggers it when complete. + + // Get the previous inflight operation for this doc-id, or acquire a send slot. + CompletableFuture<Void> previous; + synchronized (lock) { + previous = inflightById.put(documentId, blocker); + if (previous == null) + acquireSlot(); + } + if (previous == null) // Send immediately if none inflight ... + dispatch.accept(request, vessel); + else // ... or send when the previous inflight is done. + previous.thenRun(() -> dispatch.accept(request, vessel)); - return vessel.whenComplete((__, thrown) -> { - releaseSlot(); - if (thrown == null) - success(); + handleAttempt(vessel, dispatch, blocker, request, result, documentId, 1); + return result; + } - byId.compute(documentId, (id, current) -> current == vessel ? null : current); + /** Handles the result of one attempt at the given operation, retrying if necessary. */ + private void handleAttempt(CompletableFuture<SimpleHttpResponse> vessel, BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>> dispatch, + CompletableFuture<Void> blocker, SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> result, + DocumentId documentId, int attempt) { + vessel.whenComplete((response, thrown) -> { + // Retry the operation if it failed with a transient error ... + if ( ! failed && (thrown != null ? retry(request, thrown, attempt) + : retry(request, response, attempt))) { + CompletableFuture<SimpleHttpResponse> retry = new CompletableFuture<>(); + dispatch.accept(request, retry); + handleAttempt(retry, dispatch, blocker, request, result, documentId, attempt + 1); + return; + } + + // ... or accept the outcome and mark the operation as complete. + CompletableFuture<Void> current; + synchronized (lock) { + current = inflightById.get(documentId); + if (current == blocker) { // Release slot and clear map if no other operations enqueued for this doc-id ... + releaseSlot(); + inflightById.put(documentId, null); + } + } + if (current != blocker) // ... or trigger sending the next enqueued operation. + blocker.complete(null); + + if (thrown == null) result.complete(response); + else result.completeExceptionally(thrown); }); } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java index e5eb956114e..466c4f9a0ab 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java @@ -1,20 +1,24 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.feed.client; +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; + import java.util.concurrent.CompletableFuture; -import java.util.function.Consumer; +import java.util.function.BiConsumer; /** * Controls execution of feed operations. * * @author jonmv */ -public interface RequestStrategy<T> { +public interface RequestStrategy { /** Whether this has failed, and we should stop. */ boolean hasFailed(); /** Enqueue the given operation, which is dispatched to a vessel future when ready. */ - CompletableFuture<T> enqueue(DocumentId documentId, Consumer<CompletableFuture<T>> dispatch); + CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, SimpleHttpRequest request, + BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>> dispatch); } |