From 19d3c4448090576bc37f611794c5ee5799cde9f5 Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Tue, 21 May 2024 17:09:02 +0200 Subject: Revert "Retry requests within retry count limit OR grace period (default 10s)" --- vespa-feed-client-api/abi-spec.json | 3 +- .../main/java/ai/vespa/feed/client/FeedClient.java | 5 +- .../java/ai/vespa/feed/client/FeedException.java | 5 -- .../feed/client/impl/FeedClientBuilderImpl.java | 5 +- .../client/impl/GracePeriodCircuitBreaker.java | 41 +++++----- .../ai/vespa/feed/client/impl/HttpFeedClient.java | 9 +-- .../ai/vespa/feed/client/impl/HttpRequest.java | 11 +-- .../feed/client/impl/HttpRequestStrategy.java | 11 ++- .../ai/vespa/feed/client/impl/JettyCluster.java | 8 +- .../client/impl/GracePeriodCircuitBreakerTest.java | 18 ++--- .../vespa/feed/client/impl/HttpFeedClientTest.java | 6 +- .../feed/client/impl/HttpRequestStrategyTest.java | 88 +++++++++------------- 12 files changed, 84 insertions(+), 126 deletions(-) diff --git a/vespa-feed-client-api/abi-spec.json b/vespa-feed-client-api/abi-spec.json index 7ac4a7619ed..9065edad92a 100644 --- a/vespa-feed-client-api/abi-spec.json +++ b/vespa-feed-client-api/abi-spec.json @@ -186,8 +186,7 @@ "public void (java.lang.Throwable)", "public void (ai.vespa.feed.client.DocumentId, java.lang.Throwable)", "public void (ai.vespa.feed.client.DocumentId, java.lang.String, java.lang.Throwable)", - "public java.util.Optional documentId()", - "public java.lang.String getMessage()" + "public java.util.Optional documentId()" ], "fields" : [ ] }, diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java index 7de7aae1350..d73d36e0f4e 100644 --- a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java +++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java @@ -2,7 +2,6 @@ package ai.vespa.feed.client; import java.io.Closeable; -import java.time.Duration; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -75,8 +74,8 @@ public interface FeedClient extends Closeable { /** Whether to retry operations of the given type. */ default boolean retry(OperationType type) { return true; } - /** Maximum number of retries per operation for assumed transient, non-backpressure problems. */ - default int retries() { return Integer.MAX_VALUE; } + /** Number of retries per operation for assumed transient, non-backpressure problems. */ + default int retries() { return 10; } } diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedException.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedException.java index dd1327f7ccf..74f906149b2 100644 --- a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedException.java +++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedException.java @@ -44,9 +44,4 @@ public class FeedException extends RuntimeException { public Optional documentId() { return Optional.ofNullable(documentId); } - @Override - public String getMessage() { - return documentId != null ? "(" + documentId + ") " + super.getMessage() : super.getMessage(); - } - } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java index 424481b6ef2..c271ac356e9 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java @@ -18,7 +18,6 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.function.LongSupplier; import java.util.function.Supplier; import static ai.vespa.feed.client.FeedClientBuilder.Compression.auto; @@ -59,7 +58,9 @@ public class FeedClientBuilderImpl implements FeedClientBuilder { URI proxy; Duration connectionTtl = Duration.ZERO; - public FeedClientBuilderImpl() { } + + public FeedClientBuilderImpl() { + } FeedClientBuilderImpl(List endpoints) { this(); diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java index cec7106403e..1ea2089c0eb 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java @@ -12,6 +12,7 @@ import java.util.function.LongSupplier; import java.util.logging.Logger; import static java.util.Objects.requireNonNull; +import static java.util.logging.Level.FINE; import static java.util.logging.Level.INFO; import static java.util.logging.Level.WARNING; @@ -23,22 +24,22 @@ import static java.util.logging.Level.WARNING; public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker { private static final Logger log = Logger.getLogger(GracePeriodCircuitBreaker.class.getName()); + private static final long NEVER = 1L << 60; + private final AtomicLong failingSinceMillis = new AtomicLong(NEVER); private final AtomicBoolean halfOpen = new AtomicBoolean(false); private final AtomicBoolean open = new AtomicBoolean(false); - private final LongSupplier nanoClock; - private final long never; - private final AtomicLong failingSinceNanos; + private final LongSupplier clock; private final AtomicReference detail = new AtomicReference<>(); - private final long graceNanos; - private final long doomNanos; + private final long graceMillis; + private final long doomMillis; /** * Creates a new circuit breaker with the given grace periods. * @param grace the period of consecutive failures before state changes to half-open. */ public GracePeriodCircuitBreaker(Duration grace) { - this(System::nanoTime, grace, null); + this(System::currentTimeMillis, grace, null); } /** @@ -47,25 +48,23 @@ public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker { * @param doom the period of consecutive failures before shutting down. */ public GracePeriodCircuitBreaker(Duration grace, Duration doom) { - this(System::nanoTime, grace, doom); + this(System::currentTimeMillis, grace, doom); if (doom.isNegative()) throw new IllegalArgumentException("Doom delay must be non-negative"); } - GracePeriodCircuitBreaker(LongSupplier nanoClock, Duration grace, Duration doom) { + GracePeriodCircuitBreaker(LongSupplier clock, Duration grace, Duration doom) { if (grace.isNegative()) throw new IllegalArgumentException("Grace delay must be non-negative"); - this.nanoClock = requireNonNull(nanoClock); - this.never = nanoClock.getAsLong() + (1L << 60); - this.graceNanos = grace.toNanos(); - this.doomNanos = doom == null ? -1 : doom.toNanos(); - this.failingSinceNanos = new AtomicLong(never); + this.clock = requireNonNull(clock); + this.graceMillis = grace.toMillis(); + this.doomMillis = doom == null ? -1 : doom.toMillis(); } @Override public void success() { - failingSinceNanos.set(never); + failingSinceMillis.set(NEVER); if ( ! open.get() && halfOpen.compareAndSet(true, false)) log.log(INFO, "Circuit breaker is now closed, after a request was successful"); } @@ -81,21 +80,21 @@ public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker { } private void failure(String detail) { - if (failingSinceNanos.compareAndSet(never, nanoClock.getAsLong())) + if (failingSinceMillis.compareAndSet(NEVER, clock.getAsLong())) this.detail.set(detail); } @Override public State state() { - long failingNanos = nanoClock.getAsLong() - failingSinceNanos.get(); - if (failingNanos > graceNanos && halfOpen.compareAndSet(false, true)) + long failingMillis = clock.getAsLong() - failingSinceMillis.get(); + if (failingMillis > graceMillis && halfOpen.compareAndSet(false, true)) log.log(INFO, "Circuit breaker is now half-open, as no requests have succeeded for the " + - "last " + failingNanos / 1_000_000 + "ms. The server will be pinged to see if it recovers" + - (doomNanos >= 0 ? ", but this client will give up if no successes are observed within " + doomNanos / 1_000_000 + "ms" : "") + + "last " + failingMillis + "ms. The server will be pinged to see if it recovers" + + (doomMillis >= 0 ? ", but this client will give up if no successes are observed within " + doomMillis + "ms" : "") + ". First failure was '" + detail.get() + "'."); - if (doomNanos >= 0 && failingNanos > doomNanos && open.compareAndSet(false, true)) - log.log(WARNING, "Circuit breaker is now open, after " + doomNanos / 1_000_000 + "ms of failing request, " + + if (doomMillis >= 0 && failingMillis > doomMillis && open.compareAndSet(false, true)) + log.log(WARNING, "Circuit breaker is now open, after " + doomMillis + "ms of failing request, " + "and this client will give up and abort its remaining feed operations."); return open.get() ? State.OPEN : halfOpen.get() ? State.HALF_OPEN : State.CLOSED; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java index 5eb611160cc..9dd11113c0b 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java @@ -23,6 +23,7 @@ import java.time.Duration; import java.time.Instant; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.StringJoiner; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -30,7 +31,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.LongSupplier; import java.util.function.Supplier; import static ai.vespa.feed.client.OperationParameters.empty; @@ -45,7 +45,6 @@ import static java.util.Objects.requireNonNull; */ class HttpFeedClient implements FeedClient { - private static final Duration maxTimeout = Duration.ofMinutes(15); private static final JsonFactory jsonParserFactory = new JsonFactoryBuilder() .streamReadConstraints(StreamReadConstraints.builder().maxStringLength(Integer.MAX_VALUE).build()) .build(); @@ -112,8 +111,7 @@ class HttpFeedClient implements FeedClient { getPath(documentId) + getQuery(params, speedTest), requestHeaders, operationJson == null ? null : operationJson.getBytes(UTF_8), // TODO: make it bytes all the way? - params.timeout().orElse(maxTimeout), - System::nanoTime); + params.timeout().orElse(null)); CompletableFuture promise = new CompletableFuture<>(); requestStrategy.enqueue(documentId, request) @@ -138,8 +136,7 @@ class HttpFeedClient implements FeedClient { getPath(DocumentId.of("feeder", "handshake", "dummy")) + getQuery(empty(), true), requestHeaders, null, - Duration.ofSeconds(15), - System::nanoTime); + Duration.ofSeconds(15)); CompletableFuture future = new CompletableFuture<>(); cluster.dispatch(request, future); HttpResponse response = future.get(20, TimeUnit.SECONDS); diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java index fdd35b74c35..6de3f034f22 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java @@ -3,7 +3,6 @@ package ai.vespa.feed.client.impl; import java.time.Duration; import java.util.Map; -import java.util.function.LongSupplier; import java.util.function.Supplier; class HttpRequest { @@ -13,17 +12,13 @@ class HttpRequest { private final Map> headers; private final byte[] body; private final Duration timeout; - private final long deadlineNanos; - private final LongSupplier nanoClock; - public HttpRequest(String method, String path, Map> headers, byte[] body, Duration timeout, LongSupplier nanoClock) { + public HttpRequest(String method, String path, Map> headers, byte[] body, Duration timeout) { this.method = method; this.path = path; this.headers = headers; this.body = body; - this.deadlineNanos = nanoClock.getAsLong() + timeout.toNanos(); this.timeout = timeout; - this.nanoClock = nanoClock; } public String method() { @@ -42,10 +37,6 @@ class HttpRequest { return body; } - public Duration timeLeft() { - return Duration.ofNanos(deadlineNanos - nanoClock.getAsLong()); - } - public Duration timeout() { return timeout; } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java index f699651634a..8f0327a1738 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java @@ -31,7 +31,6 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.logging.Level.FINE; import static java.util.logging.Level.FINER; import static java.util.logging.Level.FINEST; -import static java.util.logging.Level.SEVERE; import static java.util.logging.Level.WARNING; /** @@ -97,7 +96,7 @@ class HttpRequestStrategy implements RequestStrategy { } } catch (Throwable t) { - log.log(SEVERE, "Dispatch thread threw; shutting down", t); + log.log(WARNING, "Dispatch thread threw; shutting down", t); } destroy(); } @@ -120,7 +119,7 @@ class HttpRequestStrategy implements RequestStrategy { } private boolean retry(HttpRequest request, int attempt) { - if (attempt > strategy.retries() || request.timeLeft().toMillis() <= 0) + if (attempt > strategy.retries()) return false; switch (request.method().toUpperCase()) { @@ -138,6 +137,7 @@ class HttpRequestStrategy implements RequestStrategy { private boolean retry(HttpRequest request, Throwable thrown, int attempt) { breaker.failure(thrown); if ( (thrown instanceof IOException) // General IO problems. + // Thrown by HTTP2Session.StreamsState.reserveSlot, likely on GOAWAY from server || (thrown instanceof IllegalStateException && thrown.getMessage().equals("session closed")) ) { @@ -149,7 +149,7 @@ class HttpRequestStrategy implements RequestStrategy { return false; } - /** Retries throttled requests (429), adjusting the target inflight count, and server unavailable (503). */ + /** Retries throttled requests (429), adjusting the target inflight count, and server errors (500, 502, 503, 504). */ private boolean retry(HttpRequest request, HttpResponse response, int attempt) { if (response.code() / 100 == 2 || response.code() == 404 || response.code() == 412) { logResponse(FINEST, response, request, attempt); @@ -272,8 +272,7 @@ class HttpRequestStrategy implements RequestStrategy { } /** Handles the result of one attempt at the given operation, retrying if necessary. */ - private void handleAttempt(CompletableFuture vessel, HttpRequest request, - RetriableFuture result, int attempt) { + private void handleAttempt(CompletableFuture vessel, HttpRequest request, RetriableFuture result, int attempt) { vessel.whenCompleteAsync((response, thrown) -> { result.set(response, thrown); // Retry the operation if it failed with a transient error ... diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java index 18369f29f0b..df010a167f6 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java @@ -45,7 +45,6 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -82,11 +81,8 @@ class JettyCluster implements Cluster { Endpoint endpoint = findLeastBusyEndpoint(endpoints); try { endpoint.inflight.incrementAndGet(); - long reqTimeoutMillis = req.timeLeft().toMillis(); - if (reqTimeoutMillis <= 0) { - vessel.completeExceptionally(new TimeoutException("operation timed out after '" + req.timeout() + "'")); - return; - } + long reqTimeoutMillis = req.timeout() != null + ? req.timeout().toMillis() * 11 / 10 + 1000 : IDLE_TIMEOUT.toMillis(); Request jettyReq = client.newRequest(URI.create(endpoint.uri + req.path())) .version(HttpVersion.HTTP_2) .method(HttpMethod.fromString(req.method())) diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java index 52b8dcc5884..f5ca70fe291 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java @@ -19,39 +19,39 @@ class GracePeriodCircuitBreakerTest { @Test void testCircuitBreaker() { - AtomicLong nowNanos = new AtomicLong(0); - long SECOND = 1_000_000_000L; - CircuitBreaker breaker = new GracePeriodCircuitBreaker(nowNanos::get, Duration.ofSeconds(1), Duration.ofMinutes(1)); + AtomicLong now = new AtomicLong(0); + long SECOND = 1000; + CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(1)); Throwable error = new Error(); assertEquals(CLOSED, breaker.state(), "Initial state is closed"); - nowNanos.addAndGet(100 * SECOND); + now.addAndGet(100 * SECOND); assertEquals(CLOSED, breaker.state(), "State is closed after some time without activity"); breaker.success(); assertEquals(CLOSED, breaker.state(), "State is closed after a success"); - nowNanos.addAndGet(100 * SECOND); + now.addAndGet(100 * SECOND); assertEquals(CLOSED, breaker.state(), "State is closed some time after a success"); breaker.failure(error); assertEquals(CLOSED, breaker.state(), "State is closed right after a failure"); - nowNanos.addAndGet(SECOND); + now.addAndGet(SECOND); assertEquals(CLOSED, breaker.state(), "State is closed until grace period has passed"); - nowNanos.addAndGet(1); + now.addAndGet(1); assertEquals(HALF_OPEN, breaker.state(), "State is half-open when grace period has passed"); breaker.success(); assertEquals(CLOSED, breaker.state(), "State is closed after a new success"); breaker.failure(error); - nowNanos.addAndGet(60 * SECOND); + now.addAndGet(60 * SECOND); assertEquals(HALF_OPEN, breaker.state(), "State is half-open until doom period has passed"); - nowNanos.addAndGet(1); + now.addAndGet(1); assertEquals(OPEN, breaker.state(), "State is open when doom period has passed"); breaker.success(); diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java index 14ade35825f..28bde16f457 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java @@ -174,7 +174,7 @@ class HttpFeedClientTest { .timeout(Duration.ofSeconds(5))) .get()); assertTrue(expected.getCause() instanceof ResultException); - assertEquals("(id:ns:type::0) Ooops! ... I did it again.", expected.getCause().getMessage()); + assertEquals("Ooops! ... I did it again.", expected.getCause().getMessage()); assertEquals("[ { \"message\": \"I played with your heart. Got lost in the game.\" } ]", ((ResultException) expected.getCause()).getTrace().get()); @@ -207,14 +207,14 @@ class HttpFeedClientTest { "json", OperationParameters.empty()) .get()); - assertEquals("(id:ns:type::0) Status 500 executing 'POST /document/v1/ns/type/docid/0': Alla ska i jorden.", expected.getCause().getMessage()); + assertEquals("Status 500 executing 'POST /document/v1/ns/type/docid/0': Alla ska i jorden.", expected.getCause().getMessage()); } @Test void testHandshake() { // dummy:123 does not exist, and results in a host-not-found exception. FeedException exception = assertThrows(FeedException.class, - () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))))); + () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))))); String message = exception.getMessage(); assertTrue(message.startsWith("failed handshake with server after "), message); assertTrue(message.contains("java.net.UnknownHostException"), message); diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java index b9ab4e481ac..b1a04ac9ed4 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java @@ -42,14 +42,14 @@ class HttpRequestStrategyTest { @Test void testConcurrency() { int documents = 1 << 16; - HttpRequest request = new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), () -> 0); + HttpRequest request = new HttpRequest("PUT", "/", null, null, null); HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8)); ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Cluster cluster = (__, vessel) -> executor.schedule(() -> vessel.complete(response), (int) (Math.random() * 2 * 10), TimeUnit.MILLISECONDS); - HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123"))) - .setConnectionsPerEndpoint(1 << 10) - .setMaxStreamPerConnection(1 << 12), + HttpRequestStrategy strategy = new HttpRequestStrategy( new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123"))) + .setConnectionsPerEndpoint(1 << 10) + .setMaxStreamPerConnection(1 << 12), cluster); CountDownLatch latch = new CountDownLatch(1); new Thread(() -> { @@ -81,40 +81,37 @@ class HttpRequestStrategyTest { assertEquals(2 * documents, stats.bytesReceived()); } - @Test() + @Test void testRetries() throws ExecutionException, InterruptedException { - int minStreams = 2; // Hard limit for minimum number of streams per connection. + int minStreams = 16; // Hard limit for minimum number of streams per connection. MockCluster cluster = new MockCluster(); - AtomicLong nowNanos = new AtomicLong(0); - CircuitBreaker breaker = new GracePeriodCircuitBreaker(nowNanos::get, Duration.ofSeconds(1), Duration.ofMinutes(10)); + AtomicLong now = new AtomicLong(0); + CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10)); HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123"))) - .setRetryStrategy(new FeedClient.RetryStrategy() { - @Override public boolean retry(FeedClient.OperationType type) { return type == FeedClient.OperationType.PUT; } - @Override public int retries() { return 1; } - }) - .setCircuitBreaker(breaker) - .setConnectionsPerEndpoint(1) - .setMaxStreamPerConnection(minStreams), + .setRetryStrategy(new FeedClient.RetryStrategy() { + @Override public boolean retry(FeedClient.OperationType type) { return type == FeedClient.OperationType.PUT; } + @Override public int retries() { return 1; } + }) + .setCircuitBreaker(breaker) + .setConnectionsPerEndpoint(1) + .setMaxStreamPerConnection(minStreams), cluster); OperationStats initial = strategy.stats(); DocumentId id1 = DocumentId.of("ns", "type", "1"); DocumentId id2 = DocumentId.of("ns", "type", "2"); - HttpRequest request = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(180), nowNanos::get); + HttpRequest request = new HttpRequest("POST", "/", null, null, null); // Runtime exception is not retried. cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom"))); ExecutionException expected = assertThrows(ExecutionException.class, () -> strategy.enqueue(id1, request).get()); assertInstanceOf(FeedException.class, expected.getCause()); - assertEquals("(id:ns:type::1) java.lang.RuntimeException: boom", expected.getCause().getMessage()); + assertEquals("java.lang.RuntimeException: boom", expected.getCause().getMessage()); assertEquals(1, strategy.stats().requests()); // IOException is retried. - cluster.expect((__, vessel) -> { - nowNanos.addAndGet(200_000_000L); // Exceed grace period. - vessel.completeExceptionally(new IOException("retry me")); - }); + cluster.expect((__, vessel) -> vessel.completeExceptionally(new IOException("retry me"))); expected = assertThrows(ExecutionException.class, () -> strategy.enqueue(id1, request).get()); assertEquals("retry me", expected.getCause().getCause().getMessage()); @@ -127,7 +124,7 @@ class HttpRequestStrategyTest { assertEquals(4, strategy.stats().requests()); // Throttled requests are retried. Concurrent operations to same ID (only) are serialised. - nowNanos.set(2_000_000_000L); + now.set(2000); HttpResponse throttled = HttpResponse.of(429, null); AtomicInteger count = new AtomicInteger(3); CountDownLatch latch = new CountDownLatch(1); @@ -144,11 +141,11 @@ class HttpRequestStrategyTest { else vessel.complete(success); }); CompletableFuture delayed = strategy.enqueue(id1, request); - CompletableFuture serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), nowNanos::get)); - assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null, Duration.ofSeconds(1), nowNanos::get)).get()); + CompletableFuture serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, null)); + assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null, null)).get()); latch.await(); assertEquals(8, strategy.stats().requests()); // 3 attempts at throttled and one at id2. - nowNanos.set(4_000_000_000L); + now.set(4000); assertEquals(CLOSED, breaker.state()); // Circuit not broken due to throttled requests. completion.get().complete(success); assertEquals(success, delayed.get()); @@ -156,17 +153,14 @@ class HttpRequestStrategyTest { // Some error responses are retried. HttpResponse serverError = HttpResponse.of(503, null); - cluster.expect((__, vessel) -> { - nowNanos.addAndGet(200_000_000L); // Exceed grace period. - vessel.complete(serverError); - }); + cluster.expect((__, vessel) -> vessel.complete(serverError)); assertEquals(serverError, strategy.enqueue(id1, request).get()); assertEquals(11, strategy.stats().requests()); assertEquals(CLOSED, breaker.state()); // Circuit not broken due to throttled requests. // Error responses are not retried when not of appropriate type. cluster.expect((__, vessel) -> vessel.complete(serverError)); - assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), nowNanos::get)).get()); + assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, null)).get()); assertEquals(12, strategy.stats().requests()); // Some error responses are not retried. @@ -175,22 +169,10 @@ class HttpRequestStrategyTest { assertEquals(badRequest, strategy.enqueue(id1, request).get()); assertEquals(13, strategy.stats().requests()); - - // IOException is not retried past timeout. - cluster.expect((__, vessel) -> { - nowNanos.addAndGet(50_000_000L); // Exceed grace period after 2 attempts. - vessel.completeExceptionally(new IOException("retry me")); - }); - expected = assertThrows(ExecutionException.class, - () -> strategy.enqueue(id1, new HttpRequest("POST", "/", null, null, Duration.ofMillis(100), nowNanos::get)).get()); - assertEquals("retry me", expected.getCause().getCause().getMessage()); - assertEquals(15, strategy.stats().requests()); - - // Circuit breaker opens some time after starting to fail. - nowNanos.set(6_000_000_000L); + now.set(6000); assertEquals(HALF_OPEN, breaker.state()); // Circuit broken due to failed requests. - nowNanos.set(605_000_000_000L); + now.set(605000); assertEquals(OPEN, breaker.state()); // Circuit broken due to failed requests. strategy.destroy(); @@ -201,7 +183,7 @@ class HttpRequestStrategyTest { codes.put(429, 2L); codes.put(503, 3L); assertEquals(codes, stats.responsesByCode()); - assertEquals(5, stats.exceptions()); + assertEquals(3, stats.exceptions()); assertEquals(stats, stats.since(initial)); assertEquals(0, stats.since(stats).averageLatencyMillis()); @@ -212,8 +194,8 @@ class HttpRequestStrategyTest { @Test void testShutdown() { MockCluster cluster = new MockCluster(); - AtomicLong nowNanos = new AtomicLong(0); - CircuitBreaker breaker = new GracePeriodCircuitBreaker(nowNanos::get, Duration.ofSeconds(1), Duration.ofMinutes(10)); + AtomicLong now = new AtomicLong(0); + CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10)); HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123"))) .setRetryStrategy(new FeedClient.RetryStrategy() { @Override public int retries() { return 1; } @@ -227,10 +209,10 @@ class HttpRequestStrategyTest { DocumentId id3 = DocumentId.of("ns", "type", "3"); DocumentId id4 = DocumentId.of("ns", "type", "4"); DocumentId id5 = DocumentId.of("ns", "type", "5"); - HttpRequest failing = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get); - HttpRequest partial = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get); - HttpRequest request = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get); - HttpRequest blocking = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get); + HttpRequest failing = new HttpRequest("POST", "/", null, null, null); + HttpRequest partial = new HttpRequest("POST", "/", null, null, null); + HttpRequest request = new HttpRequest("POST", "/", null, null, null); + HttpRequest blocking = new HttpRequest("POST", "/", null, null, null); // Enqueue some operations to the same id, which are serialised, and then shut down while operations are in flight. Phaser phaser = new Phaser(2); @@ -261,7 +243,7 @@ class HttpRequestStrategyTest { CompletableFuture delayed = strategy.enqueue(id5, request); phaser.arriveAndAwaitAdvance(); // retried is allowed to dispatch, and will be retried async. // failed immediately fails, and lets us assert the above retry is indeed enqueued. - assertEquals("ai.vespa.feed.client.FeedException: (id:ns:type::3) java.lang.RuntimeException: fatal", + assertEquals("ai.vespa.feed.client.FeedException: java.lang.RuntimeException: fatal", assertThrows(ExecutionException.class, failed::get).getMessage()); phaser.arriveAndAwaitAdvance(); // blocked starts dispatch, and hangs, blocking dispatch thread. @@ -288,7 +270,7 @@ class HttpRequestStrategyTest { assertThrows(ExecutionException.class, blocked::get).getMessage()); assertEquals("ai.vespa.feed.client.FeedException: Operation aborted", assertThrows(ExecutionException.class, delayed::get).getMessage()); - assertEquals("ai.vespa.feed.client.FeedException: (id:ns:type::2) java.io.IOException: failed", + assertEquals("ai.vespa.feed.client.FeedException: java.io.IOException: failed", assertThrows(ExecutionException.class, retried::get).getMessage()); assertEquals("ai.vespa.feed.client.FeedException: Operation aborted", assertThrows(ExecutionException.class, strategy.enqueue(id1, request)::get).getMessage()); -- cgit v1.2.3