summary refs log tree commit diff stats
diff options
context:
space:
mode:
author jonmv <venstad@gmail.com> 2024-05-16 15:31:21 +0200
committer jonmv <venstad@gmail.com> 2024-05-16 15:31:21 +0200
commit 0c94e0b116f67fc07c9281552b58d9d4c11fd88a (patch)
tree 0091288192c2ccc6201d040890ac6e915d222b11
parent 975d689861eb5350286f294d79ecf4942f14473a (diff)
Retry requests within retry count limit OR grace period (default 10s)
-rw-r--r-- vespa-feed-client-api/abi-spec.json 3
-rw-r--r-- vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java 6
-rw-r--r-- vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java 8
-rw-r--r-- vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java 32
-rw-r--r-- vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java 53
5 files changed, 72 insertions, 30 deletions
diff --git a/vespa-feed-client-api/abi-spec.json b/vespa-feed-client-api/abi-spec.json
index 9065edad92a..5248e570d5e 100644
--- a/vespa-feed-client-api/abi-spec.json
+++ b/vespa-feed-client-api/abi-spec.json
@@ -85,7 +85,8 @@
],
"methods" : [
"public boolean retry(ai.vespa.feed.client.FeedClient$OperationType)",
- "public int retries()"
+ "public int retries()",
+ "public java.time.Duration gracePeriod()"
],
"fields" : [ ]
},
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java
index d73d36e0f4e..c45e37c79bb 100644
--- a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java
@@ -2,6 +2,7 @@
package ai.vespa.feed.client;
import java.io.Closeable;
+import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -74,9 +75,12 @@ public interface FeedClient extends Closeable {
/** Whether to retry operations of the given type. */
default boolean retry(OperationType type) { return true; }
- /** Number of retries per operation for assumed transient, non-backpressure problems. */
+ /** Maximum number of retries per operation for assumed transient, non-backpressure problems. */
default int retries() { return 10; }
+ /** Grace period within which an operation may be retried past its retry count (see {@link #retries}). */
+ default Duration gracePeriod() { return Duration.ofSeconds(10); }
+
}
/** Allows slowing down or halting completely operations against the configured endpoint on high failure rates. */
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java
index c271ac356e9..372e0904990 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java
@@ -12,12 +12,14 @@ import java.net.URI;
import java.nio.file.Path;
import java.security.PrivateKey;
import java.security.cert.X509Certificate;
+import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.LongSupplier;
import java.util.function.Supplier;
import static ai.vespa.feed.client.FeedClientBuilder.Compression.auto;
@@ -57,6 +59,7 @@ public class FeedClientBuilderImpl implements FeedClientBuilder {
Compression compression = auto;
URI proxy;
Duration connectionTtl = Duration.ZERO;
+ LongSupplier clock = Clock.systemUTC()::millis;
public FeedClientBuilderImpl() {
@@ -252,6 +255,11 @@ public class FeedClientBuilderImpl implements FeedClientBuilder {
return this;
}
+ FeedClientBuilderImpl setClock(LongSupplier clock) {
+ this.clock = clock;
+ return this;
+ }
+
/** Constructs instance of {@link ai.vespa.feed.client.FeedClient} from builder configuration */
@Override
public FeedClient build() {
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java
index 8f0327a1738..42e7bfa9b96 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java
@@ -10,6 +10,8 @@ import ai.vespa.feed.client.HttpResponse;
import ai.vespa.feed.client.OperationStats;
import java.io.IOException;
+import java.time.Clock;
+import java.time.Instant;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
@@ -21,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.LongSupplier;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -31,6 +34,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.FINER;
import static java.util.logging.Level.FINEST;
+import static java.util.logging.Level.SEVERE;
import static java.util.logging.Level.WARNING;
/**
@@ -52,6 +56,7 @@ class HttpRequestStrategy implements RequestStrategy {
private static final Logger log = Logger.getLogger(HttpRequestStrategy.class.getName());
private final Cluster cluster;
+ private final LongSupplier clock;
private final Map<DocumentId, RetriableFuture<HttpResponse>> inflightById = new ConcurrentHashMap<>();
private final RetryStrategy strategy;
private final CircuitBreaker breaker;
@@ -71,6 +76,7 @@ class HttpRequestStrategy implements RequestStrategy {
this.cluster = builder.benchmark ? new BenchmarkingCluster(cluster, throttler) : cluster;
this.strategy = builder.retryStrategy;
this.breaker = builder.circuitBreaker;
+ this.clock = builder.clock;
Thread dispatcher = new Thread(this::dispatch, "feed-client-dispatcher");
dispatcher.setDaemon(true);
@@ -96,7 +102,7 @@ class HttpRequestStrategy implements RequestStrategy {
}
}
catch (Throwable t) {
- log.log(WARNING, "Dispatch thread threw; shutting down", t);
+ log.log(SEVERE, "Dispatch thread threw; shutting down", t);
}
destroy();
}
@@ -118,8 +124,8 @@ class HttpRequestStrategy implements RequestStrategy {
return inflight.get() - delayedCount.get() > throttler.targetInflight();
}
- private boolean retry(HttpRequest request, int attempt) {
- if (attempt > strategy.retries())
+ private boolean retry(HttpRequest request, int attempt, Instant start) {
+ if (attempt > strategy.retries() && Instant.ofEpochMilli(clock.getAsLong()).isAfter(start.plus(strategy.gracePeriod())))
return false;
switch (request.method().toUpperCase()) {
@@ -134,15 +140,14 @@ class HttpRequestStrategy implements RequestStrategy {
* Retries all IOExceptions, unless error rate has converged to a value higher than the threshold,
* or the user has turned off retries for this type of operation.
*/
- private boolean retry(HttpRequest request, Throwable thrown, int attempt) {
+ private boolean retry(HttpRequest request, Throwable thrown, int attempt, Instant start) {
breaker.failure(thrown);
if ( (thrown instanceof IOException) // General IO problems.
-
// Thrown by HTTP2Session.StreamsState.reserveSlot, likely on GOAWAY from server
|| (thrown instanceof IllegalStateException && thrown.getMessage().equals("session closed"))
) {
log.log(FINER, thrown, () -> "Failed attempt " + attempt + " at " + request);
- return retry(request, attempt);
+ return retry(request, attempt, start);
}
log.log(FINE, thrown, () -> "Failed attempt " + attempt + " at " + request);
@@ -150,7 +155,7 @@ class HttpRequestStrategy implements RequestStrategy {
}
/** Retries throttled requests (429), adjusting the target inflight count, and server errors (500, 502, 503, 504). */
- private boolean retry(HttpRequest request, HttpResponse response, int attempt) {
+ private boolean retry(HttpRequest request, HttpResponse response, int attempt, Instant start) {
if (response.code() / 100 == 2 || response.code() == 404 || response.code() == 412) {
logResponse(FINEST, response, request, attempt);
breaker.success();
@@ -167,7 +172,7 @@ class HttpRequestStrategy implements RequestStrategy {
logResponse(FINE, response, request, attempt);
if (response.code() == 503) { // Hopefully temporary errors.
breaker.failure(response);
- return retry(request, attempt);
+ return retry(request, attempt, start);
}
return false;
@@ -257,7 +262,7 @@ class HttpRequestStrategy implements RequestStrategy {
previous.whenComplete((__, ___) -> offer(request, vessel));
}
- handleAttempt(vessel, request, result, 1);
+ handleAttempt(vessel, request, result, 1, Instant.ofEpochMilli(clock.getAsLong()));
return result.handle((response, error) -> {
if (inflightById.compute(documentId, (__, current) -> current == result ? null : current) == null)
@@ -272,15 +277,16 @@ class HttpRequestStrategy implements RequestStrategy {
}
/** Handles the result of one attempt at the given operation, retrying if necessary. */
- private void handleAttempt(CompletableFuture<HttpResponse> vessel, HttpRequest request, RetriableFuture<HttpResponse> result, int attempt) {
+ private void handleAttempt(CompletableFuture<HttpResponse> vessel, HttpRequest request,
+ RetriableFuture<HttpResponse> result, int attempt, Instant start) {
vessel.whenCompleteAsync((response, thrown) -> {
result.set(response, thrown);
// Retry the operation if it failed with a transient error ...
- if (thrown != null ? retry(request, thrown, attempt)
- : retry(request, response, attempt)) {
+ if (thrown != null ? retry(request, thrown, attempt, start)
+ : retry(request, response, attempt, start)) {
CompletableFuture<HttpResponse> retry = new CompletableFuture<>();
offer(request, retry);
- handleAttempt(retry, request, result, attempt + (breaker.state() == HALF_OPEN ? 0 : 1));
+ handleAttempt(retry, request, result, attempt + (breaker.state() == HALF_OPEN ? 0 : 1), start);
}
// ... or accept the outcome and mark the operation as complete.
else result.complete();
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
index b1a04ac9ed4..b8335f97e9c 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
@@ -11,7 +11,10 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.net.URI;
+import java.time.Clock;
import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -47,9 +50,9 @@ class HttpRequestStrategyTest {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Cluster cluster = (__, vessel) -> executor.schedule(() -> vessel.complete(response), (int) (Math.random() * 2 * 10), TimeUnit.MILLISECONDS);
- HttpRequestStrategy strategy = new HttpRequestStrategy( new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123")))
- .setConnectionsPerEndpoint(1 << 10)
- .setMaxStreamPerConnection(1 << 12),
+ HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123")))
+ .setConnectionsPerEndpoint(1 << 10)
+ .setMaxStreamPerConnection(1 << 12),
cluster);
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
@@ -81,20 +84,22 @@ class HttpRequestStrategyTest {
assertEquals(2 * documents, stats.bytesReceived());
}
- @Test
+ @Test()
void testRetries() throws ExecutionException, InterruptedException {
- int minStreams = 16; // Hard limit for minimum number of streams per connection.
+ int minStreams = 2; // Hard limit for minimum number of streams per connection.
MockCluster cluster = new MockCluster();
AtomicLong now = new AtomicLong(0);
CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10));
HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123")))
- .setRetryStrategy(new FeedClient.RetryStrategy() {
- @Override public boolean retry(FeedClient.OperationType type) { return type == FeedClient.OperationType.PUT; }
- @Override public int retries() { return 1; }
- })
- .setCircuitBreaker(breaker)
- .setConnectionsPerEndpoint(1)
- .setMaxStreamPerConnection(minStreams),
+ .setRetryStrategy(new FeedClient.RetryStrategy() {
+ @Override public boolean retry(FeedClient.OperationType type) { return type == FeedClient.OperationType.PUT; }
+ @Override public int retries() { return 1; }
+ @Override public Duration gracePeriod() { return Duration.ofMillis(100); }
+ })
+ .setCircuitBreaker(breaker)
+ .setConnectionsPerEndpoint(1)
+ .setMaxStreamPerConnection(minStreams)
+ .setClock(now::get),
cluster);
OperationStats initial = strategy.stats();
@@ -111,7 +116,10 @@ class HttpRequestStrategyTest {
assertEquals(1, strategy.stats().requests());
// IOException is retried.
- cluster.expect((__, vessel) -> vessel.completeExceptionally(new IOException("retry me")));
+ cluster.expect((__, vessel) -> {
+ now.addAndGet(200); // Exceed grace period.
+ vessel.completeExceptionally(new IOException("retry me"));
+ });
expected = assertThrows(ExecutionException.class,
() -> strategy.enqueue(id1, request).get());
assertEquals("retry me", expected.getCause().getCause().getMessage());
@@ -153,7 +161,10 @@ class HttpRequestStrategyTest {
// Some error responses are retried.
HttpResponse serverError = HttpResponse.of(503, null);
- cluster.expect((__, vessel) -> vessel.complete(serverError));
+ cluster.expect((__, vessel) -> {
+ now.addAndGet(200); // Exceed grace period.
+ vessel.complete(serverError);
+ });
assertEquals(serverError, strategy.enqueue(id1, request).get());
assertEquals(11, strategy.stats().requests());
assertEquals(CLOSED, breaker.state()); // Circuit not broken due to throttled requests.
@@ -169,6 +180,18 @@ class HttpRequestStrategyTest {
assertEquals(badRequest, strategy.enqueue(id1, request).get());
assertEquals(13, strategy.stats().requests());
+
+ // IOException is retried past retry limit within grace period.
+ cluster.expect((__, vessel) -> {
+ now.addAndGet(10); // Advance 10 ms per attempt: stays within the 100 ms grace period until it has accumulated past it.
+ vessel.completeExceptionally(new IOException("retry me"));
+ });
+ expected = assertThrows(ExecutionException.class,
+ () -> strategy.enqueue(id1, request).get());
+ assertEquals("retry me", expected.getCause().getCause().getMessage());
+ assertEquals(24, strategy.stats().requests());
+
+
// Circuit breaker opens some time after starting to fail.
now.set(6000);
assertEquals(HALF_OPEN, breaker.state()); // Circuit broken due to failed requests.
@@ -183,7 +206,7 @@ class HttpRequestStrategyTest {
codes.put(429, 2L);
codes.put(503, 3L);
assertEquals(codes, stats.responsesByCode());
- assertEquals(3, stats.exceptions());
+ assertEquals(14, stats.exceptions());
assertEquals(stats, stats.since(initial));
assertEquals(0, stats.since(stats).averageLatencyMillis());