diff options
author | Jon Marius Venstad <jonmv@users.noreply.github.com> | 2021-06-25 14:16:17 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-06-25 14:16:17 +0200 |
commit | b03e7869d363e46f1ff19a0e5365f830c638dc5d (patch) | |
tree | b542ea09958b0295f5304a2242a1a838c9a6a932 /vespa-feed-client/src/test/java | |
parent | 5896921f79fa9cf5dd5ea6ad6c4b78e392ca43e5 (diff) | |
parent | 11a6745c68f2ae199bedd7aa06a13ea52e3dc8c1 (diff) |
Merge pull request #18408 from vespa-engine/jonmv/vespa-feed-client
Jonmv/vespa feed client
Diffstat (limited to 'vespa-feed-client/src/test/java')
4 files changed, 105 insertions, 29 deletions
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java index d8090549420..6aa0de2160c 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java @@ -13,6 +13,7 @@ import java.util.function.BiFunction; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * @author jonmv @@ -55,16 +56,19 @@ class HttpFeedClientTest { return failed; } }); - Result result = client.put(id, - "json", - OperationParameters.empty() - .createIfNonExistent(true) - .testAndSetCondition("false") - .route("route") - .timeout(Duration.ofSeconds(5))) - .get(); - assertEquals("Ooops! ... I did it again.", result.resultMessage().get()); - assertEquals("I played with your heart. Got lost in the game.", result.traceMessage().get()); + ExecutionException expected = assertThrows(ExecutionException.class, + () -> client.put(id, + "json", + OperationParameters.empty() + .createIfNonExistent(true) + .testAndSetCondition("false") + .route("route") + .timeout(Duration.ofSeconds(5))) + .get()); + assertTrue(expected.getCause() instanceof ResultException); + ResultException result = (ResultException) expected.getCause(); + assertEquals("Ooops! ... I did it again.", result.getMessage()); + assertEquals("I played with your heart. Got lost in the game.", result.getTrace().get()); // Handler error is a FeedException. @@ -90,11 +94,11 @@ class HttpFeedClientTest { return failed; } }); - ExecutionException expected = assertThrows(ExecutionException.class, - () -> client.put(id, - "json", - OperationParameters.empty()) - .get()); + expected = assertThrows(ExecutionException.class, + () -> client.put(id, + "json", + OperationParameters.empty()) + .get()); assertEquals("Status 500 executing 'POST /document/v1/ns/type/docid/0': Alla ska i jorden.", expected.getCause().getMessage()); } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java index 21ab6889e6e..02175150fed 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java @@ -15,6 +15,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Phaser; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -27,7 +28,9 @@ import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN; import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; class HttpRequestStrategyTest { @@ -46,11 +49,8 @@ class HttpRequestStrategyTest { CountDownLatch latch = new CountDownLatch(1); new Thread(() -> { try { - while ( ! latch.await(1, TimeUnit.SECONDS)) { + while ( ! latch.await(1, TimeUnit.SECONDS)) System.err.println(cluster.stats().inflight()); - System.err.println(strategy.throttler.targetInflight()); - System.err.println(); - } } catch (InterruptedException ignored) { } }).start(); @@ -77,7 +77,7 @@ class HttpRequestStrategyTest { } @Test - void testLogic() throws ExecutionException, InterruptedException { + void testRetries() throws ExecutionException, InterruptedException { int minStreams = 16; // Hard limit for minimum number of streams per connection. MockCluster cluster = new MockCluster(); AtomicLong now = new AtomicLong(0); @@ -97,10 +97,11 @@ class HttpRequestStrategyTest { HttpRequest request = new HttpRequest("POST", "/", null, null); // Runtime exception is not retried. - cluster.expect((__, vessel) -> vessel.completeExceptionally(new FeedException("boom"))); + cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom"))); ExecutionException expected = assertThrows(ExecutionException.class, () -> strategy.enqueue(id1, request).get()); - assertEquals("boom", expected.getCause().getMessage()); + assertTrue(expected.getCause() instanceof FeedException); + assertEquals("java.lang.RuntimeException: boom", expected.getCause().getMessage()); assertEquals(1, strategy.stats().requests()); // IOException is retried. @@ -177,6 +178,81 @@ class HttpRequestStrategyTest { assertEquals(3, strategy.stats().exceptions()); } + @Test + void testShutdown() { + MockCluster cluster = new MockCluster(); + AtomicLong now = new AtomicLong(0); + CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10)); + HttpRequestStrategy strategy = new HttpRequestStrategy(FeedClientBuilder.create(URI.create("https://dummy.com:123")) + .setRetryStrategy(new FeedClient.RetryStrategy() { + @Override public int retries() { return 1; } + }) + .setCircuitBreaker(breaker) + .setConnectionsPerEndpoint(1), + new BenchmarkingCluster(cluster)); + + DocumentId id1 = DocumentId.of("ns", "type", "1"); + DocumentId id2 = DocumentId.of("ns", "type", "2"); + DocumentId id3 = DocumentId.of("ns", "type", "3"); + DocumentId id4 = DocumentId.of("ns", "type", "4"); + HttpRequest failing = new HttpRequest("POST", "/", null, null); + HttpRequest request = new HttpRequest("POST", "/", null, null); + HttpRequest blocking = new HttpRequest("POST", "/", null, null); + + // Enqueue some operations to the same id, which are serialised, and then shut down while operations are in flight. + Phaser phaser = new Phaser(2); + Phaser blocker = new Phaser(2); + AtomicReference<CompletableFuture<HttpResponse>> completion = new AtomicReference<>(); + cluster.expect((req, vessel) -> { + if (req == blocking) { + phaser.arriveAndAwaitAdvance(); // Synchronise with tst main thread, and then ... + blocker.arriveAndAwaitAdvance(); // ... block dispatch thread, so we get something in the queue. + throw new RuntimeException("armageddon"); // Dispatch thread should die, tearing down everything. + } + else if (req == failing) { + phaser.arriveAndAwaitAdvance(); // Let test thread enqueue more ops before failing (and retrying) this. + vessel.completeExceptionally(new IOException("failed")); + } + else phaser.arriveAndAwaitAdvance(); // Don't complete from mock cluster, but require destruction to do this. + }); + CompletableFuture<HttpResponse> inflight = strategy.enqueue(id1, request); + CompletableFuture<HttpResponse> serialised1 = strategy.enqueue(id1, request); + CompletableFuture<HttpResponse> serialised2 = strategy.enqueue(id1, request); + CompletableFuture<HttpResponse> failed = strategy.enqueue(id2, failing); + CompletableFuture<HttpResponse> blocked = strategy.enqueue(id3, blocking); + CompletableFuture<HttpResponse> delayed = strategy.enqueue(id4, request); + phaser.arriveAndAwaitAdvance(); // inflight completes dispatch, but causes no response. + phaser.arriveAndAwaitAdvance(); // failed completes dispatch, and a retry is enqueued. + phaser.arriveAndAwaitAdvance(); // blocked starts dispatch, and hangs, blocking dispatch thread. + + // Current state: inflight is "inflight to cluster", serialised1/2 are waiting completion of it; + // blocked is blocking dispatch, delayed is enqueued, waiting for dispatch; + // failed has a partial result, and has a retry in the dispatch queue. + assertFalse(inflight.isDone()); + assertFalse(serialised1.isDone()); + assertFalse(serialised2.isDone()); + assertFalse(failed.isDone()); + assertFalse(blocked.isDone()); + assertFalse(delayed.isDone()); + + // Kill dispatch thread, and see that all enqueued operations, and new ones, complete. + blocker.arriveAndAwaitAdvance(); + assertEquals("ai.vespa.feed.client.FeedException: Operation aborted", + assertThrows(ExecutionException.class, inflight::get).getMessage()); + assertEquals("ai.vespa.feed.client.FeedException: Operation aborted", + assertThrows(ExecutionException.class, serialised1::get).getMessage()); + assertEquals("ai.vespa.feed.client.FeedException: Operation aborted", + assertThrows(ExecutionException.class, serialised2::get).getMessage()); + assertEquals("ai.vespa.feed.client.FeedException: Operation aborted", + assertThrows(ExecutionException.class, blocked::get).getMessage()); + assertEquals("ai.vespa.feed.client.FeedException: Operation aborted", + assertThrows(ExecutionException.class, delayed::get).getMessage()); + assertEquals("ai.vespa.feed.client.FeedException: java.io.IOException: failed", + assertThrows(ExecutionException.class, failed::get).getMessage()); + assertEquals("ai.vespa.feed.client.FeedException: Operation aborted", + assertThrows(ExecutionException.class, strategy.enqueue(id1, request)::get).getMessage()); + } + static class MockCluster implements Cluster { final AtomicReference<BiConsumer<HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>(); diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java index 1e616f2625a..3b633c38132 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java @@ -38,17 +38,15 @@ class JsonFileFeederExample implements Closeable { resultsReceived.incrementAndGet(); if (error != null) { log.warning("Problems with feeding document " - + error.documentId().map(DocumentId::toString).orElse("<unknown>")); - errorsReceived.incrementAndGet(); - } else if (result.type() == Result.Type.failure) { - log.warning("Problems with docID " + result.documentId() + ":" + error); + + error.documentId().map(DocumentId::toString).orElse("<unknown>") + + ": " + error); errorsReceived.incrementAndGet(); } } @Override public void onError(FeedException error) { - log.severe("Feeding failed for d: " + error.getMessage()); + log.severe("Feeding failed fatally: " + error.getMessage()); } @Override diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java index 5cee776b244..cbe0e213907 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java @@ -100,8 +100,6 @@ class JsonStreamFeederExample extends Thread implements AutoCloseable { if (throwable != null) { System.err.printf("Failure for '%s': %s", docId, throwable); throwable.printStackTrace(); - } else if (result.type() == Result.Type.failure) { - System.err.printf("Failure for '%s': %s", docId, result.resultMessage().orElse("<no messsage>")); } }); } catch (InterruptedException e) { |