diff options
Diffstat (limited to 'vespa-feed-client/src/test/java/ai/vespa')
3 files changed, 47 insertions, 65 deletions
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java index 52b8dcc5884..f5ca70fe291 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java @@ -19,39 +19,39 @@ class GracePeriodCircuitBreakerTest { @Test void testCircuitBreaker() { - AtomicLong nowNanos = new AtomicLong(0); - long SECOND = 1_000_000_000L; - CircuitBreaker breaker = new GracePeriodCircuitBreaker(nowNanos::get, Duration.ofSeconds(1), Duration.ofMinutes(1)); + AtomicLong now = new AtomicLong(0); + long SECOND = 1000; + CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(1)); Throwable error = new Error(); assertEquals(CLOSED, breaker.state(), "Initial state is closed"); - nowNanos.addAndGet(100 * SECOND); + now.addAndGet(100 * SECOND); assertEquals(CLOSED, breaker.state(), "State is closed after some time without activity"); breaker.success(); assertEquals(CLOSED, breaker.state(), "State is closed after a success"); - nowNanos.addAndGet(100 * SECOND); + now.addAndGet(100 * SECOND); assertEquals(CLOSED, breaker.state(), "State is closed some time after a success"); breaker.failure(error); assertEquals(CLOSED, breaker.state(), "State is closed right after a failure"); - nowNanos.addAndGet(SECOND); + now.addAndGet(SECOND); assertEquals(CLOSED, breaker.state(), "State is closed until grace period has passed"); - nowNanos.addAndGet(1); + now.addAndGet(1); assertEquals(HALF_OPEN, breaker.state(), "State is half-open when grace period has passed"); breaker.success(); assertEquals(CLOSED, breaker.state(), "State is closed after a new success"); breaker.failure(error); - nowNanos.addAndGet(60 * SECOND); + now.addAndGet(60 * SECOND); assertEquals(HALF_OPEN, breaker.state(), "State is half-open until doom period has passed"); - nowNanos.addAndGet(1); + now.addAndGet(1); assertEquals(OPEN, breaker.state(), "State is open when doom period has passed"); breaker.success(); diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java index 14ade35825f..28bde16f457 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java @@ -174,7 +174,7 @@ class HttpFeedClientTest { .timeout(Duration.ofSeconds(5))) .get()); assertTrue(expected.getCause() instanceof ResultException); - assertEquals("(id:ns:type::0) Ooops! ... I did it again.", expected.getCause().getMessage()); + assertEquals("Ooops! ... I did it again.", expected.getCause().getMessage()); assertEquals("[ { \"message\": \"I played with your heart. Got lost in the game.\" } ]", ((ResultException) expected.getCause()).getTrace().get()); @@ -207,14 +207,14 @@ class HttpFeedClientTest { "json", OperationParameters.empty()) .get()); - assertEquals("(id:ns:type::0) Status 500 executing 'POST /document/v1/ns/type/docid/0': Alla ska i jorden.", expected.getCause().getMessage()); + assertEquals("Status 500 executing 'POST /document/v1/ns/type/docid/0': Alla ska i jorden.", expected.getCause().getMessage()); } @Test void testHandshake() { // dummy:123 does not exist, and results in a host-not-found exception. FeedException exception = assertThrows(FeedException.class, - () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))))); + () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))))); String message = exception.getMessage(); assertTrue(message.startsWith("failed handshake with server after "), message); assertTrue(message.contains("java.net.UnknownHostException"), message); diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java index b9ab4e481ac..b1a04ac9ed4 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java @@ -42,14 +42,14 @@ class HttpRequestStrategyTest { @Test void testConcurrency() { int documents = 1 << 16; - HttpRequest request = new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), () -> 0); + HttpRequest request = new HttpRequest("PUT", "/", null, null, null); HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8)); ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Cluster cluster = (__, vessel) -> executor.schedule(() -> vessel.complete(response), (int) (Math.random() * 2 * 10), TimeUnit.MILLISECONDS); - HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123"))) - .setConnectionsPerEndpoint(1 << 10) - .setMaxStreamPerConnection(1 << 12), + HttpRequestStrategy strategy = new HttpRequestStrategy( new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123"))) + .setConnectionsPerEndpoint(1 << 10) + .setMaxStreamPerConnection(1 << 12), cluster); CountDownLatch latch = new CountDownLatch(1); new Thread(() -> { @@ -81,40 +81,37 @@ class HttpRequestStrategyTest { assertEquals(2 * documents, stats.bytesReceived()); } - @Test() + @Test void testRetries() throws ExecutionException, InterruptedException { - int minStreams = 2; // Hard limit for minimum number of streams per connection. + int minStreams = 16; // Hard limit for minimum number of streams per connection. MockCluster cluster = new MockCluster(); - AtomicLong nowNanos = new AtomicLong(0); - CircuitBreaker breaker = new GracePeriodCircuitBreaker(nowNanos::get, Duration.ofSeconds(1), Duration.ofMinutes(10)); + AtomicLong now = new AtomicLong(0); + CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10)); HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123"))) - .setRetryStrategy(new FeedClient.RetryStrategy() { - @Override public boolean retry(FeedClient.OperationType type) { return type == FeedClient.OperationType.PUT; } - @Override public int retries() { return 1; } - }) - .setCircuitBreaker(breaker) - .setConnectionsPerEndpoint(1) - .setMaxStreamPerConnection(minStreams), + .setRetryStrategy(new FeedClient.RetryStrategy() { + @Override public boolean retry(FeedClient.OperationType type) { return type == FeedClient.OperationType.PUT; } + @Override public int retries() { return 1; } + }) + .setCircuitBreaker(breaker) + .setConnectionsPerEndpoint(1) + .setMaxStreamPerConnection(minStreams), cluster); OperationStats initial = strategy.stats(); DocumentId id1 = DocumentId.of("ns", "type", "1"); DocumentId id2 = DocumentId.of("ns", "type", "2"); - HttpRequest request = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(180), nowNanos::get); + HttpRequest request = new HttpRequest("POST", "/", null, null, null); // Runtime exception is not retried. cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom"))); ExecutionException expected = assertThrows(ExecutionException.class, () -> strategy.enqueue(id1, request).get()); assertInstanceOf(FeedException.class, expected.getCause()); - assertEquals("(id:ns:type::1) java.lang.RuntimeException: boom", expected.getCause().getMessage()); + assertEquals("java.lang.RuntimeException: boom", expected.getCause().getMessage()); assertEquals(1, strategy.stats().requests()); // IOException is retried. - cluster.expect((__, vessel) -> { - nowNanos.addAndGet(200_000_000L); // Exceed grace period. - vessel.completeExceptionally(new IOException("retry me")); - }); + cluster.expect((__, vessel) -> vessel.completeExceptionally(new IOException("retry me"))); expected = assertThrows(ExecutionException.class, () -> strategy.enqueue(id1, request).get()); assertEquals("retry me", expected.getCause().getCause().getMessage()); @@ -127,7 +124,7 @@ class HttpRequestStrategyTest { assertEquals(4, strategy.stats().requests()); // Throttled requests are retried. Concurrent operations to same ID (only) are serialised. - nowNanos.set(2_000_000_000L); + now.set(2000); HttpResponse throttled = HttpResponse.of(429, null); AtomicInteger count = new AtomicInteger(3); CountDownLatch latch = new CountDownLatch(1); @@ -144,11 +141,11 @@ class HttpRequestStrategyTest { else vessel.complete(success); }); CompletableFuture<HttpResponse> delayed = strategy.enqueue(id1, request); - CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), nowNanos::get)); - assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null, Duration.ofSeconds(1), nowNanos::get)).get()); + CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, null)); + assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null, null)).get()); latch.await(); assertEquals(8, strategy.stats().requests()); // 3 attempts at throttled and one at id2. - nowNanos.set(4_000_000_000L); + now.set(4000); assertEquals(CLOSED, breaker.state()); // Circuit not broken due to throttled requests. completion.get().complete(success); assertEquals(success, delayed.get()); @@ -156,17 +153,14 @@ class HttpRequestStrategyTest { // Some error responses are retried. HttpResponse serverError = HttpResponse.of(503, null); - cluster.expect((__, vessel) -> { - nowNanos.addAndGet(200_000_000L); // Exceed grace period. - vessel.complete(serverError); - }); + cluster.expect((__, vessel) -> vessel.complete(serverError)); assertEquals(serverError, strategy.enqueue(id1, request).get()); assertEquals(11, strategy.stats().requests()); assertEquals(CLOSED, breaker.state()); // Circuit not broken due to throttled requests. // Error responses are not retried when not of appropriate type. cluster.expect((__, vessel) -> vessel.complete(serverError)); - assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), nowNanos::get)).get()); + assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, null)).get()); assertEquals(12, strategy.stats().requests()); // Some error responses are not retried. @@ -175,22 +169,10 @@ class HttpRequestStrategyTest { assertEquals(badRequest, strategy.enqueue(id1, request).get()); assertEquals(13, strategy.stats().requests()); - - // IOException is not retried past timeout. - cluster.expect((__, vessel) -> { - nowNanos.addAndGet(50_000_000L); // Exceed grace period after 2 attempts. - vessel.completeExceptionally(new IOException("retry me")); - }); - expected = assertThrows(ExecutionException.class, - () -> strategy.enqueue(id1, new HttpRequest("POST", "/", null, null, Duration.ofMillis(100), nowNanos::get)).get()); - assertEquals("retry me", expected.getCause().getCause().getMessage()); - assertEquals(15, strategy.stats().requests()); - - // Circuit breaker opens some time after starting to fail. - nowNanos.set(6_000_000_000L); + now.set(6000); assertEquals(HALF_OPEN, breaker.state()); // Circuit broken due to failed requests. - nowNanos.set(605_000_000_000L); + now.set(605000); assertEquals(OPEN, breaker.state()); // Circuit broken due to failed requests. strategy.destroy(); @@ -201,7 +183,7 @@ class HttpRequestStrategyTest { codes.put(429, 2L); codes.put(503, 3L); assertEquals(codes, stats.responsesByCode()); - assertEquals(5, stats.exceptions()); + assertEquals(3, stats.exceptions()); assertEquals(stats, stats.since(initial)); assertEquals(0, stats.since(stats).averageLatencyMillis()); @@ -212,8 +194,8 @@ class HttpRequestStrategyTest { @Test void testShutdown() { MockCluster cluster = new MockCluster(); - AtomicLong nowNanos = new AtomicLong(0); - CircuitBreaker breaker = new GracePeriodCircuitBreaker(nowNanos::get, Duration.ofSeconds(1), Duration.ofMinutes(10)); + AtomicLong now = new AtomicLong(0); + CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10)); HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123"))) .setRetryStrategy(new FeedClient.RetryStrategy() { @Override public int retries() { return 1; } @@ -227,10 +209,10 @@ class HttpRequestStrategyTest { DocumentId id3 = DocumentId.of("ns", "type", "3"); DocumentId id4 = DocumentId.of("ns", "type", "4"); DocumentId id5 = DocumentId.of("ns", "type", "5"); - HttpRequest failing = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get); - HttpRequest partial = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get); - HttpRequest request = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get); - HttpRequest blocking = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get); + HttpRequest failing = new HttpRequest("POST", "/", null, null, null); + HttpRequest partial = new HttpRequest("POST", "/", null, null, null); + HttpRequest request = new HttpRequest("POST", "/", null, null, null); + HttpRequest blocking = new HttpRequest("POST", "/", null, null, null); // Enqueue some operations to the same id, which are serialised, and then shut down while operations are in flight. Phaser phaser = new Phaser(2); @@ -261,7 +243,7 @@ class HttpRequestStrategyTest { CompletableFuture<HttpResponse> delayed = strategy.enqueue(id5, request); phaser.arriveAndAwaitAdvance(); // retried is allowed to dispatch, and will be retried async. // failed immediately fails, and lets us assert the above retry is indeed enqueued. - assertEquals("ai.vespa.feed.client.FeedException: (id:ns:type::3) java.lang.RuntimeException: fatal", + assertEquals("ai.vespa.feed.client.FeedException: java.lang.RuntimeException: fatal", assertThrows(ExecutionException.class, failed::get).getMessage()); phaser.arriveAndAwaitAdvance(); // blocked starts dispatch, and hangs, blocking dispatch thread. @@ -288,7 +270,7 @@ class HttpRequestStrategyTest { assertThrows(ExecutionException.class, blocked::get).getMessage()); assertEquals("ai.vespa.feed.client.FeedException: Operation aborted", assertThrows(ExecutionException.class, delayed::get).getMessage()); - assertEquals("ai.vespa.feed.client.FeedException: (id:ns:type::2) java.io.IOException: failed", + assertEquals("ai.vespa.feed.client.FeedException: java.io.IOException: failed", assertThrows(ExecutionException.class, retried::get).getMessage()); assertEquals("ai.vespa.feed.client.FeedException: Operation aborted", assertThrows(ExecutionException.class, strategy.enqueue(id1, request)::get).getMessage()); |