diff options
author | jonmv <venstad@gmail.com> | 2024-05-16 19:32:45 +0200 |
---|---|---|
committer | jonmv <venstad@gmail.com> | 2024-05-16 19:32:45 +0200 |
commit | d7954c1b84d26b7f6b429159c3d9b956dc39eaef (patch) | |
tree | 749c6e82597366cf954e335448456b28fdd646f2 | |
parent | a2c23bb2ecd78118c1282944cb3c30076b47775c (diff) |
Retry until timeout, with default lots of retries
9 files changed, 53 insertions, 62 deletions
diff --git a/vespa-feed-client-api/abi-spec.json b/vespa-feed-client-api/abi-spec.json index 5248e570d5e..7ac4a7619ed 100644 --- a/vespa-feed-client-api/abi-spec.json +++ b/vespa-feed-client-api/abi-spec.json @@ -85,8 +85,7 @@ ], "methods" : [ "public boolean retry(ai.vespa.feed.client.FeedClient$OperationType)", - "public int retries()", - "public java.time.Duration gracePeriod()" + "public int retries()" ], "fields" : [ ] }, @@ -187,7 +186,8 @@ "public void <init>(java.lang.Throwable)", "public void <init>(ai.vespa.feed.client.DocumentId, java.lang.Throwable)", "public void <init>(ai.vespa.feed.client.DocumentId, java.lang.String, java.lang.Throwable)", - "public java.util.Optional documentId()" + "public java.util.Optional documentId()", + "public java.lang.String getMessage()" ], "fields" : [ ] }, diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java index c45e37c79bb..7de7aae1350 100644 --- a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java +++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java @@ -76,10 +76,7 @@ public interface FeedClient extends Closeable { default boolean retry(OperationType type) { return true; } /** Maximum number of retries per operation for assumed transient, non-backpressure problems. */ - default int retries() { return 10; } - - /** Grace period within which an operation may be retried past its retry count (see {@link #retries}). */ - default Duration gracePeriod() { return Duration.ofSeconds(10); } + default int retries() { return Integer.MAX_VALUE; } } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java index 372e0904990..424481b6ef2 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java @@ -12,7 +12,6 @@ import java.net.URI; import java.nio.file.Path; import java.security.PrivateKey; import java.security.cert.X509Certificate; -import java.time.Clock; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; @@ -59,11 +58,8 @@ public class FeedClientBuilderImpl implements FeedClientBuilder { Compression compression = auto; URI proxy; Duration connectionTtl = Duration.ZERO; - LongSupplier clock = Clock.systemUTC()::millis; - - public FeedClientBuilderImpl() { - } + public FeedClientBuilderImpl() { } FeedClientBuilderImpl(List<URI> endpoints) { this(); @@ -255,11 +251,6 @@ public class FeedClientBuilderImpl implements FeedClientBuilder { return this; } - FeedClientBuilderImpl setClock(LongSupplier clock) { - this.clock = clock; - return this; - } - /** Constructs instance of {@link ai.vespa.feed.client.FeedClient} from builder configuration */ @Override public FeedClient build() { diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java index 9dd11113c0b..f876c4efade 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java @@ -23,7 +23,6 @@ import java.time.Duration; import java.time.Instant; import java.util.HashMap; import java.util.Map; -import java.util.Optional; import java.util.StringJoiner; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -31,6 +30,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.LongSupplier; import java.util.function.Supplier; import static ai.vespa.feed.client.OperationParameters.empty; @@ -45,6 +45,7 @@ import static java.util.Objects.requireNonNull; */ class HttpFeedClient implements FeedClient { + private static final Duration maxTimeout = Duration.ofMinutes(15); private static final JsonFactory jsonParserFactory = new JsonFactoryBuilder() .streamReadConstraints(StreamReadConstraints.builder().maxStringLength(Integer.MAX_VALUE).build()) .build(); @@ -111,7 +112,8 @@ class HttpFeedClient implements FeedClient { getPath(documentId) + getQuery(params, speedTest), requestHeaders, operationJson == null ? null : operationJson.getBytes(UTF_8), // TODO: make it bytes all the way? - params.timeout().orElse(null)); + params.timeout().orElse(maxTimeout), + System::currentTimeMillis); CompletableFuture<Result> promise = new CompletableFuture<>(); requestStrategy.enqueue(documentId, request) @@ -136,7 +138,8 @@ class HttpFeedClient implements FeedClient { getPath(DocumentId.of("feeder", "handshake", "dummy")) + getQuery(empty(), true), requestHeaders, null, - Duration.ofSeconds(15)); + Duration.ofSeconds(15), + System::currentTimeMillis); CompletableFuture<HttpResponse> future = new CompletableFuture<>(); cluster.dispatch(request, future); HttpResponse response = future.get(20, TimeUnit.SECONDS); diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java index 6de3f034f22..a75c56c9368 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java @@ -3,6 +3,7 @@ package ai.vespa.feed.client.impl; import java.time.Duration; import java.util.Map; +import java.util.function.LongSupplier; import java.util.function.Supplier; class HttpRequest { @@ -11,14 +12,16 @@ class HttpRequest { private final String path; private final Map<String, Supplier<String>> headers; private final byte[] body; - private final Duration timeout; + private final long deadlineMillis; + private final LongSupplier clock; - public HttpRequest(String method, String path, Map<String, Supplier<String>> headers, byte[] body, Duration timeout) { + public HttpRequest(String method, String path, Map<String, Supplier<String>> headers, byte[] body, Duration timeout, LongSupplier clock) { this.method = method; this.path = path; this.headers = headers; this.body = body; - this.timeout = timeout; + this.deadlineMillis = clock.getAsLong() + timeout.toMillis(); + this.clock = clock; } public String method() { @@ -37,8 +40,8 @@ class HttpRequest { return body; } - public Duration timeout() { - return timeout; + public Duration timeLeft() { + return Duration.ofMillis(deadlineMillis - clock.getAsLong()); } @Override diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java index 42e7bfa9b96..f699651634a 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java @@ -10,8 +10,6 @@ import ai.vespa.feed.client.HttpResponse; import ai.vespa.feed.client.OperationStats; import java.io.IOException; -import java.time.Clock; -import java.time.Instant; import java.util.Map; import java.util.Queue; import java.util.concurrent.CompletableFuture; @@ -23,7 +21,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.LongSupplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -56,7 +53,6 @@ class HttpRequestStrategy implements RequestStrategy { private static final Logger log = Logger.getLogger(HttpRequestStrategy.class.getName()); private final Cluster cluster; - private final LongSupplier clock; private final Map<DocumentId, RetriableFuture<HttpResponse>> inflightById = new ConcurrentHashMap<>(); private final RetryStrategy strategy; private final CircuitBreaker breaker; @@ -76,7 +72,6 @@ class HttpRequestStrategy implements RequestStrategy { this.cluster = builder.benchmark ? new BenchmarkingCluster(cluster, throttler) : cluster; this.strategy = builder.retryStrategy; this.breaker = builder.circuitBreaker; - this.clock = builder.clock; Thread dispatcher = new Thread(this::dispatch, "feed-client-dispatcher"); dispatcher.setDaemon(true); @@ -124,8 +119,8 @@ class HttpRequestStrategy implements RequestStrategy { return inflight.get() - delayedCount.get() > throttler.targetInflight(); } - private boolean retry(HttpRequest request, int attempt, Instant start) { - if (attempt > strategy.retries() && Instant.ofEpochMilli(clock.getAsLong()).isAfter(start.plus(strategy.gracePeriod()))) + private boolean retry(HttpRequest request, int attempt) { + if (attempt > strategy.retries() || request.timeLeft().toMillis() <= 0) return false; switch (request.method().toUpperCase()) { @@ -140,22 +135,22 @@ class HttpRequestStrategy implements RequestStrategy { * Retries all IOExceptions, unless error rate has converged to a value higher than the threshold, * or the user has turned off retries for this type of operation. */ - private boolean retry(HttpRequest request, Throwable thrown, int attempt, Instant start) { + private boolean retry(HttpRequest request, Throwable thrown, int attempt) { breaker.failure(thrown); if ( (thrown instanceof IOException) // General IO problems. // Thrown by HTTP2Session.StreamsState.reserveSlot, likely on GOAWAY from server || (thrown instanceof IllegalStateException && thrown.getMessage().equals("session closed")) ) { log.log(FINER, thrown, () -> "Failed attempt " + attempt + " at " + request); - return retry(request, attempt, start); + return retry(request, attempt); } log.log(FINE, thrown, () -> "Failed attempt " + attempt + " at " + request); return false; } - /** Retries throttled requests (429), adjusting the target inflight count, and server errors (500, 502, 503, 504). */ - private boolean retry(HttpRequest request, HttpResponse response, int attempt, Instant start) { + /** Retries throttled requests (429), adjusting the target inflight count, and server unavailable (503). */ + private boolean retry(HttpRequest request, HttpResponse response, int attempt) { if (response.code() / 100 == 2 || response.code() == 404 || response.code() == 412) { logResponse(FINEST, response, request, attempt); breaker.success(); @@ -172,7 +167,7 @@ class HttpRequestStrategy implements RequestStrategy { logResponse(FINE, response, request, attempt); if (response.code() == 503) { // Hopefully temporary errors. breaker.failure(response); - return retry(request, attempt, start); + return retry(request, attempt); } return false; @@ -262,7 +257,7 @@ class HttpRequestStrategy implements RequestStrategy { previous.whenComplete((__, ___) -> offer(request, vessel)); } - handleAttempt(vessel, request, result, 1, Instant.ofEpochMilli(clock.getAsLong())); + handleAttempt(vessel, request, result, 1); return result.handle((response, error) -> { if (inflightById.compute(documentId, (__, current) -> current == result ? null : current) == null) @@ -278,15 +273,15 @@ class HttpRequestStrategy implements RequestStrategy { /** Handles the result of one attempt at the given operation, retrying if necessary. */ private void handleAttempt(CompletableFuture<HttpResponse> vessel, HttpRequest request, - RetriableFuture<HttpResponse> result, int attempt, Instant start) { + RetriableFuture<HttpResponse> result, int attempt) { vessel.whenCompleteAsync((response, thrown) -> { result.set(response, thrown); // Retry the operation if it failed with a transient error ... - if (thrown != null ? retry(request, thrown, attempt, start) - : retry(request, response, attempt, start)) { + if (thrown != null ? retry(request, thrown, attempt) + : retry(request, response, attempt)) { CompletableFuture<HttpResponse> retry = new CompletableFuture<>(); offer(request, retry); - handleAttempt(retry, request, result, attempt + (breaker.state() == HALF_OPEN ? 0 : 1), start); + handleAttempt(retry, request, result, attempt + (breaker.state() == HALF_OPEN ? 0 : 1)); } // ... or accept the outcome and mark the operation as complete. else result.complete(); diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java index df010a167f6..e856a86ffec 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java @@ -45,6 +45,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -81,8 +82,11 @@ class JettyCluster implements Cluster { Endpoint endpoint = findLeastBusyEndpoint(endpoints); try { endpoint.inflight.incrementAndGet(); - long reqTimeoutMillis = req.timeout() != null - ? req.timeout().toMillis() * 11 / 10 + 1000 : IDLE_TIMEOUT.toMillis(); + long reqTimeoutMillis = req.timeLeft().toMillis(); + if (reqTimeoutMillis <= 0) { + vessel.completeExceptionally(new TimeoutException("Operation timed out")); + return; + } Request jettyReq = client.newRequest(URI.create(endpoint.uri + req.path())) .version(HttpVersion.HTTP_2) .method(HttpMethod.fromString(req.method())) diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java index 57dffc5a668..14ade35825f 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java @@ -214,7 +214,7 @@ class HttpFeedClientTest { void testHandshake() { // dummy:123 does not exist, and results in a host-not-found exception. FeedException exception = assertThrows(FeedException.class, - () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))))); + () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))))); String message = exception.getMessage(); assertTrue(message.startsWith("failed handshake with server after "), message); assertTrue(message.contains("java.net.UnknownHostException"), message); diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java index 74288de6dee..b06971ea0b1 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java @@ -45,7 +45,7 @@ class HttpRequestStrategyTest { @Test void testConcurrency() { int documents = 1 << 16; - HttpRequest request = new HttpRequest("PUT", "/", null, null, null); + HttpRequest request = new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), () -> 0); HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8)); ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Cluster cluster = (__, vessel) -> executor.schedule(() -> vessel.complete(response), (int) (Math.random() * 2 * 10), TimeUnit.MILLISECONDS); @@ -94,18 +94,16 @@ class HttpRequestStrategyTest { .setRetryStrategy(new FeedClient.RetryStrategy() { @Override public boolean retry(FeedClient.OperationType type) { return type == FeedClient.OperationType.PUT; } @Override public int retries() { return 1; } - @Override public Duration gracePeriod() { return Duration.ofMillis(100); } }) .setCircuitBreaker(breaker) .setConnectionsPerEndpoint(1) - .setMaxStreamPerConnection(minStreams) - .setClock(now::get), + .setMaxStreamPerConnection(minStreams), cluster); OperationStats initial = strategy.stats(); DocumentId id1 = DocumentId.of("ns", "type", "1"); DocumentId id2 = DocumentId.of("ns", "type", "2"); - HttpRequest request = new HttpRequest("POST", "/", null, null, null); + HttpRequest request = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(180), now::get); // Runtime exception is not retried. cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom"))); @@ -149,8 +147,8 @@ class HttpRequestStrategyTest { else vessel.complete(success); }); CompletableFuture<HttpResponse> delayed = strategy.enqueue(id1, request); - CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, null)); - assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null, null)).get()); + CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), now::get)); + assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null, Duration.ofSeconds(1), now::get)).get()); latch.await(); assertEquals(8, strategy.stats().requests()); // 3 attempts at throttled and one at id2. now.set(4000); @@ -171,7 +169,7 @@ class HttpRequestStrategyTest { // Error responses are not retried when not of appropriate type. cluster.expect((__, vessel) -> vessel.complete(serverError)); - assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, null)).get()); + assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), now::get)).get()); assertEquals(12, strategy.stats().requests()); // Some error responses are not retried. @@ -181,15 +179,15 @@ class HttpRequestStrategyTest { assertEquals(13, strategy.stats().requests()); - // IOException is retried past retry limit within grace period. + // IOException is not retried past timeout. cluster.expect((__, vessel) -> { - now.addAndGet(10); // Exceed grace period after 10 attempts. + now.addAndGet(50); // Exceed grace period after 2 attempts. vessel.completeExceptionally(new IOException("retry me")); }); expected = assertThrows(ExecutionException.class, - () -> strategy.enqueue(id1, request).get()); + () -> strategy.enqueue(id1, new HttpRequest("POST", "/", null, null, Duration.ofMillis(100), now::get)).get()); assertEquals("retry me", expected.getCause().getCause().getMessage()); - assertEquals(24, strategy.stats().requests()); + assertEquals(15, strategy.stats().requests()); // Circuit breaker opens some time after starting to fail. @@ -206,7 +204,7 @@ class HttpRequestStrategyTest { codes.put(429, 2L); codes.put(503, 3L); assertEquals(codes, stats.responsesByCode()); - assertEquals(14, stats.exceptions()); + assertEquals(5, stats.exceptions()); assertEquals(stats, stats.since(initial)); assertEquals(0, stats.since(stats).averageLatencyMillis()); @@ -232,10 +230,10 @@ class HttpRequestStrategyTest { DocumentId id3 = DocumentId.of("ns", "type", "3"); DocumentId id4 = DocumentId.of("ns", "type", "4"); DocumentId id5 = DocumentId.of("ns", "type", "5"); - HttpRequest failing = new HttpRequest("POST", "/", null, null, null); - HttpRequest partial = new HttpRequest("POST", "/", null, null, null); - HttpRequest request = new HttpRequest("POST", "/", null, null, null); - HttpRequest blocking = new HttpRequest("POST", "/", null, null, null); + HttpRequest failing = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get); + HttpRequest partial = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get); + HttpRequest request = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get); + HttpRequest blocking = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get); // Enqueue some operations to the same id, which are serialised, and then shut down while operations are in flight. Phaser phaser = new Phaser(2); |