diff options
author | jonmv <venstad@gmail.com> | 2024-05-16 15:31:21 +0200 |
---|---|---|
committer | jonmv <venstad@gmail.com> | 2024-05-16 15:31:21 +0200 |
commit | 0c94e0b116f67fc07c9281552b58d9d4c11fd88a (patch) | |
tree | 0091288192c2ccc6201d040890ac6e915d222b11 | |
parent | 975d689861eb5350286f294d79ecf4942f14473a (diff) |
Retry requests within retry count limit OR grace period (default 10s)
5 files changed, 72 insertions, 30 deletions
diff --git a/vespa-feed-client-api/abi-spec.json b/vespa-feed-client-api/abi-spec.json index 9065edad92a..5248e570d5e 100644 --- a/vespa-feed-client-api/abi-spec.json +++ b/vespa-feed-client-api/abi-spec.json @@ -85,7 +85,8 @@ ], "methods" : [ "public boolean retry(ai.vespa.feed.client.FeedClient$OperationType)", - "public int retries()" + "public int retries()", + "public java.time.Duration gracePeriod()" ], "fields" : [ ] }, diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java index d73d36e0f4e..c45e37c79bb 100644 --- a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java +++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java @@ -2,6 +2,7 @@ package ai.vespa.feed.client; import java.io.Closeable; +import java.time.Duration; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -74,9 +75,12 @@ public interface FeedClient extends Closeable { /** Whether to retry operations of the given type. */ default boolean retry(OperationType type) { return true; } - /** Number of retries per operation for assumed transient, non-backpressure problems. */ + /** Maximum number of retries per operation for assumed transient, non-backpressure problems. */ default int retries() { return 10; } + /** Grace period within which an operation may be retried past its retry count (see {@link #retries}). */ + default Duration gracePeriod() { return Duration.ofSeconds(10); } + } /** Allows slowing down or halting completely operations against the configured endpoint on high failure rates. 
*/ diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java index c271ac356e9..372e0904990 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java @@ -12,12 +12,14 @@ import java.net.URI; import java.nio.file.Path; import java.security.PrivateKey; import java.security.cert.X509Certificate; +import java.time.Clock; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.LongSupplier; import java.util.function.Supplier; import static ai.vespa.feed.client.FeedClientBuilder.Compression.auto; @@ -57,6 +59,7 @@ public class FeedClientBuilderImpl implements FeedClientBuilder { Compression compression = auto; URI proxy; Duration connectionTtl = Duration.ZERO; + LongSupplier clock = Clock.systemUTC()::millis; public FeedClientBuilderImpl() { @@ -252,6 +255,11 @@ public class FeedClientBuilderImpl implements FeedClientBuilder { return this; } + FeedClientBuilderImpl setClock(LongSupplier clock) { + this.clock = clock; + return this; + } + /** Constructs instance of {@link ai.vespa.feed.client.FeedClient} from builder configuration */ @Override public FeedClient build() { diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java index 8f0327a1738..42e7bfa9b96 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java @@ -10,6 +10,8 @@ import ai.vespa.feed.client.HttpResponse; import ai.vespa.feed.client.OperationStats; import java.io.IOException; 
+import java.time.Clock; +import java.time.Instant; import java.util.Map; import java.util.Queue; import java.util.concurrent.CompletableFuture; @@ -21,6 +23,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.LongSupplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -31,6 +34,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.logging.Level.FINE; import static java.util.logging.Level.FINER; import static java.util.logging.Level.FINEST; +import static java.util.logging.Level.SEVERE; import static java.util.logging.Level.WARNING; /** @@ -52,6 +56,7 @@ class HttpRequestStrategy implements RequestStrategy { private static final Logger log = Logger.getLogger(HttpRequestStrategy.class.getName()); private final Cluster cluster; + private final LongSupplier clock; private final Map<DocumentId, RetriableFuture<HttpResponse>> inflightById = new ConcurrentHashMap<>(); private final RetryStrategy strategy; private final CircuitBreaker breaker; @@ -71,6 +76,7 @@ class HttpRequestStrategy implements RequestStrategy { this.cluster = builder.benchmark ? 
new BenchmarkingCluster(cluster, throttler) : cluster; this.strategy = builder.retryStrategy; this.breaker = builder.circuitBreaker; + this.clock = builder.clock; Thread dispatcher = new Thread(this::dispatch, "feed-client-dispatcher"); dispatcher.setDaemon(true); @@ -96,7 +102,7 @@ class HttpRequestStrategy implements RequestStrategy { } } catch (Throwable t) { - log.log(WARNING, "Dispatch thread threw; shutting down", t); + log.log(SEVERE, "Dispatch thread threw; shutting down", t); } destroy(); } @@ -118,8 +124,8 @@ class HttpRequestStrategy implements RequestStrategy { return inflight.get() - delayedCount.get() > throttler.targetInflight(); } - private boolean retry(HttpRequest request, int attempt) { - if (attempt > strategy.retries()) + private boolean retry(HttpRequest request, int attempt, Instant start) { + if (attempt > strategy.retries() && Instant.ofEpochMilli(clock.getAsLong()).isAfter(start.plus(strategy.gracePeriod()))) return false; switch (request.method().toUpperCase()) { @@ -134,15 +140,14 @@ class HttpRequestStrategy implements RequestStrategy { * Retries all IOExceptions, unless error rate has converged to a value higher than the threshold, * or the user has turned off retries for this type of operation. */ - private boolean retry(HttpRequest request, Throwable thrown, int attempt) { + private boolean retry(HttpRequest request, Throwable thrown, int attempt, Instant start) { breaker.failure(thrown); if ( (thrown instanceof IOException) // General IO problems. 
- // Thrown by HTTP2Session.StreamsState.reserveSlot, likely on GOAWAY from server || (thrown instanceof IllegalStateException && thrown.getMessage().equals("session closed")) ) { log.log(FINER, thrown, () -> "Failed attempt " + attempt + " at " + request); - return retry(request, attempt); + return retry(request, attempt, start); } log.log(FINE, thrown, () -> "Failed attempt " + attempt + " at " + request); @@ -150,7 +155,7 @@ class HttpRequestStrategy implements RequestStrategy { } /** Retries throttled requests (429), adjusting the target inflight count, and server errors (500, 502, 503, 504). */ - private boolean retry(HttpRequest request, HttpResponse response, int attempt) { + private boolean retry(HttpRequest request, HttpResponse response, int attempt, Instant start) { if (response.code() / 100 == 2 || response.code() == 404 || response.code() == 412) { logResponse(FINEST, response, request, attempt); breaker.success(); @@ -167,7 +172,7 @@ class HttpRequestStrategy implements RequestStrategy { logResponse(FINE, response, request, attempt); if (response.code() == 503) { // Hopefully temporary errors. breaker.failure(response); - return retry(request, attempt); + return retry(request, attempt, start); } return false; @@ -257,7 +262,7 @@ class HttpRequestStrategy implements RequestStrategy { previous.whenComplete((__, ___) -> offer(request, vessel)); } - handleAttempt(vessel, request, result, 1); + handleAttempt(vessel, request, result, 1, Instant.ofEpochMilli(clock.getAsLong())); return result.handle((response, error) -> { if (inflightById.compute(documentId, (__, current) -> current == result ? null : current) == null) @@ -272,15 +277,16 @@ class HttpRequestStrategy implements RequestStrategy { } /** Handles the result of one attempt at the given operation, retrying if necessary. 
*/ - private void handleAttempt(CompletableFuture<HttpResponse> vessel, HttpRequest request, RetriableFuture<HttpResponse> result, int attempt) { + private void handleAttempt(CompletableFuture<HttpResponse> vessel, HttpRequest request, + RetriableFuture<HttpResponse> result, int attempt, Instant start) { vessel.whenCompleteAsync((response, thrown) -> { result.set(response, thrown); // Retry the operation if it failed with a transient error ... - if (thrown != null ? retry(request, thrown, attempt) - : retry(request, response, attempt)) { + if (thrown != null ? retry(request, thrown, attempt, start) + : retry(request, response, attempt, start)) { CompletableFuture<HttpResponse> retry = new CompletableFuture<>(); offer(request, retry); - handleAttempt(retry, request, result, attempt + (breaker.state() == HALF_OPEN ? 0 : 1)); + handleAttempt(retry, request, result, attempt + (breaker.state() == HALF_OPEN ? 0 : 1), start); } // ... or accept the outcome and mark the operation as complete. else result.complete(); diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java index b1a04ac9ed4..b8335f97e9c 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java @@ -11,7 +11,10 @@ import org.junit.jupiter.api.Test; import java.io.IOException; import java.net.URI; +import java.time.Clock; import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -47,9 +50,9 @@ class HttpRequestStrategyTest { ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Cluster cluster = (__, vessel) -> executor.schedule(() -> vessel.complete(response), (int) (Math.random() * 2 * 10), TimeUnit.MILLISECONDS); - 
HttpRequestStrategy strategy = new HttpRequestStrategy( new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123"))) - .setConnectionsPerEndpoint(1 << 10) - .setMaxStreamPerConnection(1 << 12), + HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123"))) + .setConnectionsPerEndpoint(1 << 10) + .setMaxStreamPerConnection(1 << 12), cluster); CountDownLatch latch = new CountDownLatch(1); new Thread(() -> { @@ -81,20 +84,22 @@ class HttpRequestStrategyTest { assertEquals(2 * documents, stats.bytesReceived()); } - @Test + @Test() void testRetries() throws ExecutionException, InterruptedException { - int minStreams = 16; // Hard limit for minimum number of streams per connection. + int minStreams = 2; // Hard limit for minimum number of streams per connection. MockCluster cluster = new MockCluster(); AtomicLong now = new AtomicLong(0); CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10)); HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123"))) - .setRetryStrategy(new FeedClient.RetryStrategy() { - @Override public boolean retry(FeedClient.OperationType type) { return type == FeedClient.OperationType.PUT; } - @Override public int retries() { return 1; } - }) - .setCircuitBreaker(breaker) - .setConnectionsPerEndpoint(1) - .setMaxStreamPerConnection(minStreams), + .setRetryStrategy(new FeedClient.RetryStrategy() { + @Override public boolean retry(FeedClient.OperationType type) { return type == FeedClient.OperationType.PUT; } + @Override public int retries() { return 1; } + @Override public Duration gracePeriod() { return Duration.ofMillis(100); } + }) + .setCircuitBreaker(breaker) + .setConnectionsPerEndpoint(1) + .setMaxStreamPerConnection(minStreams) + .setClock(now::get), cluster); OperationStats initial = strategy.stats(); @@ -111,7 +116,10 @@ class 
HttpRequestStrategyTest { assertEquals(1, strategy.stats().requests()); // IOException is retried. - cluster.expect((__, vessel) -> vessel.completeExceptionally(new IOException("retry me"))); + cluster.expect((__, vessel) -> { + now.addAndGet(200); // Exceed grace period. + vessel.completeExceptionally(new IOException("retry me")); + }); expected = assertThrows(ExecutionException.class, () -> strategy.enqueue(id1, request).get()); assertEquals("retry me", expected.getCause().getCause().getMessage()); @@ -153,7 +161,10 @@ class HttpRequestStrategyTest { // Some error responses are retried. HttpResponse serverError = HttpResponse.of(503, null); - cluster.expect((__, vessel) -> vessel.complete(serverError)); + cluster.expect((__, vessel) -> { + now.addAndGet(200); // Exceed grace period. + vessel.complete(serverError); + }); assertEquals(serverError, strategy.enqueue(id1, request).get()); assertEquals(11, strategy.stats().requests()); assertEquals(CLOSED, breaker.state()); // Circuit not broken due to throttled requests. @@ -169,6 +180,18 @@ class HttpRequestStrategyTest { assertEquals(badRequest, strategy.enqueue(id1, request).get()); assertEquals(13, strategy.stats().requests()); + + // IOException is retried past retry limit within grace period. + cluster.expect((__, vessel) -> { + now.addAndGet(10); // Advance 10 ms per attempt: stays within the 100 ms grace period until the 11th attempt. + vessel.completeExceptionally(new IOException("retry me")); + }); + expected = assertThrows(ExecutionException.class, + () -> strategy.enqueue(id1, request).get()); + assertEquals("retry me", expected.getCause().getCause().getMessage()); + assertEquals(24, strategy.stats().requests()); + + // Circuit breaker opens some time after starting to fail. now.set(6000); assertEquals(HALF_OPEN, breaker.state()); // Circuit broken due to failed requests. 
@@ -183,7 +206,7 @@ class HttpRequestStrategyTest { codes.put(429, 2L); codes.put(503, 3L); assertEquals(codes, stats.responsesByCode()); - assertEquals(3, stats.exceptions()); + assertEquals(14, stats.exceptions()); assertEquals(stats, stats.since(initial)); assertEquals(0, stats.since(stats).averageLatencyMillis()); |