summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorjonmv <venstad@gmail.com>2024-05-16 19:32:45 +0200
committerjonmv <venstad@gmail.com>2024-05-16 19:32:45 +0200
commitd7954c1b84d26b7f6b429159c3d9b956dc39eaef (patch)
tree749c6e82597366cf954e335448456b28fdd646f2
parenta2c23bb2ecd78118c1282944cb3c30076b47775c (diff)
Retry until timeout, with default lots of retries
-rw-r--r--vespa-feed-client-api/abi-spec.json6
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java5
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java11
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java9
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java13
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java29
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java8
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java2
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java32
9 files changed, 53 insertions, 62 deletions
diff --git a/vespa-feed-client-api/abi-spec.json b/vespa-feed-client-api/abi-spec.json
index 5248e570d5e..7ac4a7619ed 100644
--- a/vespa-feed-client-api/abi-spec.json
+++ b/vespa-feed-client-api/abi-spec.json
@@ -85,8 +85,7 @@
],
"methods" : [
"public boolean retry(ai.vespa.feed.client.FeedClient$OperationType)",
- "public int retries()",
- "public java.time.Duration gracePeriod()"
+ "public int retries()"
],
"fields" : [ ]
},
@@ -187,7 +186,8 @@
"public void <init>(java.lang.Throwable)",
"public void <init>(ai.vespa.feed.client.DocumentId, java.lang.Throwable)",
"public void <init>(ai.vespa.feed.client.DocumentId, java.lang.String, java.lang.Throwable)",
- "public java.util.Optional documentId()"
+ "public java.util.Optional documentId()",
+ "public java.lang.String getMessage()"
],
"fields" : [ ]
},
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java
index c45e37c79bb..7de7aae1350 100644
--- a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java
@@ -76,10 +76,7 @@ public interface FeedClient extends Closeable {
default boolean retry(OperationType type) { return true; }
/** Maximum number of retries per operation for assumed transient, non-backpressure problems. */
- default int retries() { return 10; }
-
- /** Grace period within which an operation may be retried past its retry count (see {@link #retries}). */
- default Duration gracePeriod() { return Duration.ofSeconds(10); }
+ default int retries() { return Integer.MAX_VALUE; }
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java
index 372e0904990..424481b6ef2 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java
@@ -12,7 +12,6 @@ import java.net.URI;
import java.nio.file.Path;
import java.security.PrivateKey;
import java.security.cert.X509Certificate;
-import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
@@ -59,11 +58,8 @@ public class FeedClientBuilderImpl implements FeedClientBuilder {
Compression compression = auto;
URI proxy;
Duration connectionTtl = Duration.ZERO;
- LongSupplier clock = Clock.systemUTC()::millis;
-
- public FeedClientBuilderImpl() {
- }
+ public FeedClientBuilderImpl() { }
FeedClientBuilderImpl(List<URI> endpoints) {
this();
@@ -255,11 +251,6 @@ public class FeedClientBuilderImpl implements FeedClientBuilder {
return this;
}
- FeedClientBuilderImpl setClock(LongSupplier clock) {
- this.clock = clock;
- return this;
- }
-
/** Constructs instance of {@link ai.vespa.feed.client.FeedClient} from builder configuration */
@Override
public FeedClient build() {
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
index 9dd11113c0b..f876c4efade 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
@@ -23,7 +23,6 @@ import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
-import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -31,6 +30,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.LongSupplier;
import java.util.function.Supplier;
import static ai.vespa.feed.client.OperationParameters.empty;
@@ -45,6 +45,7 @@ import static java.util.Objects.requireNonNull;
*/
class HttpFeedClient implements FeedClient {
+ private static final Duration maxTimeout = Duration.ofMinutes(15);
private static final JsonFactory jsonParserFactory = new JsonFactoryBuilder()
.streamReadConstraints(StreamReadConstraints.builder().maxStringLength(Integer.MAX_VALUE).build())
.build();
@@ -111,7 +112,8 @@ class HttpFeedClient implements FeedClient {
getPath(documentId) + getQuery(params, speedTest),
requestHeaders,
operationJson == null ? null : operationJson.getBytes(UTF_8), // TODO: make it bytes all the way?
- params.timeout().orElse(null));
+ params.timeout().orElse(maxTimeout),
+ System::currentTimeMillis);
CompletableFuture<Result> promise = new CompletableFuture<>();
requestStrategy.enqueue(documentId, request)
@@ -136,7 +138,8 @@ class HttpFeedClient implements FeedClient {
getPath(DocumentId.of("feeder", "handshake", "dummy")) + getQuery(empty(), true),
requestHeaders,
null,
- Duration.ofSeconds(15));
+ Duration.ofSeconds(15),
+ System::currentTimeMillis);
CompletableFuture<HttpResponse> future = new CompletableFuture<>();
cluster.dispatch(request, future);
HttpResponse response = future.get(20, TimeUnit.SECONDS);
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java
index 6de3f034f22..a75c56c9368 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java
@@ -3,6 +3,7 @@ package ai.vespa.feed.client.impl;
import java.time.Duration;
import java.util.Map;
+import java.util.function.LongSupplier;
import java.util.function.Supplier;
class HttpRequest {
@@ -11,14 +12,16 @@ class HttpRequest {
private final String path;
private final Map<String, Supplier<String>> headers;
private final byte[] body;
- private final Duration timeout;
+ private final long deadlineMillis;
+ private final LongSupplier clock;
- public HttpRequest(String method, String path, Map<String, Supplier<String>> headers, byte[] body, Duration timeout) {
+ public HttpRequest(String method, String path, Map<String, Supplier<String>> headers, byte[] body, Duration timeout, LongSupplier clock) {
this.method = method;
this.path = path;
this.headers = headers;
this.body = body;
- this.timeout = timeout;
+ this.deadlineMillis = clock.getAsLong() + timeout.toMillis();
+ this.clock = clock;
}
public String method() {
@@ -37,8 +40,8 @@ class HttpRequest {
return body;
}
- public Duration timeout() {
- return timeout;
+ public Duration timeLeft() {
+ return Duration.ofMillis(deadlineMillis - clock.getAsLong());
}
@Override
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java
index 42e7bfa9b96..f699651634a 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java
@@ -10,8 +10,6 @@ import ai.vespa.feed.client.HttpResponse;
import ai.vespa.feed.client.OperationStats;
import java.io.IOException;
-import java.time.Clock;
-import java.time.Instant;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
@@ -23,7 +21,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.LongSupplier;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -56,7 +53,6 @@ class HttpRequestStrategy implements RequestStrategy {
private static final Logger log = Logger.getLogger(HttpRequestStrategy.class.getName());
private final Cluster cluster;
- private final LongSupplier clock;
private final Map<DocumentId, RetriableFuture<HttpResponse>> inflightById = new ConcurrentHashMap<>();
private final RetryStrategy strategy;
private final CircuitBreaker breaker;
@@ -76,7 +72,6 @@ class HttpRequestStrategy implements RequestStrategy {
this.cluster = builder.benchmark ? new BenchmarkingCluster(cluster, throttler) : cluster;
this.strategy = builder.retryStrategy;
this.breaker = builder.circuitBreaker;
- this.clock = builder.clock;
Thread dispatcher = new Thread(this::dispatch, "feed-client-dispatcher");
dispatcher.setDaemon(true);
@@ -124,8 +119,8 @@ class HttpRequestStrategy implements RequestStrategy {
return inflight.get() - delayedCount.get() > throttler.targetInflight();
}
- private boolean retry(HttpRequest request, int attempt, Instant start) {
- if (attempt > strategy.retries() && Instant.ofEpochMilli(clock.getAsLong()).isAfter(start.plus(strategy.gracePeriod())))
+ private boolean retry(HttpRequest request, int attempt) {
+ if (attempt > strategy.retries() || request.timeLeft().toMillis() <= 0)
return false;
switch (request.method().toUpperCase()) {
@@ -140,22 +135,22 @@ class HttpRequestStrategy implements RequestStrategy {
* Retries all IOExceptions, unless error rate has converged to a value higher than the threshold,
* or the user has turned off retries for this type of operation.
*/
- private boolean retry(HttpRequest request, Throwable thrown, int attempt, Instant start) {
+ private boolean retry(HttpRequest request, Throwable thrown, int attempt) {
breaker.failure(thrown);
if ( (thrown instanceof IOException) // General IO problems.
// Thrown by HTTP2Session.StreamsState.reserveSlot, likely on GOAWAY from server
|| (thrown instanceof IllegalStateException && thrown.getMessage().equals("session closed"))
) {
log.log(FINER, thrown, () -> "Failed attempt " + attempt + " at " + request);
- return retry(request, attempt, start);
+ return retry(request, attempt);
}
log.log(FINE, thrown, () -> "Failed attempt " + attempt + " at " + request);
return false;
}
- /** Retries throttled requests (429), adjusting the target inflight count, and server errors (500, 502, 503, 504). */
- private boolean retry(HttpRequest request, HttpResponse response, int attempt, Instant start) {
+ /** Retries throttled requests (429), adjusting the target inflight count, and server unavailable (503). */
+ private boolean retry(HttpRequest request, HttpResponse response, int attempt) {
if (response.code() / 100 == 2 || response.code() == 404 || response.code() == 412) {
logResponse(FINEST, response, request, attempt);
breaker.success();
@@ -172,7 +167,7 @@ class HttpRequestStrategy implements RequestStrategy {
logResponse(FINE, response, request, attempt);
if (response.code() == 503) { // Hopefully temporary errors.
breaker.failure(response);
- return retry(request, attempt, start);
+ return retry(request, attempt);
}
return false;
@@ -262,7 +257,7 @@ class HttpRequestStrategy implements RequestStrategy {
previous.whenComplete((__, ___) -> offer(request, vessel));
}
- handleAttempt(vessel, request, result, 1, Instant.ofEpochMilli(clock.getAsLong()));
+ handleAttempt(vessel, request, result, 1);
return result.handle((response, error) -> {
if (inflightById.compute(documentId, (__, current) -> current == result ? null : current) == null)
@@ -278,15 +273,15 @@ class HttpRequestStrategy implements RequestStrategy {
/** Handles the result of one attempt at the given operation, retrying if necessary. */
private void handleAttempt(CompletableFuture<HttpResponse> vessel, HttpRequest request,
- RetriableFuture<HttpResponse> result, int attempt, Instant start) {
+ RetriableFuture<HttpResponse> result, int attempt) {
vessel.whenCompleteAsync((response, thrown) -> {
result.set(response, thrown);
// Retry the operation if it failed with a transient error ...
- if (thrown != null ? retry(request, thrown, attempt, start)
- : retry(request, response, attempt, start)) {
+ if (thrown != null ? retry(request, thrown, attempt)
+ : retry(request, response, attempt)) {
CompletableFuture<HttpResponse> retry = new CompletableFuture<>();
offer(request, retry);
- handleAttempt(retry, request, result, attempt + (breaker.state() == HALF_OPEN ? 0 : 1), start);
+ handleAttempt(retry, request, result, attempt + (breaker.state() == HALF_OPEN ? 0 : 1));
}
// ... or accept the outcome and mark the operation as complete.
else result.complete();
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java
index df010a167f6..e856a86ffec 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java
@@ -45,6 +45,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -81,8 +82,11 @@ class JettyCluster implements Cluster {
Endpoint endpoint = findLeastBusyEndpoint(endpoints);
try {
endpoint.inflight.incrementAndGet();
- long reqTimeoutMillis = req.timeout() != null
- ? req.timeout().toMillis() * 11 / 10 + 1000 : IDLE_TIMEOUT.toMillis();
+ long reqTimeoutMillis = req.timeLeft().toMillis();
+ if (reqTimeoutMillis <= 0) {
+ vessel.completeExceptionally(new TimeoutException("Operation timed out"));
+ return;
+ }
Request jettyReq = client.newRequest(URI.create(endpoint.uri + req.path()))
.version(HttpVersion.HTTP_2)
.method(HttpMethod.fromString(req.method()))
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java
index 57dffc5a668..14ade35825f 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java
@@ -214,7 +214,7 @@ class HttpFeedClientTest {
void testHandshake() {
// dummy:123 does not exist, and results in a host-not-found exception.
FeedException exception = assertThrows(FeedException.class,
- () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123")))));
+ () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123")))));
String message = exception.getMessage();
assertTrue(message.startsWith("failed handshake with server after "), message);
assertTrue(message.contains("java.net.UnknownHostException"), message);
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
index 74288de6dee..b06971ea0b1 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
@@ -45,7 +45,7 @@ class HttpRequestStrategyTest {
@Test
void testConcurrency() {
int documents = 1 << 16;
- HttpRequest request = new HttpRequest("PUT", "/", null, null, null);
+ HttpRequest request = new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), () -> 0);
HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8));
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Cluster cluster = (__, vessel) -> executor.schedule(() -> vessel.complete(response), (int) (Math.random() * 2 * 10), TimeUnit.MILLISECONDS);
@@ -94,18 +94,16 @@ class HttpRequestStrategyTest {
.setRetryStrategy(new FeedClient.RetryStrategy() {
@Override public boolean retry(FeedClient.OperationType type) { return type == FeedClient.OperationType.PUT; }
@Override public int retries() { return 1; }
- @Override public Duration gracePeriod() { return Duration.ofMillis(100); }
})
.setCircuitBreaker(breaker)
.setConnectionsPerEndpoint(1)
- .setMaxStreamPerConnection(minStreams)
- .setClock(now::get),
+ .setMaxStreamPerConnection(minStreams),
cluster);
OperationStats initial = strategy.stats();
DocumentId id1 = DocumentId.of("ns", "type", "1");
DocumentId id2 = DocumentId.of("ns", "type", "2");
- HttpRequest request = new HttpRequest("POST", "/", null, null, null);
+ HttpRequest request = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(180), now::get);
// Runtime exception is not retried.
cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom")));
@@ -149,8 +147,8 @@ class HttpRequestStrategyTest {
else vessel.complete(success);
});
CompletableFuture<HttpResponse> delayed = strategy.enqueue(id1, request);
- CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, null));
- assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null, null)).get());
+ CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), now::get));
+ assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null, Duration.ofSeconds(1), now::get)).get());
latch.await();
assertEquals(8, strategy.stats().requests()); // 3 attempts at throttled and one at id2.
now.set(4000);
@@ -171,7 +169,7 @@ class HttpRequestStrategyTest {
// Error responses are not retried when not of appropriate type.
cluster.expect((__, vessel) -> vessel.complete(serverError));
- assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, null)).get());
+ assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), now::get)).get());
assertEquals(12, strategy.stats().requests());
// Some error responses are not retried.
@@ -181,15 +179,15 @@ class HttpRequestStrategyTest {
assertEquals(13, strategy.stats().requests());
- // IOException is retried past retry limit within grace period.
+ // IOException is not retried past timeout.
cluster.expect((__, vessel) -> {
- now.addAndGet(10); // Exceed grace period after 10 attempts.
+ now.addAndGet(50); // Exceed grace period after 2 attempts.
vessel.completeExceptionally(new IOException("retry me"));
});
expected = assertThrows(ExecutionException.class,
- () -> strategy.enqueue(id1, request).get());
+ () -> strategy.enqueue(id1, new HttpRequest("POST", "/", null, null, Duration.ofMillis(100), now::get)).get());
assertEquals("retry me", expected.getCause().getCause().getMessage());
- assertEquals(24, strategy.stats().requests());
+ assertEquals(15, strategy.stats().requests());
// Circuit breaker opens some time after starting to fail.
@@ -206,7 +204,7 @@ class HttpRequestStrategyTest {
codes.put(429, 2L);
codes.put(503, 3L);
assertEquals(codes, stats.responsesByCode());
- assertEquals(14, stats.exceptions());
+ assertEquals(5, stats.exceptions());
assertEquals(stats, stats.since(initial));
assertEquals(0, stats.since(stats).averageLatencyMillis());
@@ -232,10 +230,10 @@ class HttpRequestStrategyTest {
DocumentId id3 = DocumentId.of("ns", "type", "3");
DocumentId id4 = DocumentId.of("ns", "type", "4");
DocumentId id5 = DocumentId.of("ns", "type", "5");
- HttpRequest failing = new HttpRequest("POST", "/", null, null, null);
- HttpRequest partial = new HttpRequest("POST", "/", null, null, null);
- HttpRequest request = new HttpRequest("POST", "/", null, null, null);
- HttpRequest blocking = new HttpRequest("POST", "/", null, null, null);
+ HttpRequest failing = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get);
+ HttpRequest partial = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get);
+ HttpRequest request = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get);
+ HttpRequest blocking = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get);
// Enqueue some operations to the same id, which are serialised, and then shut down while operations are in flight.
Phaser phaser = new Phaser(2);