diff options
Diffstat (limited to 'vespa-feed-client/src/test')
-rw-r--r-- | vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java | 13 | ||||
-rw-r--r-- | vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java | 67 |
2 files changed, 12 insertions, 68 deletions
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java index cec070c06a6..14ade35825f 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java @@ -11,7 +11,6 @@ import ai.vespa.feed.client.Result; import ai.vespa.feed.client.ResultException; import org.junit.jupiter.api.Test; -import java.io.IOException; import java.net.URI; import java.time.Duration; import java.util.List; @@ -33,7 +32,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; class HttpFeedClientTest { @Test - void testFeeding() throws ExecutionException, InterruptedException, IOException { + void testFeeding() throws ExecutionException, InterruptedException { DocumentId id = DocumentId.of("ns", "type", "0"); AtomicReference<BiFunction<DocumentId, HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>(); class MockRequestStrategy implements RequestStrategy { @@ -44,7 +43,7 @@ class HttpFeedClientTest { @Override public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { return dispatch.get().apply(documentId, request); } } FeedClient client = new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))).setDryrun(true), - () -> new DryrunCluster(), + new DryrunCluster(), new MockRequestStrategy()); // Update is a PUT, and 200 OK is a success. @@ -212,7 +211,7 @@ class HttpFeedClientTest { } @Test - void testHandshake() throws IOException { + void testHandshake() { // dummy:123 does not exist, and results in a host-not-found exception. FeedException exception = assertThrows(FeedException.class, () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))))); @@ -239,19 +238,19 @@ class HttpFeedClientTest { assertEquals("server does not support speed test; upgrade to a newer version", assertThrows(FeedException.class, () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))).setSpeedTest(true), - () -> cluster, + cluster, null)) .getMessage()); // Old server. new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))), - () -> cluster, + cluster, null); // New server. response.set(okResponse); new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))), - () -> cluster, + cluster, null); } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java index f313f08426c..b9ab4e481ac 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java @@ -17,13 +17,11 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Phaser; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -42,7 +40,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; class HttpRequestStrategyTest { @Test - void testConcurrency() throws IOException { + void testConcurrency() { int documents = 1 << 16; HttpRequest request = new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), () -> 0); HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8)); @@ -52,7 +50,7 @@ class HttpRequestStrategyTest { HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123"))) .setConnectionsPerEndpoint(1 << 10) .setMaxStreamPerConnection(1 << 12), - () -> cluster); + cluster); CountDownLatch latch = new CountDownLatch(1); new Thread(() -> { try { @@ -84,7 +82,7 @@ class HttpRequestStrategyTest { } @Test() - void testRetries() throws ExecutionException, InterruptedException, IOException { + void testRetries() throws ExecutionException, InterruptedException { int minStreams = 2; // Hard limit for minimum number of streams per connection. MockCluster cluster = new MockCluster(); AtomicLong nowNanos = new AtomicLong(0); @@ -97,7 +95,7 @@ class HttpRequestStrategyTest { .setCircuitBreaker(breaker) .setConnectionsPerEndpoint(1) .setMaxStreamPerConnection(minStreams), - () -> cluster); + cluster); OperationStats initial = strategy.stats(); DocumentId id1 = DocumentId.of("ns", "type", "1"); @@ -212,54 +210,7 @@ class HttpRequestStrategyTest { } @Test - void testResettingCluster() throws ExecutionException, InterruptedException, IOException { - List<MockCluster> clusters = List.of(new MockCluster(), new MockCluster()); - AtomicLong now = new AtomicLong(0); - CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), null); - HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123"))) - .setCircuitBreaker(breaker) - .setConnectionsPerEndpoint(1), - clusters.iterator()::next); - - // First operation fails, second remains in flight, and third fails. - clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(200, null))); - strategy.enqueue(DocumentId.of("ns", "type", "1"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get(); - Exchanger<CompletableFuture<HttpResponse>> exchanger = new Exchanger<>(); - clusters.get(0).expect((__, vessel) -> { - try { exchanger.exchange(vessel); } catch (InterruptedException e) { throw new RuntimeException(e); } - }); - CompletableFuture<HttpResponse> secondResponse = strategy.enqueue(DocumentId.of("ns", "type", "2"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)); - CompletableFuture<HttpResponse> secondVessel = exchanger.exchange(null); - clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null))); - strategy.enqueue(DocumentId.of("ns", "type", "3"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get(); - - // Time advances, and the circuit breaker half-opens. - assertEquals(CLOSED, breaker.state()); - now.addAndGet(2000); - assertEquals(HALF_OPEN, breaker.state()); - - // It's indeterminate which cluster gets the next request, but the second should get the next one after that. - clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null))); - clusters.get(1).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null))); - assertEquals(500, strategy.enqueue(DocumentId.of("ns", "type", "4"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get().code()); - - clusters.get(0).expect((__, vessel) -> vessel.completeExceptionally(new AssertionError("should not be called"))); - clusters.get(1).expect((__, vessel) -> vessel.complete(HttpResponse.of(200, null))); - assertEquals(200, strategy.enqueue(DocumentId.of("ns", "type", "5"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get().code()); - - assertFalse(clusters.get(0).closed.get()); - assertFalse(clusters.get(1).closed.get()); - secondVessel.complete(HttpResponse.of(504, null)); - assertEquals(504, secondResponse.get().code()); - assertTrue(clusters.get(0).closed.get()); - assertFalse(clusters.get(1).closed.get()); - strategy.await(); - strategy.destroy(); - assertTrue(clusters.get(1).closed.get()); - } - - @Test - void testShutdown() throws IOException { + void testShutdown() { MockCluster cluster = new MockCluster(); AtomicLong nowNanos = new AtomicLong(0); CircuitBreaker breaker = new GracePeriodCircuitBreaker(nowNanos::get, Duration.ofSeconds(1), Duration.ofMinutes(10)); @@ -269,7 +220,7 @@ class HttpRequestStrategyTest { }) .setCircuitBreaker(breaker) .setConnectionsPerEndpoint(3), // Must be >= 0.5x text ops. - () -> cluster); + cluster); DocumentId id1 = DocumentId.of("ns", "type", "1"); DocumentId id2 = DocumentId.of("ns", "type", "2"); @@ -346,7 +297,6 @@ class HttpRequestStrategyTest { static class MockCluster implements Cluster { final AtomicReference<BiConsumer<HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>(); - final AtomicBoolean closed = new AtomicBoolean(false); void expect(BiConsumer<HttpRequest, CompletableFuture<HttpResponse>> expected) { dispatch.set(expected); @@ -357,11 +307,6 @@ class HttpRequestStrategyTest { dispatch.get().accept(request, vessel); } - @Override - public void close() { - closed.set(true); - } - } } |