diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-06-01 21:53:32 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-06-01 21:53:32 +0200 |
commit | f681e9933cc8d9c3e0470e0eede56eb9ffdee296 (patch) | |
tree | fe5a52ffc6de7ce7f6c7b51fb60a143fc8bda132 /vespa-feed-client/src | |
parent | 15d754c4e19ed171d3989904095fd9554849eb43 (diff) |
Fail based on time since last success instead
Diffstat (limited to 'vespa-feed-client/src')
3 files changed, 32 insertions, 37 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java index 370cd326f10..b3c711f7cc5 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java @@ -45,7 +45,7 @@ class HttpFeedClient implements FeedClient { private final URI endpoint; private final Map<String, Supplier<String>> requestHeaders; - private final HttpRequestStrategy requestStrategy; + private final RequestStrategy requestStrategy; private final List<CloseableHttpAsyncClient> httpClients = new ArrayList<>(); private final List<AtomicInteger> inflight = new ArrayList<>(); private final AtomicBoolean closed = new AtomicBoolean(); @@ -56,14 +56,14 @@ class HttpFeedClient implements FeedClient { this.requestStrategy = new HttpRequestStrategy(builder); for (int i = 0; i < builder.maxConnections; i++) { - CloseableHttpAsyncClient client = createHttpClient(builder, requestStrategy); + CloseableHttpAsyncClient client = createHttpClient(builder); client.start(); httpClients.add(client); inflight.add(new AtomicInteger()); } } - private static CloseableHttpAsyncClient createHttpClient(FeedClientBuilder builder, HttpRequestStrategy retryStrategy) throws IOException { + private static CloseableHttpAsyncClient createHttpClient(FeedClientBuilder builder) throws IOException { H2AsyncClientBuilder httpClientBuilder = H2AsyncClientBuilder.create() .setUserAgent(String.format("vespa-feed-client/%s", Vespa.VERSION)) .setDefaultHeaders(Collections.singletonList(new BasicHeader("Vespa-Client-Version", Vespa.VERSION))) diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java index 23569af3cdc..17edfeafba6 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java @@ -6,15 +6,14 @@ import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; import java.io.IOException; +import java.time.Instant; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; -import java.util.function.Consumer; import java.util.logging.Logger; import static java.lang.Math.max; -import static java.lang.Math.min; import static java.util.logging.Level.INFO; /** @@ -30,8 +29,7 @@ import static java.util.logging.Level.INFO; */ class HttpRequestStrategy implements RequestStrategy { - private static Logger log = Logger.getLogger(HttpRequestStrategy.class.getName()); - private static final double errorThreshold = 0.1; + private static final Logger log = Logger.getLogger(HttpRequestStrategy.class.getName()); private final Map<DocumentId, CompletableFuture<Void>> inflightById = new HashMap<>(); private final Object lock = new Object(); @@ -40,9 +38,9 @@ class HttpRequestStrategy implements RequestStrategy { private final long minInflight; private double targetInflight; private long inflight = 0; - private double errorRate = 0; private long consecutiveSuccesses = 0; private boolean failed = false; + private Instant lastSuccess = Instant.MAX; HttpRequestStrategy(FeedClientBuilder builder) { this.wrapped = builder.retryStrategy; @@ -51,53 +49,47 @@ class HttpRequestStrategy implements RequestStrategy { this.targetInflight = Math.sqrt(maxInflight) * (Math.sqrt(minInflight)); } + private boolean retry(SimpleHttpRequest request, int attempt) { + if (attempt >= wrapped.retries()) + return false; + + switch (request.getMethod().toUpperCase()) { + + case "POST": return wrapped.retry(FeedClient.OperationType.put); + case "PUT": return wrapped.retry(FeedClient.OperationType.update); + case "DELETE": return wrapped.retry(FeedClient.OperationType.remove); + default: throw new IllegalStateException("Unexpected HTTP method: " + request.getMethod()); + } + } + /** * Retries all IOExceptions, unless error rate has converged to a value higher than the threshold, * or the user has turned off retries for this type of operation. */ private boolean retry(SimpleHttpRequest request, Throwable thrown, int attempt) { failure(); - log.log(INFO, thrown, () -> "Failed attempt " + attempt + " at " + request + - ", error rate is " + errorRate + ", " + consecutiveSuccesses + " successes since last error"); + log.log(INFO, thrown, () -> "Failed attempt " + attempt + " at " + request + ", " + consecutiveSuccesses + " successes since last error"); if ( ! (thrown instanceof IOException)) return false; - if (attempt > wrapped.retries()) - return false; - - - switch (request.getMethod().toUpperCase()) { - case "POST": return wrapped.retry(FeedClient.OperationType.put); - case "PUT": return wrapped.retry(FeedClient.OperationType.update); - case "DELETE": return wrapped.retry(FeedClient.OperationType.remove); - default: throw new IllegalStateException("Unexpected HTTP method: " + request.getMethod()); - } + return retry(request, attempt); } - /** - * Called when a response is successfully obtained. In conjunction with IOException reports, this makes the - * error rate converge towards the true error rate, at a speed inversely proportional to the target number - * of inflight requests, per reported success/error, i.e., hopefully at a rate independent of transport width. - */ void success() { + Instant now = Instant.now(); synchronized (lock) { - errorRate -= errorRate / targetInflight; // Converges towards true error rate, in conjunction with failure updates. - targetInflight = min(targetInflight + 0.1, maxInflight); ++consecutiveSuccesses; + lastSuccess = now; } } - /** - * Called whenever a failure to get a successful response is recorded. - */ void failure() { + Instant threshold = Instant.now().minusSeconds(300); synchronized (lock) { - errorRate += (1 - errorRate) / targetInflight; // Converges towards true error rate, in conjunction with success updates. - if (errorRate > errorThreshold) - failed = true; - consecutiveSuccesses = 0; + if (lastSuccess.isBefore(threshold)) + failed = true; } } @@ -119,7 +111,10 @@ class HttpRequestStrategy implements RequestStrategy { } failure(); - return attempt <= wrapped.retries() && (response.getCode() == 500 || response.getCode() == 502); // Hopefully temporary errors. + if (response.getCode() != 500 && response.getCode() != 502) + return false; + + return retry(request, attempt); // Hopefully temporary errors. } // Must hold lock. diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java index 466c4f9a0ab..1787d8d65c6 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java @@ -14,10 +14,10 @@ import java.util.function.BiConsumer; */ public interface RequestStrategy { - /** Whether this has failed, and we should stop. */ + /** Whether this has failed fatally, and we should cease sending further operations. */ boolean hasFailed(); - /** Enqueue the given operation, which is dispatched to a vessel future when ready. */ + /** Enqueue the given operation, returning its future result. This may block if the client send queue is full. */ CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, SimpleHttpRequest request, BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>> dispatch); |