aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java')
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java282
1 files changed, 282 insertions, 0 deletions
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
new file mode 100644
index 00000000000..d293abf4f3e
--- /dev/null
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
@@ -0,0 +1,282 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client.impl;
+
+import ai.vespa.feed.client.DocumentId;
+import ai.vespa.feed.client.FeedClient;
+import ai.vespa.feed.client.FeedClient.CircuitBreaker;
+import ai.vespa.feed.client.FeedException;
+import ai.vespa.feed.client.HttpResponse;
+import ai.vespa.feed.client.OperationStats;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+
+import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.CLOSED;
+import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN;
+import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class HttpRequestStrategyTest {
+
+ @Test
+ void testConcurrency() {
+ int documents = 1 << 16;
+ HttpRequest request = new HttpRequest("PUT", "/", null, null);
+ HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8));
+ ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+ Cluster cluster = new BenchmarkingCluster((__, vessel) -> executor.schedule(() -> vessel.complete(response), (int) (Math.random() * 2 * 10), TimeUnit.MILLISECONDS));
+
+ HttpRequestStrategy strategy = new HttpRequestStrategy( new FeedClientBuilderImpl(Collections.singletonList(URI.create("https://dummy.com:123")))
+ .setConnectionsPerEndpoint(1 << 10)
+ .setMaxStreamPerConnection(1 << 12),
+ cluster);
+ CountDownLatch latch = new CountDownLatch(1);
+ new Thread(() -> {
+ try {
+ while ( ! latch.await(1, TimeUnit.SECONDS))
+ System.err.println(cluster.stats().inflight());
+ }
+ catch (InterruptedException ignored) { }
+ }).start();
+ long startNanos = System.nanoTime();
+ for (int i = 0; i < documents; i++)
+ strategy.enqueue(DocumentId.of("ns", "type", Integer.toString(i)), request);
+
+ strategy.await();
+ latch.countDown();
+ executor.shutdown();
+ cluster.close();
+ OperationStats stats = cluster.stats();
+ long successes = stats.responsesByCode().get(200);
+ System.err.println(successes + " successes in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds");
+ System.err.println(stats);
+
+ assertEquals(documents, stats.requests());
+ assertEquals(documents, stats.responses());
+ assertEquals(documents, stats.responsesByCode().get(200));
+ assertEquals(0, stats.inflight());
+ assertEquals(0, stats.exceptions());
+ assertEquals(0, stats.bytesSent());
+ assertEquals(2 * documents, stats.bytesReceived());
+ }
+
+ @Test
+ void testRetries() throws ExecutionException, InterruptedException {
+ int minStreams = 16; // Hard limit for minimum number of streams per connection.
+ MockCluster cluster = new MockCluster();
+ AtomicLong now = new AtomicLong(0);
+ CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10));
+ HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(Collections.singletonList(URI.create("https://dummy.com:123")))
+ .setRetryStrategy(new FeedClient.RetryStrategy() {
+ @Override public boolean retry(FeedClient.OperationType type) { return type == FeedClient.OperationType.PUT; }
+ @Override public int retries() { return 1; }
+ })
+ .setCircuitBreaker(breaker)
+ .setConnectionsPerEndpoint(1)
+ .setMaxStreamPerConnection(minStreams),
+ new BenchmarkingCluster(cluster));
+ OperationStats initial = strategy.stats();
+
+ DocumentId id1 = DocumentId.of("ns", "type", "1");
+ DocumentId id2 = DocumentId.of("ns", "type", "2");
+ HttpRequest request = new HttpRequest("POST", "/", null, null);
+
+ // Runtime exception is not retried.
+ cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom")));
+ ExecutionException expected = assertThrows(ExecutionException.class,
+ () -> strategy.enqueue(id1, request).get());
+ assertTrue(expected.getCause() instanceof FeedException);
+ assertEquals("java.lang.RuntimeException: boom", expected.getCause().getMessage());
+ assertEquals(1, strategy.stats().requests());
+
+ // IOException is retried.
+ cluster.expect((__, vessel) -> vessel.completeExceptionally(new IOException("retry me")));
+ expected = assertThrows(ExecutionException.class,
+ () -> strategy.enqueue(id1, request).get());
+ assertEquals("retry me", expected.getCause().getCause().getMessage());
+ assertEquals(3, strategy.stats().requests());
+
+ // Successful response is returned
+ HttpResponse success = HttpResponse.of(200, null);
+ cluster.expect((__, vessel) -> vessel.complete(success));
+ assertEquals(success, strategy.enqueue(id1, request).get());
+ assertEquals(4, strategy.stats().requests());
+
+ // Throttled requests are retried. Concurrent operations to same ID (only) are serialised.
+ now.set(2000);
+ HttpResponse throttled = HttpResponse.of(429, null);
+ AtomicInteger count = new AtomicInteger(3);
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference<CompletableFuture<HttpResponse>> completion = new AtomicReference<>();
+ cluster.expect((req, vessel) -> {
+ if (req == request) {
+ if (count.decrementAndGet() > 0)
+ vessel.complete(throttled);
+ else {
+ completion.set(vessel);
+ latch.countDown();
+ }
+ }
+ else vessel.complete(success);
+ });
+ CompletableFuture<HttpResponse> delayed = strategy.enqueue(id1, request);
+ CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null));
+ assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null)).get());
+ latch.await();
+ assertEquals(8, strategy.stats().requests()); // 3 attempts at throttled and one at id2.
+ now.set(4000);
+ assertEquals(CLOSED, breaker.state()); // Circuit not broken due to throttled requests.
+ completion.get().complete(success);
+ assertEquals(success, delayed.get());
+ assertEquals(success, serialised.get());
+
+ // Some error responses are retried.
+ HttpResponse serverError = HttpResponse.of(500, null);
+ cluster.expect((__, vessel) -> vessel.complete(serverError));
+ assertEquals(serverError, strategy.enqueue(id1, request).get());
+ assertEquals(11, strategy.stats().requests());
+ assertEquals(CLOSED, breaker.state()); // Circuit not broken due to throttled requests.
+
+ // Error responses are not retried when not of appropriate type.
+ cluster.expect((__, vessel) -> vessel.complete(serverError));
+ assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null)).get());
+ assertEquals(12, strategy.stats().requests());
+
+ // Some error responses are not retried.
+ HttpResponse badRequest = HttpResponse.of(400, null);
+ cluster.expect((__, vessel) -> vessel.complete(badRequest));
+ assertEquals(badRequest, strategy.enqueue(id1, request).get());
+ assertEquals(13, strategy.stats().requests());
+
+ // Circuit breaker opens some time after starting to fail.
+ now.set(6000);
+ assertEquals(HALF_OPEN, breaker.state()); // Circuit broken due to failed requests.
+ now.set(605000);
+ assertEquals(OPEN, breaker.state()); // Circuit broken due to failed requests.
+
+ Map<Integer, Long> codes = new HashMap<>();
+ codes.put(200, 4L);
+ codes.put(400, 1L);
+ codes.put(429, 2L);
+ codes.put(500, 3L);
+ assertEquals(codes, strategy.stats().responsesByCode());
+ assertEquals(3, strategy.stats().exceptions());
+
+ assertEquals(strategy.stats(), strategy.stats().since(initial));
+ assertEquals(0, strategy.stats().since(strategy.stats()).averageLatencyMillis());
+ assertEquals(0, strategy.stats().since(strategy.stats()).requests());
+ assertEquals(0, strategy.stats().since(strategy.stats()).bytesSent());
+ }
+
+ @Test
+ void testShutdown() {
+ MockCluster cluster = new MockCluster();
+ AtomicLong now = new AtomicLong(0);
+ CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10));
+ HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(Collections.singletonList(URI.create("https://dummy.com:123")))
+ .setRetryStrategy(new FeedClient.RetryStrategy() {
+ @Override public int retries() { return 1; }
+ })
+ .setCircuitBreaker(breaker)
+ .setConnectionsPerEndpoint(1),
+ new BenchmarkingCluster(cluster));
+
+ DocumentId id1 = DocumentId.of("ns", "type", "1");
+ DocumentId id2 = DocumentId.of("ns", "type", "2");
+ DocumentId id3 = DocumentId.of("ns", "type", "3");
+ DocumentId id4 = DocumentId.of("ns", "type", "4");
+ HttpRequest failing = new HttpRequest("POST", "/", null, null);
+ HttpRequest request = new HttpRequest("POST", "/", null, null);
+ HttpRequest blocking = new HttpRequest("POST", "/", null, null);
+
+ // Enqueue some operations to the same id, which are serialised, and then shut down while operations are in flight.
+ Phaser phaser = new Phaser(2);
+ Phaser blocker = new Phaser(2);
+ cluster.expect((req, vessel) -> {
+ if (req == blocking) {
+ phaser.arriveAndAwaitAdvance(); // Synchronise with test main thread, and then ...
+ blocker.arriveAndAwaitAdvance(); // ... block dispatch thread, so we get something in the queue.
+ throw new RuntimeException("armageddon"); // Dispatch thread should die, tearing down everything.
+ }
+ else if (req == failing) {
+ phaser.arriveAndAwaitAdvance(); // Let test thread enqueue more ops before failing (and retrying) this.
+ vessel.completeExceptionally(new IOException("failed"));
+ phaser.arriveAndAwaitAdvance(); // Ensure a retry is scheduled before test thread is allowed to continue.
+ }
+ else phaser.arriveAndAwaitAdvance(); // Don't complete from mock cluster, but require destruction to do this.
+ });
+ CompletableFuture<HttpResponse> inflight = strategy.enqueue(id1, request);
+ CompletableFuture<HttpResponse> serialised1 = strategy.enqueue(id1, request);
+ CompletableFuture<HttpResponse> serialised2 = strategy.enqueue(id1, request);
+ CompletableFuture<HttpResponse> failed = strategy.enqueue(id2, failing);
+ CompletableFuture<HttpResponse> blocked = strategy.enqueue(id3, blocking);
+ CompletableFuture<HttpResponse> delayed = strategy.enqueue(id4, request);
+ phaser.arriveAndAwaitAdvance(); // inflight completes dispatch, but causes no response.
+ phaser.arriveAndAwaitAdvance(); // failed is allowed to dispatch ...
+ phaser.arriveAndAwaitAdvance(); // ... and a retry is enqueued.
+ phaser.arriveAndAwaitAdvance(); // blocked starts dispatch, and hangs, blocking dispatch thread.
+
+ // Current state: inflight is "inflight to cluster", serialised1/2 are waiting completion of it;
+ // blocked is blocking dispatch, delayed is enqueued, waiting for dispatch;
+ // failed has a partial result, and has a retry in the dispatch queue.
+ assertFalse(inflight.isDone());
+ assertFalse(serialised1.isDone());
+ assertFalse(serialised2.isDone());
+ assertFalse(failed.isDone());
+ assertFalse(blocked.isDone());
+ assertFalse(delayed.isDone());
+
+ // Kill dispatch thread, and see that all enqueued operations, and new ones, complete.
+ blocker.arriveAndAwaitAdvance();
+ assertEquals("ai.vespa.feed.client.FeedException: Operation aborted",
+ assertThrows(ExecutionException.class, inflight::get).getMessage());
+ assertEquals("ai.vespa.feed.client.FeedException: Operation aborted",
+ assertThrows(ExecutionException.class, serialised1::get).getMessage());
+ assertEquals("ai.vespa.feed.client.FeedException: Operation aborted",
+ assertThrows(ExecutionException.class, serialised2::get).getMessage());
+ assertEquals("ai.vespa.feed.client.FeedException: Operation aborted",
+ assertThrows(ExecutionException.class, blocked::get).getMessage());
+ assertEquals("ai.vespa.feed.client.FeedException: Operation aborted",
+ assertThrows(ExecutionException.class, delayed::get).getMessage());
+ assertEquals("ai.vespa.feed.client.FeedException: java.io.IOException: failed",
+ assertThrows(ExecutionException.class, failed::get).getMessage());
+ assertEquals("ai.vespa.feed.client.FeedException: Operation aborted",
+ assertThrows(ExecutionException.class, strategy.enqueue(id1, request)::get).getMessage());
+ }
+
+ static class MockCluster implements Cluster {
+
+ final AtomicReference<BiConsumer<HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>();
+
+ void expect(BiConsumer<HttpRequest, CompletableFuture<HttpResponse>> expected) {
+ dispatch.set(expected);
+ }
+
+ @Override
+ public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) {
+ dispatch.get().accept(request, vessel);
+ }
+
+ }
+
+}