summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'vespa-feed-client/src/test')
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java13
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java67
2 files changed, 12 insertions, 68 deletions
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java
index cec070c06a6..14ade35825f 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java
@@ -11,7 +11,6 @@ import ai.vespa.feed.client.Result;
import ai.vespa.feed.client.ResultException;
import org.junit.jupiter.api.Test;
-import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.List;
@@ -33,7 +32,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
class HttpFeedClientTest {
@Test
- void testFeeding() throws ExecutionException, InterruptedException, IOException {
+ void testFeeding() throws ExecutionException, InterruptedException {
DocumentId id = DocumentId.of("ns", "type", "0");
AtomicReference<BiFunction<DocumentId, HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>();
class MockRequestStrategy implements RequestStrategy {
@@ -44,7 +43,7 @@ class HttpFeedClientTest {
@Override public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { return dispatch.get().apply(documentId, request); }
}
FeedClient client = new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))).setDryrun(true),
- () -> new DryrunCluster(),
+ new DryrunCluster(),
new MockRequestStrategy());
// Update is a PUT, and 200 OK is a success.
@@ -212,7 +211,7 @@ class HttpFeedClientTest {
}
@Test
- void testHandshake() throws IOException {
+ void testHandshake() {
// dummy:123 does not exist, and results in a host-not-found exception.
FeedException exception = assertThrows(FeedException.class,
() -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123")))));
@@ -239,19 +238,19 @@ class HttpFeedClientTest {
assertEquals("server does not support speed test; upgrade to a newer version",
assertThrows(FeedException.class,
() -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))).setSpeedTest(true),
- () -> cluster,
+ cluster,
null))
.getMessage());
// Old server.
new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))),
- () -> cluster,
+ cluster,
null);
// New server.
response.set(okResponse);
new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))),
- () -> cluster,
+ cluster,
null);
}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
index f313f08426c..b9ab4e481ac 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
@@ -17,13 +17,11 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -42,7 +40,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
class HttpRequestStrategyTest {
@Test
- void testConcurrency() throws IOException {
+ void testConcurrency() {
int documents = 1 << 16;
HttpRequest request = new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), () -> 0);
HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8));
@@ -52,7 +50,7 @@ class HttpRequestStrategyTest {
HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123")))
.setConnectionsPerEndpoint(1 << 10)
.setMaxStreamPerConnection(1 << 12),
- () -> cluster);
+ cluster);
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
try {
@@ -84,7 +82,7 @@ class HttpRequestStrategyTest {
}
@Test()
- void testRetries() throws ExecutionException, InterruptedException, IOException {
+ void testRetries() throws ExecutionException, InterruptedException {
int minStreams = 2; // Hard limit for minimum number of streams per connection.
MockCluster cluster = new MockCluster();
AtomicLong nowNanos = new AtomicLong(0);
@@ -97,7 +95,7 @@ class HttpRequestStrategyTest {
.setCircuitBreaker(breaker)
.setConnectionsPerEndpoint(1)
.setMaxStreamPerConnection(minStreams),
- () -> cluster);
+ cluster);
OperationStats initial = strategy.stats();
DocumentId id1 = DocumentId.of("ns", "type", "1");
@@ -212,54 +210,7 @@ class HttpRequestStrategyTest {
}
@Test
- void testResettingCluster() throws ExecutionException, InterruptedException, IOException {
- List<MockCluster> clusters = List.of(new MockCluster(), new MockCluster());
- AtomicLong now = new AtomicLong(0);
- CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), null);
- HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123")))
- .setCircuitBreaker(breaker)
- .setConnectionsPerEndpoint(1),
- clusters.iterator()::next);
-
- // First operation fails, second remains in flight, and third fails.
- clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(200, null)));
- strategy.enqueue(DocumentId.of("ns", "type", "1"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get();
- Exchanger<CompletableFuture<HttpResponse>> exchanger = new Exchanger<>();
- clusters.get(0).expect((__, vessel) -> {
- try { exchanger.exchange(vessel); } catch (InterruptedException e) { throw new RuntimeException(e); }
- });
- CompletableFuture<HttpResponse> secondResponse = strategy.enqueue(DocumentId.of("ns", "type", "2"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get));
- CompletableFuture<HttpResponse> secondVessel = exchanger.exchange(null);
- clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null)));
- strategy.enqueue(DocumentId.of("ns", "type", "3"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get();
-
- // Time advances, and the circuit breaker half-opens.
- assertEquals(CLOSED, breaker.state());
- now.addAndGet(2000);
- assertEquals(HALF_OPEN, breaker.state());
-
- // It's indeterminate which cluster gets the next request, but the second should get the next one after that.
- clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null)));
- clusters.get(1).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null)));
- assertEquals(500, strategy.enqueue(DocumentId.of("ns", "type", "4"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get().code());
-
- clusters.get(0).expect((__, vessel) -> vessel.completeExceptionally(new AssertionError("should not be called")));
- clusters.get(1).expect((__, vessel) -> vessel.complete(HttpResponse.of(200, null)));
- assertEquals(200, strategy.enqueue(DocumentId.of("ns", "type", "5"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get().code());
-
- assertFalse(clusters.get(0).closed.get());
- assertFalse(clusters.get(1).closed.get());
- secondVessel.complete(HttpResponse.of(504, null));
- assertEquals(504, secondResponse.get().code());
- assertTrue(clusters.get(0).closed.get());
- assertFalse(clusters.get(1).closed.get());
- strategy.await();
- strategy.destroy();
- assertTrue(clusters.get(1).closed.get());
- }
-
- @Test
- void testShutdown() throws IOException {
+ void testShutdown() {
MockCluster cluster = new MockCluster();
AtomicLong nowNanos = new AtomicLong(0);
CircuitBreaker breaker = new GracePeriodCircuitBreaker(nowNanos::get, Duration.ofSeconds(1), Duration.ofMinutes(10));
@@ -269,7 +220,7 @@ class HttpRequestStrategyTest {
})
.setCircuitBreaker(breaker)
.setConnectionsPerEndpoint(3), // Must be >= 0.5x text ops.
- () -> cluster);
+ cluster);
DocumentId id1 = DocumentId.of("ns", "type", "1");
DocumentId id2 = DocumentId.of("ns", "type", "2");
@@ -346,7 +297,6 @@ class HttpRequestStrategyTest {
static class MockCluster implements Cluster {
final AtomicReference<BiConsumer<HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>();
- final AtomicBoolean closed = new AtomicBoolean(false);
void expect(BiConsumer<HttpRequest, CompletableFuture<HttpResponse>> expected) {
dispatch.set(expected);
@@ -357,11 +307,6 @@ class HttpRequestStrategyTest {
dispatch.get().accept(request, vessel);
}
- @Override
- public void close() {
- closed.set(true);
- }
-
}
}