aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Marius Venstad <jonmv@users.noreply.github.com>2024-05-21 17:09:25 +0200
committerGitHub <noreply@github.com>2024-05-21 17:09:25 +0200
commitec16b9b22481abf0959720fdec8a67c74bdce475 (patch)
tree7895e6fee91d4c3728feb664c7ee4b3387eb5138
parent3d65e159d42ff317ca52107960c19af1a431afd1 (diff)
parent19d3c4448090576bc37f611794c5ee5799cde9f5 (diff)
Merge pull request #31266 from vespa-engine/revert-31228-jonmv/feed-retry-grace-periodv8.345.20
Revert "Retry requests within retry count limit OR grace period (default 10s)" MERGEOK
-rw-r--r--vespa-feed-client-api/abi-spec.json3
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java5
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedException.java5
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java5
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java41
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java9
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java11
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java11
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java8
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java18
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java6
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java88
12 files changed, 84 insertions, 126 deletions
diff --git a/vespa-feed-client-api/abi-spec.json b/vespa-feed-client-api/abi-spec.json
index 7ac4a7619ed..9065edad92a 100644
--- a/vespa-feed-client-api/abi-spec.json
+++ b/vespa-feed-client-api/abi-spec.json
@@ -186,8 +186,7 @@
"public void <init>(java.lang.Throwable)",
"public void <init>(ai.vespa.feed.client.DocumentId, java.lang.Throwable)",
"public void <init>(ai.vespa.feed.client.DocumentId, java.lang.String, java.lang.Throwable)",
- "public java.util.Optional documentId()",
- "public java.lang.String getMessage()"
+ "public java.util.Optional documentId()"
],
"fields" : [ ]
},
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java
index 7de7aae1350..d73d36e0f4e 100644
--- a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java
@@ -2,7 +2,6 @@
package ai.vespa.feed.client;
import java.io.Closeable;
-import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -75,8 +74,8 @@ public interface FeedClient extends Closeable {
/** Whether to retry operations of the given type. */
default boolean retry(OperationType type) { return true; }
- /** Maximum number of retries per operation for assumed transient, non-backpressure problems. */
- default int retries() { return Integer.MAX_VALUE; }
+ /** Number of retries per operation for assumed transient, non-backpressure problems. */
+ default int retries() { return 10; }
}
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedException.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedException.java
index dd1327f7ccf..74f906149b2 100644
--- a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedException.java
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedException.java
@@ -44,9 +44,4 @@ public class FeedException extends RuntimeException {
public Optional<DocumentId> documentId() { return Optional.ofNullable(documentId); }
- @Override
- public String getMessage() {
- return documentId != null ? "(" + documentId + ") " + super.getMessage() : super.getMessage();
- }
-
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java
index 424481b6ef2..c271ac356e9 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java
@@ -18,7 +18,6 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.function.LongSupplier;
import java.util.function.Supplier;
import static ai.vespa.feed.client.FeedClientBuilder.Compression.auto;
@@ -59,7 +58,9 @@ public class FeedClientBuilderImpl implements FeedClientBuilder {
URI proxy;
Duration connectionTtl = Duration.ZERO;
- public FeedClientBuilderImpl() { }
+
+ public FeedClientBuilderImpl() {
+ }
FeedClientBuilderImpl(List<URI> endpoints) {
this();
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java
index cec7106403e..1ea2089c0eb 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java
@@ -12,6 +12,7 @@ import java.util.function.LongSupplier;
import java.util.logging.Logger;
import static java.util.Objects.requireNonNull;
+import static java.util.logging.Level.FINE;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
@@ -23,22 +24,22 @@ import static java.util.logging.Level.WARNING;
public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker {
private static final Logger log = Logger.getLogger(GracePeriodCircuitBreaker.class.getName());
+ private static final long NEVER = 1L << 60;
+ private final AtomicLong failingSinceMillis = new AtomicLong(NEVER);
private final AtomicBoolean halfOpen = new AtomicBoolean(false);
private final AtomicBoolean open = new AtomicBoolean(false);
- private final LongSupplier nanoClock;
- private final long never;
- private final AtomicLong failingSinceNanos;
+ private final LongSupplier clock;
private final AtomicReference<String> detail = new AtomicReference<>();
- private final long graceNanos;
- private final long doomNanos;
+ private final long graceMillis;
+ private final long doomMillis;
/**
* Creates a new circuit breaker with the given grace periods.
* @param grace the period of consecutive failures before state changes to half-open.
*/
public GracePeriodCircuitBreaker(Duration grace) {
- this(System::nanoTime, grace, null);
+ this(System::currentTimeMillis, grace, null);
}
/**
@@ -47,25 +48,23 @@ public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker {
* @param doom the period of consecutive failures before shutting down.
*/
public GracePeriodCircuitBreaker(Duration grace, Duration doom) {
- this(System::nanoTime, grace, doom);
+ this(System::currentTimeMillis, grace, doom);
if (doom.isNegative())
throw new IllegalArgumentException("Doom delay must be non-negative");
}
- GracePeriodCircuitBreaker(LongSupplier nanoClock, Duration grace, Duration doom) {
+ GracePeriodCircuitBreaker(LongSupplier clock, Duration grace, Duration doom) {
if (grace.isNegative())
throw new IllegalArgumentException("Grace delay must be non-negative");
- this.nanoClock = requireNonNull(nanoClock);
- this.never = nanoClock.getAsLong() + (1L << 60);
- this.graceNanos = grace.toNanos();
- this.doomNanos = doom == null ? -1 : doom.toNanos();
- this.failingSinceNanos = new AtomicLong(never);
+ this.clock = requireNonNull(clock);
+ this.graceMillis = grace.toMillis();
+ this.doomMillis = doom == null ? -1 : doom.toMillis();
}
@Override
public void success() {
- failingSinceNanos.set(never);
+ failingSinceMillis.set(NEVER);
if ( ! open.get() && halfOpen.compareAndSet(true, false))
log.log(INFO, "Circuit breaker is now closed, after a request was successful");
}
@@ -81,21 +80,21 @@ public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker {
}
private void failure(String detail) {
- if (failingSinceNanos.compareAndSet(never, nanoClock.getAsLong()))
+ if (failingSinceMillis.compareAndSet(NEVER, clock.getAsLong()))
this.detail.set(detail);
}
@Override
public State state() {
- long failingNanos = nanoClock.getAsLong() - failingSinceNanos.get();
- if (failingNanos > graceNanos && halfOpen.compareAndSet(false, true))
+ long failingMillis = clock.getAsLong() - failingSinceMillis.get();
+ if (failingMillis > graceMillis && halfOpen.compareAndSet(false, true))
log.log(INFO, "Circuit breaker is now half-open, as no requests have succeeded for the " +
- "last " + failingNanos / 1_000_000 + "ms. The server will be pinged to see if it recovers" +
- (doomNanos >= 0 ? ", but this client will give up if no successes are observed within " + doomNanos / 1_000_000 + "ms" : "") +
+ "last " + failingMillis + "ms. The server will be pinged to see if it recovers" +
+ (doomMillis >= 0 ? ", but this client will give up if no successes are observed within " + doomMillis + "ms" : "") +
". First failure was '" + detail.get() + "'.");
- if (doomNanos >= 0 && failingNanos > doomNanos && open.compareAndSet(false, true))
- log.log(WARNING, "Circuit breaker is now open, after " + doomNanos / 1_000_000 + "ms of failing request, " +
+ if (doomMillis >= 0 && failingMillis > doomMillis && open.compareAndSet(false, true))
+ log.log(WARNING, "Circuit breaker is now open, after " + doomMillis + "ms of failing request, " +
"and this client will give up and abort its remaining feed operations.");
return open.get() ? State.OPEN : halfOpen.get() ? State.HALF_OPEN : State.CLOSED;
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
index 5eb611160cc..9dd11113c0b 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
@@ -23,6 +23,7 @@ import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -30,7 +31,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.LongSupplier;
import java.util.function.Supplier;
import static ai.vespa.feed.client.OperationParameters.empty;
@@ -45,7 +45,6 @@ import static java.util.Objects.requireNonNull;
*/
class HttpFeedClient implements FeedClient {
- private static final Duration maxTimeout = Duration.ofMinutes(15);
private static final JsonFactory jsonParserFactory = new JsonFactoryBuilder()
.streamReadConstraints(StreamReadConstraints.builder().maxStringLength(Integer.MAX_VALUE).build())
.build();
@@ -112,8 +111,7 @@ class HttpFeedClient implements FeedClient {
getPath(documentId) + getQuery(params, speedTest),
requestHeaders,
operationJson == null ? null : operationJson.getBytes(UTF_8), // TODO: make it bytes all the way?
- params.timeout().orElse(maxTimeout),
- System::nanoTime);
+ params.timeout().orElse(null));
CompletableFuture<Result> promise = new CompletableFuture<>();
requestStrategy.enqueue(documentId, request)
@@ -138,8 +136,7 @@ class HttpFeedClient implements FeedClient {
getPath(DocumentId.of("feeder", "handshake", "dummy")) + getQuery(empty(), true),
requestHeaders,
null,
- Duration.ofSeconds(15),
- System::nanoTime);
+ Duration.ofSeconds(15));
CompletableFuture<HttpResponse> future = new CompletableFuture<>();
cluster.dispatch(request, future);
HttpResponse response = future.get(20, TimeUnit.SECONDS);
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java
index fdd35b74c35..6de3f034f22 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java
@@ -3,7 +3,6 @@ package ai.vespa.feed.client.impl;
import java.time.Duration;
import java.util.Map;
-import java.util.function.LongSupplier;
import java.util.function.Supplier;
class HttpRequest {
@@ -13,17 +12,13 @@ class HttpRequest {
private final Map<String, Supplier<String>> headers;
private final byte[] body;
private final Duration timeout;
- private final long deadlineNanos;
- private final LongSupplier nanoClock;
- public HttpRequest(String method, String path, Map<String, Supplier<String>> headers, byte[] body, Duration timeout, LongSupplier nanoClock) {
+ public HttpRequest(String method, String path, Map<String, Supplier<String>> headers, byte[] body, Duration timeout) {
this.method = method;
this.path = path;
this.headers = headers;
this.body = body;
- this.deadlineNanos = nanoClock.getAsLong() + timeout.toNanos();
this.timeout = timeout;
- this.nanoClock = nanoClock;
}
public String method() {
@@ -42,10 +37,6 @@ class HttpRequest {
return body;
}
- public Duration timeLeft() {
- return Duration.ofNanos(deadlineNanos - nanoClock.getAsLong());
- }
-
public Duration timeout() {
return timeout;
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java
index f699651634a..8f0327a1738 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java
@@ -31,7 +31,6 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.FINER;
import static java.util.logging.Level.FINEST;
-import static java.util.logging.Level.SEVERE;
import static java.util.logging.Level.WARNING;
/**
@@ -97,7 +96,7 @@ class HttpRequestStrategy implements RequestStrategy {
}
}
catch (Throwable t) {
- log.log(SEVERE, "Dispatch thread threw; shutting down", t);
+ log.log(WARNING, "Dispatch thread threw; shutting down", t);
}
destroy();
}
@@ -120,7 +119,7 @@ class HttpRequestStrategy implements RequestStrategy {
}
private boolean retry(HttpRequest request, int attempt) {
- if (attempt > strategy.retries() || request.timeLeft().toMillis() <= 0)
+ if (attempt > strategy.retries())
return false;
switch (request.method().toUpperCase()) {
@@ -138,6 +137,7 @@ class HttpRequestStrategy implements RequestStrategy {
private boolean retry(HttpRequest request, Throwable thrown, int attempt) {
breaker.failure(thrown);
if ( (thrown instanceof IOException) // General IO problems.
+
// Thrown by HTTP2Session.StreamsState.reserveSlot, likely on GOAWAY from server
|| (thrown instanceof IllegalStateException && thrown.getMessage().equals("session closed"))
) {
@@ -149,7 +149,7 @@ class HttpRequestStrategy implements RequestStrategy {
return false;
}
- /** Retries throttled requests (429), adjusting the target inflight count, and server unavailable (503). */
+ /** Retries throttled requests (429), adjusting the target inflight count, and server errors (500, 502, 503, 504). */
private boolean retry(HttpRequest request, HttpResponse response, int attempt) {
if (response.code() / 100 == 2 || response.code() == 404 || response.code() == 412) {
logResponse(FINEST, response, request, attempt);
@@ -272,8 +272,7 @@ class HttpRequestStrategy implements RequestStrategy {
}
/** Handles the result of one attempt at the given operation, retrying if necessary. */
- private void handleAttempt(CompletableFuture<HttpResponse> vessel, HttpRequest request,
- RetriableFuture<HttpResponse> result, int attempt) {
+ private void handleAttempt(CompletableFuture<HttpResponse> vessel, HttpRequest request, RetriableFuture<HttpResponse> result, int attempt) {
vessel.whenCompleteAsync((response, thrown) -> {
result.set(response, thrown);
// Retry the operation if it failed with a transient error ...
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java
index 18369f29f0b..df010a167f6 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java
@@ -45,7 +45,6 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -82,11 +81,8 @@ class JettyCluster implements Cluster {
Endpoint endpoint = findLeastBusyEndpoint(endpoints);
try {
endpoint.inflight.incrementAndGet();
- long reqTimeoutMillis = req.timeLeft().toMillis();
- if (reqTimeoutMillis <= 0) {
- vessel.completeExceptionally(new TimeoutException("operation timed out after '" + req.timeout() + "'"));
- return;
- }
+ long reqTimeoutMillis = req.timeout() != null
+ ? req.timeout().toMillis() * 11 / 10 + 1000 : IDLE_TIMEOUT.toMillis();
Request jettyReq = client.newRequest(URI.create(endpoint.uri + req.path()))
.version(HttpVersion.HTTP_2)
.method(HttpMethod.fromString(req.method()))
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java
index 52b8dcc5884..f5ca70fe291 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java
@@ -19,39 +19,39 @@ class GracePeriodCircuitBreakerTest {
@Test
void testCircuitBreaker() {
- AtomicLong nowNanos = new AtomicLong(0);
- long SECOND = 1_000_000_000L;
- CircuitBreaker breaker = new GracePeriodCircuitBreaker(nowNanos::get, Duration.ofSeconds(1), Duration.ofMinutes(1));
+ AtomicLong now = new AtomicLong(0);
+ long SECOND = 1000;
+ CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(1));
Throwable error = new Error();
assertEquals(CLOSED, breaker.state(), "Initial state is closed");
- nowNanos.addAndGet(100 * SECOND);
+ now.addAndGet(100 * SECOND);
assertEquals(CLOSED, breaker.state(), "State is closed after some time without activity");
breaker.success();
assertEquals(CLOSED, breaker.state(), "State is closed after a success");
- nowNanos.addAndGet(100 * SECOND);
+ now.addAndGet(100 * SECOND);
assertEquals(CLOSED, breaker.state(), "State is closed some time after a success");
breaker.failure(error);
assertEquals(CLOSED, breaker.state(), "State is closed right after a failure");
- nowNanos.addAndGet(SECOND);
+ now.addAndGet(SECOND);
assertEquals(CLOSED, breaker.state(), "State is closed until grace period has passed");
- nowNanos.addAndGet(1);
+ now.addAndGet(1);
assertEquals(HALF_OPEN, breaker.state(), "State is half-open when grace period has passed");
breaker.success();
assertEquals(CLOSED, breaker.state(), "State is closed after a new success");
breaker.failure(error);
- nowNanos.addAndGet(60 * SECOND);
+ now.addAndGet(60 * SECOND);
assertEquals(HALF_OPEN, breaker.state(), "State is half-open until doom period has passed");
- nowNanos.addAndGet(1);
+ now.addAndGet(1);
assertEquals(OPEN, breaker.state(), "State is open when doom period has passed");
breaker.success();
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java
index 14ade35825f..28bde16f457 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java
@@ -174,7 +174,7 @@ class HttpFeedClientTest {
.timeout(Duration.ofSeconds(5)))
.get());
assertTrue(expected.getCause() instanceof ResultException);
- assertEquals("(id:ns:type::0) Ooops! ... I did it again.", expected.getCause().getMessage());
+ assertEquals("Ooops! ... I did it again.", expected.getCause().getMessage());
assertEquals("[ { \"message\": \"I played with your heart. Got lost in the game.\" } ]", ((ResultException) expected.getCause()).getTrace().get());
@@ -207,14 +207,14 @@ class HttpFeedClientTest {
"json",
OperationParameters.empty())
.get());
- assertEquals("(id:ns:type::0) Status 500 executing 'POST /document/v1/ns/type/docid/0': Alla ska i jorden.", expected.getCause().getMessage());
+ assertEquals("Status 500 executing 'POST /document/v1/ns/type/docid/0': Alla ska i jorden.", expected.getCause().getMessage());
}
@Test
void testHandshake() {
// dummy:123 does not exist, and results in a host-not-found exception.
FeedException exception = assertThrows(FeedException.class,
- () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123")))));
+ () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123")))));
String message = exception.getMessage();
assertTrue(message.startsWith("failed handshake with server after "), message);
assertTrue(message.contains("java.net.UnknownHostException"), message);
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
index b9ab4e481ac..b1a04ac9ed4 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
@@ -42,14 +42,14 @@ class HttpRequestStrategyTest {
@Test
void testConcurrency() {
int documents = 1 << 16;
- HttpRequest request = new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), () -> 0);
+ HttpRequest request = new HttpRequest("PUT", "/", null, null, null);
HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8));
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Cluster cluster = (__, vessel) -> executor.schedule(() -> vessel.complete(response), (int) (Math.random() * 2 * 10), TimeUnit.MILLISECONDS);
- HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123")))
- .setConnectionsPerEndpoint(1 << 10)
- .setMaxStreamPerConnection(1 << 12),
+ HttpRequestStrategy strategy = new HttpRequestStrategy( new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123")))
+ .setConnectionsPerEndpoint(1 << 10)
+ .setMaxStreamPerConnection(1 << 12),
cluster);
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
@@ -81,40 +81,37 @@ class HttpRequestStrategyTest {
assertEquals(2 * documents, stats.bytesReceived());
}
- @Test()
+ @Test
void testRetries() throws ExecutionException, InterruptedException {
- int minStreams = 2; // Hard limit for minimum number of streams per connection.
+ int minStreams = 16; // Hard limit for minimum number of streams per connection.
MockCluster cluster = new MockCluster();
- AtomicLong nowNanos = new AtomicLong(0);
- CircuitBreaker breaker = new GracePeriodCircuitBreaker(nowNanos::get, Duration.ofSeconds(1), Duration.ofMinutes(10));
+ AtomicLong now = new AtomicLong(0);
+ CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10));
HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123")))
- .setRetryStrategy(new FeedClient.RetryStrategy() {
- @Override public boolean retry(FeedClient.OperationType type) { return type == FeedClient.OperationType.PUT; }
- @Override public int retries() { return 1; }
- })
- .setCircuitBreaker(breaker)
- .setConnectionsPerEndpoint(1)
- .setMaxStreamPerConnection(minStreams),
+ .setRetryStrategy(new FeedClient.RetryStrategy() {
+ @Override public boolean retry(FeedClient.OperationType type) { return type == FeedClient.OperationType.PUT; }
+ @Override public int retries() { return 1; }
+ })
+ .setCircuitBreaker(breaker)
+ .setConnectionsPerEndpoint(1)
+ .setMaxStreamPerConnection(minStreams),
cluster);
OperationStats initial = strategy.stats();
DocumentId id1 = DocumentId.of("ns", "type", "1");
DocumentId id2 = DocumentId.of("ns", "type", "2");
- HttpRequest request = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(180), nowNanos::get);
+ HttpRequest request = new HttpRequest("POST", "/", null, null, null);
// Runtime exception is not retried.
cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom")));
ExecutionException expected = assertThrows(ExecutionException.class,
() -> strategy.enqueue(id1, request).get());
assertInstanceOf(FeedException.class, expected.getCause());
- assertEquals("(id:ns:type::1) java.lang.RuntimeException: boom", expected.getCause().getMessage());
+ assertEquals("java.lang.RuntimeException: boom", expected.getCause().getMessage());
assertEquals(1, strategy.stats().requests());
// IOException is retried.
- cluster.expect((__, vessel) -> {
- nowNanos.addAndGet(200_000_000L); // Exceed grace period.
- vessel.completeExceptionally(new IOException("retry me"));
- });
+ cluster.expect((__, vessel) -> vessel.completeExceptionally(new IOException("retry me")));
expected = assertThrows(ExecutionException.class,
() -> strategy.enqueue(id1, request).get());
assertEquals("retry me", expected.getCause().getCause().getMessage());
@@ -127,7 +124,7 @@ class HttpRequestStrategyTest {
assertEquals(4, strategy.stats().requests());
// Throttled requests are retried. Concurrent operations to same ID (only) are serialised.
- nowNanos.set(2_000_000_000L);
+ now.set(2000);
HttpResponse throttled = HttpResponse.of(429, null);
AtomicInteger count = new AtomicInteger(3);
CountDownLatch latch = new CountDownLatch(1);
@@ -144,11 +141,11 @@ class HttpRequestStrategyTest {
else vessel.complete(success);
});
CompletableFuture<HttpResponse> delayed = strategy.enqueue(id1, request);
- CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), nowNanos::get));
- assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null, Duration.ofSeconds(1), nowNanos::get)).get());
+ CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, null));
+ assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null, null)).get());
latch.await();
assertEquals(8, strategy.stats().requests()); // 3 attempts at throttled and one at id2.
- nowNanos.set(4_000_000_000L);
+ now.set(4000);
assertEquals(CLOSED, breaker.state()); // Circuit not broken due to throttled requests.
completion.get().complete(success);
assertEquals(success, delayed.get());
@@ -156,17 +153,14 @@ class HttpRequestStrategyTest {
// Some error responses are retried.
HttpResponse serverError = HttpResponse.of(503, null);
- cluster.expect((__, vessel) -> {
- nowNanos.addAndGet(200_000_000L); // Exceed grace period.
- vessel.complete(serverError);
- });
+ cluster.expect((__, vessel) -> vessel.complete(serverError));
assertEquals(serverError, strategy.enqueue(id1, request).get());
assertEquals(11, strategy.stats().requests());
assertEquals(CLOSED, breaker.state()); // Circuit not broken due to throttled requests.
// Error responses are not retried when not of appropriate type.
cluster.expect((__, vessel) -> vessel.complete(serverError));
- assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), nowNanos::get)).get());
+ assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, null)).get());
assertEquals(12, strategy.stats().requests());
// Some error responses are not retried.
@@ -175,22 +169,10 @@ class HttpRequestStrategyTest {
assertEquals(badRequest, strategy.enqueue(id1, request).get());
assertEquals(13, strategy.stats().requests());
-
- // IOException is not retried past timeout.
- cluster.expect((__, vessel) -> {
- nowNanos.addAndGet(50_000_000L); // Exceed grace period after 2 attempts.
- vessel.completeExceptionally(new IOException("retry me"));
- });
- expected = assertThrows(ExecutionException.class,
- () -> strategy.enqueue(id1, new HttpRequest("POST", "/", null, null, Duration.ofMillis(100), nowNanos::get)).get());
- assertEquals("retry me", expected.getCause().getCause().getMessage());
- assertEquals(15, strategy.stats().requests());
-
-
// Circuit breaker opens some time after starting to fail.
- nowNanos.set(6_000_000_000L);
+ now.set(6000);
assertEquals(HALF_OPEN, breaker.state()); // Circuit broken due to failed requests.
- nowNanos.set(605_000_000_000L);
+ now.set(605000);
assertEquals(OPEN, breaker.state()); // Circuit broken due to failed requests.
strategy.destroy();
@@ -201,7 +183,7 @@ class HttpRequestStrategyTest {
codes.put(429, 2L);
codes.put(503, 3L);
assertEquals(codes, stats.responsesByCode());
- assertEquals(5, stats.exceptions());
+ assertEquals(3, stats.exceptions());
assertEquals(stats, stats.since(initial));
assertEquals(0, stats.since(stats).averageLatencyMillis());
@@ -212,8 +194,8 @@ class HttpRequestStrategyTest {
@Test
void testShutdown() {
MockCluster cluster = new MockCluster();
- AtomicLong nowNanos = new AtomicLong(0);
- CircuitBreaker breaker = new GracePeriodCircuitBreaker(nowNanos::get, Duration.ofSeconds(1), Duration.ofMinutes(10));
+ AtomicLong now = new AtomicLong(0);
+ CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10));
HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123")))
.setRetryStrategy(new FeedClient.RetryStrategy() {
@Override public int retries() { return 1; }
@@ -227,10 +209,10 @@ class HttpRequestStrategyTest {
DocumentId id3 = DocumentId.of("ns", "type", "3");
DocumentId id4 = DocumentId.of("ns", "type", "4");
DocumentId id5 = DocumentId.of("ns", "type", "5");
- HttpRequest failing = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get);
- HttpRequest partial = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get);
- HttpRequest request = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get);
- HttpRequest blocking = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get);
+ HttpRequest failing = new HttpRequest("POST", "/", null, null, null);
+ HttpRequest partial = new HttpRequest("POST", "/", null, null, null);
+ HttpRequest request = new HttpRequest("POST", "/", null, null, null);
+ HttpRequest blocking = new HttpRequest("POST", "/", null, null, null);
// Enqueue some operations to the same id, which are serialised, and then shut down while operations are in flight.
Phaser phaser = new Phaser(2);
@@ -261,7 +243,7 @@ class HttpRequestStrategyTest {
CompletableFuture<HttpResponse> delayed = strategy.enqueue(id5, request);
phaser.arriveAndAwaitAdvance(); // retried is allowed to dispatch, and will be retried async.
// failed immediately fails, and lets us assert the above retry is indeed enqueued.
- assertEquals("ai.vespa.feed.client.FeedException: (id:ns:type::3) java.lang.RuntimeException: fatal",
+ assertEquals("ai.vespa.feed.client.FeedException: java.lang.RuntimeException: fatal",
assertThrows(ExecutionException.class, failed::get).getMessage());
phaser.arriveAndAwaitAdvance(); // blocked starts dispatch, and hangs, blocking dispatch thread.
@@ -288,7 +270,7 @@ class HttpRequestStrategyTest {
assertThrows(ExecutionException.class, blocked::get).getMessage());
assertEquals("ai.vespa.feed.client.FeedException: Operation aborted",
assertThrows(ExecutionException.class, delayed::get).getMessage());
- assertEquals("ai.vespa.feed.client.FeedException: (id:ns:type::2) java.io.IOException: failed",
+ assertEquals("ai.vespa.feed.client.FeedException: java.io.IOException: failed",
assertThrows(ExecutionException.class, retried::get).getMessage());
assertEquals("ai.vespa.feed.client.FeedException: Operation aborted",
assertThrows(ExecutionException.class, strategy.enqueue(id1, request)::get).getMessage());