summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client/src/test/java
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-06-25 13:51:31 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-06-25 13:51:31 +0200
commitce27d7104be34f11e9d6966036b3bdfd7928dcc8 (patch)
tree9c3ec70aa249ab7d9e84208dec0bffbb2d324cec /vespa-feed-client/src/test/java
parente39a7eb17d4e7f1fad56fa9ff5873ba4eadbc523 (diff)
Propagate last retried result when aborting execution (and fix shutdown)
Diffstat (limited to 'vespa-feed-client/src/test/java')
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java89
1 files changed, 82 insertions, 7 deletions
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java
index 6b2087b1828..02175150fed 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java
@@ -15,6 +15,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -27,6 +28,7 @@ import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN;
import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -47,11 +49,8 @@ class HttpRequestStrategyTest {
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
try {
- while ( ! latch.await(1, TimeUnit.SECONDS)) {
+ while ( ! latch.await(1, TimeUnit.SECONDS))
System.err.println(cluster.stats().inflight());
- System.err.println(strategy.throttler.targetInflight());
- System.err.println();
- }
}
catch (InterruptedException ignored) { }
}).start();
@@ -78,7 +77,7 @@ class HttpRequestStrategyTest {
}
@Test
- void testLogic() throws ExecutionException, InterruptedException {
+ void testRetries() throws ExecutionException, InterruptedException {
int minStreams = 16; // Hard limit for minimum number of streams per connection.
MockCluster cluster = new MockCluster();
AtomicLong now = new AtomicLong(0);
@@ -98,10 +97,11 @@ class HttpRequestStrategyTest {
HttpRequest request = new HttpRequest("POST", "/", null, null);
// Runtime exception is not retried.
- cluster.expect((__, vessel) -> vessel.completeExceptionally(new FeedException("boom")));
+ cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom")));
ExecutionException expected = assertThrows(ExecutionException.class,
() -> strategy.enqueue(id1, request).get());
- assertEquals("boom", expected.getCause().getMessage());
+ assertTrue(expected.getCause() instanceof FeedException);
+ assertEquals("java.lang.RuntimeException: boom", expected.getCause().getMessage());
assertEquals(1, strategy.stats().requests());
// IOException is retried.
@@ -178,6 +178,81 @@ class HttpRequestStrategyTest {
assertEquals(3, strategy.stats().exceptions());
}
+ @Test
+ void testShutdown() {
+ MockCluster cluster = new MockCluster();
+ AtomicLong now = new AtomicLong(0);
+ CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10));
+ HttpRequestStrategy strategy = new HttpRequestStrategy(FeedClientBuilder.create(URI.create("https://dummy.com:123"))
+ .setRetryStrategy(new FeedClient.RetryStrategy() {
+ @Override public int retries() { return 1; }
+ })
+ .setCircuitBreaker(breaker)
+ .setConnectionsPerEndpoint(1),
+ new BenchmarkingCluster(cluster));
+
+ DocumentId id1 = DocumentId.of("ns", "type", "1");
+ DocumentId id2 = DocumentId.of("ns", "type", "2");
+ DocumentId id3 = DocumentId.of("ns", "type", "3");
+ DocumentId id4 = DocumentId.of("ns", "type", "4");
+ HttpRequest failing = new HttpRequest("POST", "/", null, null);
+ HttpRequest request = new HttpRequest("POST", "/", null, null);
+ HttpRequest blocking = new HttpRequest("POST", "/", null, null);
+
+ // Enqueue some operations to the same id, which are serialised, and then shut down while operations are in flight.
+ Phaser phaser = new Phaser(2);
+ Phaser blocker = new Phaser(2);
+ AtomicReference<CompletableFuture<HttpResponse>> completion = new AtomicReference<>();
+ cluster.expect((req, vessel) -> {
+ if (req == blocking) {
+ phaser.arriveAndAwaitAdvance(); // Synchronise with tst main thread, and then ...
+ blocker.arriveAndAwaitAdvance(); // ... block dispatch thread, so we get something in the queue.
+ throw new RuntimeException("armageddon"); // Dispatch thread should die, tearing down everything.
+ }
+ else if (req == failing) {
+ phaser.arriveAndAwaitAdvance(); // Let test thread enqueue more ops before failing (and retrying) this.
+ vessel.completeExceptionally(new IOException("failed"));
+ }
+ else phaser.arriveAndAwaitAdvance(); // Don't complete from mock cluster, but require destruction to do this.
+ });
+ CompletableFuture<HttpResponse> inflight = strategy.enqueue(id1, request);
+ CompletableFuture<HttpResponse> serialised1 = strategy.enqueue(id1, request);
+ CompletableFuture<HttpResponse> serialised2 = strategy.enqueue(id1, request);
+ CompletableFuture<HttpResponse> failed = strategy.enqueue(id2, failing);
+ CompletableFuture<HttpResponse> blocked = strategy.enqueue(id3, blocking);
+ CompletableFuture<HttpResponse> delayed = strategy.enqueue(id4, request);
+ phaser.arriveAndAwaitAdvance(); // inflight completes dispatch, but causes no response.
+ phaser.arriveAndAwaitAdvance(); // failed completes dispatch, and a retry is enqueued.
+ phaser.arriveAndAwaitAdvance(); // blocked starts dispatch, and hangs, blocking dispatch thread.
+
+ // Current state: inflight is "inflight to cluster", serialised1/2 are waiting completion of it;
+ // blocked is blocking dispatch, delayed is enqueued, waiting for dispatch;
+ // failed has a partial result, and has a retry in the dispatch queue.
+ assertFalse(inflight.isDone());
+ assertFalse(serialised1.isDone());
+ assertFalse(serialised2.isDone());
+ assertFalse(failed.isDone());
+ assertFalse(blocked.isDone());
+ assertFalse(delayed.isDone());
+
+ // Kill dispatch thread, and see that all enqueued operations, and new ones, complete.
+ blocker.arriveAndAwaitAdvance();
+ assertEquals("ai.vespa.feed.client.FeedException: Operation aborted",
+ assertThrows(ExecutionException.class, inflight::get).getMessage());
+ assertEquals("ai.vespa.feed.client.FeedException: Operation aborted",
+ assertThrows(ExecutionException.class, serialised1::get).getMessage());
+ assertEquals("ai.vespa.feed.client.FeedException: Operation aborted",
+ assertThrows(ExecutionException.class, serialised2::get).getMessage());
+ assertEquals("ai.vespa.feed.client.FeedException: Operation aborted",
+ assertThrows(ExecutionException.class, blocked::get).getMessage());
+ assertEquals("ai.vespa.feed.client.FeedException: Operation aborted",
+ assertThrows(ExecutionException.class, delayed::get).getMessage());
+ assertEquals("ai.vespa.feed.client.FeedException: java.io.IOException: failed",
+ assertThrows(ExecutionException.class, failed::get).getMessage());
+ assertEquals("ai.vespa.feed.client.FeedException: Operation aborted",
+ assertThrows(ExecutionException.class, strategy.enqueue(id1, request)::get).getMessage());
+ }
+
static class MockCluster implements Cluster {
final AtomicReference<BiConsumer<HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>();