summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java')
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java88
1 files changed, 35 insertions, 53 deletions
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
index b9ab4e481ac..b1a04ac9ed4 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
@@ -42,14 +42,14 @@ class HttpRequestStrategyTest {
@Test
void testConcurrency() {
int documents = 1 << 16;
- HttpRequest request = new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), () -> 0);
+ HttpRequest request = new HttpRequest("PUT", "/", null, null, null);
HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8));
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Cluster cluster = (__, vessel) -> executor.schedule(() -> vessel.complete(response), (int) (Math.random() * 2 * 10), TimeUnit.MILLISECONDS);
- HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123")))
- .setConnectionsPerEndpoint(1 << 10)
- .setMaxStreamPerConnection(1 << 12),
+ HttpRequestStrategy strategy = new HttpRequestStrategy( new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123")))
+ .setConnectionsPerEndpoint(1 << 10)
+ .setMaxStreamPerConnection(1 << 12),
cluster);
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
@@ -81,40 +81,37 @@ class HttpRequestStrategyTest {
assertEquals(2 * documents, stats.bytesReceived());
}
- @Test()
+ @Test
void testRetries() throws ExecutionException, InterruptedException {
- int minStreams = 2; // Hard limit for minimum number of streams per connection.
+ int minStreams = 16; // Hard limit for minimum number of streams per connection.
MockCluster cluster = new MockCluster();
- AtomicLong nowNanos = new AtomicLong(0);
- CircuitBreaker breaker = new GracePeriodCircuitBreaker(nowNanos::get, Duration.ofSeconds(1), Duration.ofMinutes(10));
+ AtomicLong now = new AtomicLong(0);
+ CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10));
HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123")))
- .setRetryStrategy(new FeedClient.RetryStrategy() {
- @Override public boolean retry(FeedClient.OperationType type) { return type == FeedClient.OperationType.PUT; }
- @Override public int retries() { return 1; }
- })
- .setCircuitBreaker(breaker)
- .setConnectionsPerEndpoint(1)
- .setMaxStreamPerConnection(minStreams),
+ .setRetryStrategy(new FeedClient.RetryStrategy() {
+ @Override public boolean retry(FeedClient.OperationType type) { return type == FeedClient.OperationType.PUT; }
+ @Override public int retries() { return 1; }
+ })
+ .setCircuitBreaker(breaker)
+ .setConnectionsPerEndpoint(1)
+ .setMaxStreamPerConnection(minStreams),
cluster);
OperationStats initial = strategy.stats();
DocumentId id1 = DocumentId.of("ns", "type", "1");
DocumentId id2 = DocumentId.of("ns", "type", "2");
- HttpRequest request = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(180), nowNanos::get);
+ HttpRequest request = new HttpRequest("POST", "/", null, null, null);
// Runtime exception is not retried.
cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom")));
ExecutionException expected = assertThrows(ExecutionException.class,
() -> strategy.enqueue(id1, request).get());
assertInstanceOf(FeedException.class, expected.getCause());
- assertEquals("(id:ns:type::1) java.lang.RuntimeException: boom", expected.getCause().getMessage());
+ assertEquals("java.lang.RuntimeException: boom", expected.getCause().getMessage());
assertEquals(1, strategy.stats().requests());
// IOException is retried.
- cluster.expect((__, vessel) -> {
- nowNanos.addAndGet(200_000_000L); // Exceed grace period.
- vessel.completeExceptionally(new IOException("retry me"));
- });
+ cluster.expect((__, vessel) -> vessel.completeExceptionally(new IOException("retry me")));
expected = assertThrows(ExecutionException.class,
() -> strategy.enqueue(id1, request).get());
assertEquals("retry me", expected.getCause().getCause().getMessage());
@@ -127,7 +124,7 @@ class HttpRequestStrategyTest {
assertEquals(4, strategy.stats().requests());
// Throttled requests are retried. Concurrent operations to same ID (only) are serialised.
- nowNanos.set(2_000_000_000L);
+ now.set(2000);
HttpResponse throttled = HttpResponse.of(429, null);
AtomicInteger count = new AtomicInteger(3);
CountDownLatch latch = new CountDownLatch(1);
@@ -144,11 +141,11 @@ class HttpRequestStrategyTest {
else vessel.complete(success);
});
CompletableFuture<HttpResponse> delayed = strategy.enqueue(id1, request);
- CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), nowNanos::get));
- assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null, Duration.ofSeconds(1), nowNanos::get)).get());
+ CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, null));
+ assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null, null)).get());
latch.await();
assertEquals(8, strategy.stats().requests()); // 3 attempts at throttled and one at id2.
- nowNanos.set(4_000_000_000L);
+ now.set(4000);
assertEquals(CLOSED, breaker.state()); // Circuit not broken due to throttled requests.
completion.get().complete(success);
assertEquals(success, delayed.get());
@@ -156,17 +153,14 @@ class HttpRequestStrategyTest {
// Some error responses are retried.
HttpResponse serverError = HttpResponse.of(503, null);
- cluster.expect((__, vessel) -> {
- nowNanos.addAndGet(200_000_000L); // Exceed grace period.
- vessel.complete(serverError);
- });
+ cluster.expect((__, vessel) -> vessel.complete(serverError));
assertEquals(serverError, strategy.enqueue(id1, request).get());
assertEquals(11, strategy.stats().requests());
assertEquals(CLOSED, breaker.state()); // Circuit not broken due to throttled requests.
// Error responses are not retried when not of appropriate type.
cluster.expect((__, vessel) -> vessel.complete(serverError));
- assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), nowNanos::get)).get());
+ assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, null)).get());
assertEquals(12, strategy.stats().requests());
// Some error responses are not retried.
@@ -175,22 +169,10 @@ class HttpRequestStrategyTest {
assertEquals(badRequest, strategy.enqueue(id1, request).get());
assertEquals(13, strategy.stats().requests());
-
- // IOException is not retried past timeout.
- cluster.expect((__, vessel) -> {
- nowNanos.addAndGet(50_000_000L); // Exceed grace period after 2 attempts.
- vessel.completeExceptionally(new IOException("retry me"));
- });
- expected = assertThrows(ExecutionException.class,
- () -> strategy.enqueue(id1, new HttpRequest("POST", "/", null, null, Duration.ofMillis(100), nowNanos::get)).get());
- assertEquals("retry me", expected.getCause().getCause().getMessage());
- assertEquals(15, strategy.stats().requests());
-
-
// Circuit breaker opens some time after starting to fail.
- nowNanos.set(6_000_000_000L);
+ now.set(6000);
assertEquals(HALF_OPEN, breaker.state()); // Circuit broken due to failed requests.
- nowNanos.set(605_000_000_000L);
+ now.set(605000);
assertEquals(OPEN, breaker.state()); // Circuit broken due to failed requests.
strategy.destroy();
@@ -201,7 +183,7 @@ class HttpRequestStrategyTest {
codes.put(429, 2L);
codes.put(503, 3L);
assertEquals(codes, stats.responsesByCode());
- assertEquals(5, stats.exceptions());
+ assertEquals(3, stats.exceptions());
assertEquals(stats, stats.since(initial));
assertEquals(0, stats.since(stats).averageLatencyMillis());
@@ -212,8 +194,8 @@ class HttpRequestStrategyTest {
@Test
void testShutdown() {
MockCluster cluster = new MockCluster();
- AtomicLong nowNanos = new AtomicLong(0);
- CircuitBreaker breaker = new GracePeriodCircuitBreaker(nowNanos::get, Duration.ofSeconds(1), Duration.ofMinutes(10));
+ AtomicLong now = new AtomicLong(0);
+ CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10));
HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123")))
.setRetryStrategy(new FeedClient.RetryStrategy() {
@Override public int retries() { return 1; }
@@ -227,10 +209,10 @@ class HttpRequestStrategyTest {
DocumentId id3 = DocumentId.of("ns", "type", "3");
DocumentId id4 = DocumentId.of("ns", "type", "4");
DocumentId id5 = DocumentId.of("ns", "type", "5");
- HttpRequest failing = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get);
- HttpRequest partial = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get);
- HttpRequest request = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get);
- HttpRequest blocking = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get);
+ HttpRequest failing = new HttpRequest("POST", "/", null, null, null);
+ HttpRequest partial = new HttpRequest("POST", "/", null, null, null);
+ HttpRequest request = new HttpRequest("POST", "/", null, null, null);
+ HttpRequest blocking = new HttpRequest("POST", "/", null, null, null);
// Enqueue some operations to the same id, which are serialised, and then shut down while operations are in flight.
Phaser phaser = new Phaser(2);
@@ -261,7 +243,7 @@ class HttpRequestStrategyTest {
CompletableFuture<HttpResponse> delayed = strategy.enqueue(id5, request);
phaser.arriveAndAwaitAdvance(); // retried is allowed to dispatch, and will be retried async.
// failed immediately fails, and lets us assert the above retry is indeed enqueued.
- assertEquals("ai.vespa.feed.client.FeedException: (id:ns:type::3) java.lang.RuntimeException: fatal",
+ assertEquals("ai.vespa.feed.client.FeedException: java.lang.RuntimeException: fatal",
assertThrows(ExecutionException.class, failed::get).getMessage());
phaser.arriveAndAwaitAdvance(); // blocked starts dispatch, and hangs, blocking dispatch thread.
@@ -288,7 +270,7 @@ class HttpRequestStrategyTest {
assertThrows(ExecutionException.class, blocked::get).getMessage());
assertEquals("ai.vespa.feed.client.FeedException: Operation aborted",
assertThrows(ExecutionException.class, delayed::get).getMessage());
- assertEquals("ai.vespa.feed.client.FeedException: (id:ns:type::2) java.io.IOException: failed",
+ assertEquals("ai.vespa.feed.client.FeedException: java.io.IOException: failed",
assertThrows(ExecutionException.class, retried::get).getMessage());
assertEquals("ai.vespa.feed.client.FeedException: Operation aborted",
assertThrows(ExecutionException.class, strategy.enqueue(id1, request)::get).getMessage());