summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java')
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java67
1 files changed, 6 insertions, 61 deletions
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
index f313f08426c..b9ab4e481ac 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
@@ -17,13 +17,11 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -42,7 +40,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
class HttpRequestStrategyTest {
@Test
- void testConcurrency() throws IOException {
+ void testConcurrency() {
int documents = 1 << 16;
HttpRequest request = new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), () -> 0);
HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8));
@@ -52,7 +50,7 @@ class HttpRequestStrategyTest {
HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123")))
.setConnectionsPerEndpoint(1 << 10)
.setMaxStreamPerConnection(1 << 12),
- () -> cluster);
+ cluster);
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
try {
@@ -84,7 +82,7 @@ class HttpRequestStrategyTest {
}
@Test()
- void testRetries() throws ExecutionException, InterruptedException, IOException {
+ void testRetries() throws ExecutionException, InterruptedException {
int minStreams = 2; // Hard limit for minimum number of streams per connection.
MockCluster cluster = new MockCluster();
AtomicLong nowNanos = new AtomicLong(0);
@@ -97,7 +95,7 @@ class HttpRequestStrategyTest {
.setCircuitBreaker(breaker)
.setConnectionsPerEndpoint(1)
.setMaxStreamPerConnection(minStreams),
- () -> cluster);
+ cluster);
OperationStats initial = strategy.stats();
DocumentId id1 = DocumentId.of("ns", "type", "1");
@@ -212,54 +210,7 @@ class HttpRequestStrategyTest {
}
@Test
- void testResettingCluster() throws ExecutionException, InterruptedException, IOException {
- List<MockCluster> clusters = List.of(new MockCluster(), new MockCluster());
- AtomicLong now = new AtomicLong(0);
- CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), null);
- HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123")))
- .setCircuitBreaker(breaker)
- .setConnectionsPerEndpoint(1),
- clusters.iterator()::next);
-
- // First operation fails, second remains in flight, and third fails.
- clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(200, null)));
- strategy.enqueue(DocumentId.of("ns", "type", "1"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get();
- Exchanger<CompletableFuture<HttpResponse>> exchanger = new Exchanger<>();
- clusters.get(0).expect((__, vessel) -> {
- try { exchanger.exchange(vessel); } catch (InterruptedException e) { throw new RuntimeException(e); }
- });
- CompletableFuture<HttpResponse> secondResponse = strategy.enqueue(DocumentId.of("ns", "type", "2"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get));
- CompletableFuture<HttpResponse> secondVessel = exchanger.exchange(null);
- clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null)));
- strategy.enqueue(DocumentId.of("ns", "type", "3"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get();
-
- // Time advances, and the circuit breaker half-opens.
- assertEquals(CLOSED, breaker.state());
- now.addAndGet(2000);
- assertEquals(HALF_OPEN, breaker.state());
-
- // It's indeterminate which cluster gets the next request, but the second should get the next one after that.
- clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null)));
- clusters.get(1).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null)));
- assertEquals(500, strategy.enqueue(DocumentId.of("ns", "type", "4"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get().code());
-
- clusters.get(0).expect((__, vessel) -> vessel.completeExceptionally(new AssertionError("should not be called")));
- clusters.get(1).expect((__, vessel) -> vessel.complete(HttpResponse.of(200, null)));
- assertEquals(200, strategy.enqueue(DocumentId.of("ns", "type", "5"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get().code());
-
- assertFalse(clusters.get(0).closed.get());
- assertFalse(clusters.get(1).closed.get());
- secondVessel.complete(HttpResponse.of(504, null));
- assertEquals(504, secondResponse.get().code());
- assertTrue(clusters.get(0).closed.get());
- assertFalse(clusters.get(1).closed.get());
- strategy.await();
- strategy.destroy();
- assertTrue(clusters.get(1).closed.get());
- }
-
- @Test
- void testShutdown() throws IOException {
+ void testShutdown() {
MockCluster cluster = new MockCluster();
AtomicLong nowNanos = new AtomicLong(0);
CircuitBreaker breaker = new GracePeriodCircuitBreaker(nowNanos::get, Duration.ofSeconds(1), Duration.ofMinutes(10));
@@ -269,7 +220,7 @@ class HttpRequestStrategyTest {
})
.setCircuitBreaker(breaker)
.setConnectionsPerEndpoint(3), // Must be >= 0.5x text ops.
- () -> cluster);
+ cluster);
DocumentId id1 = DocumentId.of("ns", "type", "1");
DocumentId id2 = DocumentId.of("ns", "type", "2");
@@ -346,7 +297,6 @@ class HttpRequestStrategyTest {
static class MockCluster implements Cluster {
final AtomicReference<BiConsumer<HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>();
- final AtomicBoolean closed = new AtomicBoolean(false);
void expect(BiConsumer<HttpRequest, CompletableFuture<HttpResponse>> expected) {
dispatch.set(expected);
@@ -357,11 +307,6 @@ class HttpRequestStrategyTest {
dispatch.get().accept(request, vessel);
}
- @Override
- public void close() {
- closed.set(true);
- }
-
}
}