summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-06-09 16:43:52 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-06-09 16:43:52 +0200
commit8967f105924c1565d7519de35e976f18740e679a (patch)
tree54cba71e83be218e327664150e6745b05c95fbde /vespa-feed-client
parent481fc1c196282c105d96e2e38b620920756059c9 (diff)
Add teest for HttpRequestStrategy and HttpFeedClient, and fix minor bugs
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java6
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java2
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java2
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java71
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java138
5 files changed, 213 insertions, 6 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java
index 3e70bd94648..1ae8ae1d490 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java
@@ -31,7 +31,7 @@ public class BenchmarkingCluster implements Cluster {
private final long[] responsesByCode = new long[600];
private long exceptions = 0;
private long totalLatencyMillis = 0;
- private long minLatencyMillis = 0;
+ private long minLatencyMillis = Long.MAX_VALUE;
private long maxLatencyMillis = 0;
private long bytesSent = 0;
private long bytesReceived = 0;
@@ -88,8 +88,8 @@ public class BenchmarkingCluster implements Cluster {
return new OperationStats(requests.get(),
responses,
exceptions,
- requests.get() - results,
- totalLatencyMillis / this.responses,
+ requests.get() - results,
+ this.responses == 0 ? 0 : totalLatencyMillis / this.responses,
minLatencyMillis,
maxLatencyMillis,
bytesSent,
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java
index da7d04830ad..bcf1c4ae107 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java
@@ -13,7 +13,7 @@ import java.util.concurrent.CompletableFuture;
*/
interface Cluster extends Closeable {
- /** Dispatch the request to the cluster, causing the response vessel to complete at a later time. */
+ /** Dispatch the request to the cluster, causing the response vessel to complete at a later time. May not throw. */
void dispatch(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel);
@Override
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
index 0d985376e91..408488cbaec 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
@@ -113,7 +113,7 @@ class HttpRequestStrategy implements RequestStrategy {
}
private boolean retry(SimpleHttpRequest request, int attempt) {
- if (attempt >= strategy.retries())
+ if (attempt > strategy.retries())
return false;
switch (request.getMethod().toUpperCase()) {
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java
new file mode 100644
index 00000000000..65fbcb12204
--- /dev/null
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java
@@ -0,0 +1,71 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+import org.apache.hc.core5.http.ContentType;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.BiConsumer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * @author jonmv
+ */
+class HttpFeedClientTest {
+
+ @Test
+ void testRequestGeneration() throws IOException, ExecutionException, InterruptedException {
+ DocumentId id = DocumentId.of("ns", "type", "0");
+ class MockRequestStrategy implements RequestStrategy {
+ @Override public OperationStats stats() { throw new UnsupportedOperationException(); }
+ @Override public boolean hasFailed() { return false; }
+ @Override public void destroy() { throw new UnsupportedOperationException(); }
+ @Override public void await() { throw new UnsupportedOperationException(); }
+ @Override public CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, SimpleHttpRequest request) {
+ try {
+ assertEquals(id, documentId);
+ assertEquals("/document/v1/ns/type/docid/0?create=true&condition=false&timeout=5000ms&route=route",
+ request.getUri().toString());
+ assertEquals("json", request.getBodyText());
+
+ SimpleHttpResponse response = new SimpleHttpResponse(502);
+ response.setBody("{\n" +
+ " \"pathId\": \"/document/v1/ns/type/docid/0\",\n" +
+ " \"id\": \"id:ns:type::0\",\n" +
+ " \"message\": \"Ooops! ... I did it again.\",\n" +
+ " \"trace\": \"I played with your heart. Got lost in the game.\"\n" +
+ "}",
+ ContentType.APPLICATION_JSON);
+ return CompletableFuture.completedFuture(response);
+ }
+ catch (Throwable thrown) {
+ CompletableFuture<SimpleHttpResponse> failed = new CompletableFuture<>();
+ failed.completeExceptionally(thrown);
+ return failed;
+ }
+ }
+
+ }
+ Result result = new HttpFeedClient(FeedClientBuilder.create(URI.create("https://dummy:123")),
+ new MockRequestStrategy())
+ .put(id,
+ "json",
+ OperationParameters.empty()
+ .createIfNonExistent(true)
+ .testAndSetCondition("false")
+ .route("route")
+ .timeout(Duration.ofSeconds(5)))
+ .get();
+ assertEquals("Ooops! ... I did it again.", result.resultMessage().get());
+ assertEquals("I played with your heart. Got lost in the game.", result.traceMessage().get());
+ }
+
+}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java
index 4cc15465bd5..7411f4124e5 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java
@@ -1,18 +1,34 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.feed.client;
+import ai.vespa.feed.client.FeedClient.CircuitBreaker;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.core5.http.ContentType;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
import java.net.URI;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.CLOSED;
+import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN;
+import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
class HttpRequestStrategyTest {
@@ -51,7 +67,127 @@ class HttpRequestStrategyTest {
}
@Test
- void test() {
+ void testLogic() throws ExecutionException, InterruptedException {
+ int minStreams = 16; // Hard limit for minimum number of streams per connection.
+ MockCluster cluster = new MockCluster();
+ AtomicLong now = new AtomicLong(0);
+ CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10));
+ HttpRequestStrategy strategy = new HttpRequestStrategy(FeedClientBuilder.create(URI.create("https://dummy.com:123"))
+ .setRetryStrategy(new FeedClient.RetryStrategy() {
+ @Override public boolean retry(FeedClient.OperationType type) { return type == FeedClient.OperationType.PUT; }
+ @Override public int retries() { return 1; }
+ })
+ .setCircuitBreaker(breaker)
+ .setConnectionsPerEndpoint(1)
+ .setMaxStreamPerConnection(minStreams),
+ new BenchmarkingCluster(cluster));
+
+ DocumentId id1 = DocumentId.of("ns", "type", "1");
+ DocumentId id2 = DocumentId.of("ns", "type", "2");
+ SimpleHttpRequest request = new SimpleHttpRequest("POST", "/");
+
+ // Runtime exception is not retried.
+ cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom")));
+ ExecutionException expected = assertThrows(ExecutionException.class,
+ () -> strategy.enqueue(id1, request).get());
+ assertEquals("boom", expected.getCause().getMessage());
+ assertEquals(1, strategy.stats().requests());
+
+ // IOException is retried.
+ cluster.expect((__, vessel) -> vessel.completeExceptionally(new IOException("retry me")));
+ expected = assertThrows(ExecutionException.class,
+ () -> strategy.enqueue(id1, request).get());
+ assertEquals("retry me", expected.getCause().getMessage());
+ assertEquals(3, strategy.stats().requests());
+
+ // Successful response is returned
+ SimpleHttpResponse success = new SimpleHttpResponse(200);
+ cluster.expect((__, vessel) -> vessel.complete(success));
+ assertEquals(success, strategy.enqueue(id1, request).get());
+ assertEquals(4, strategy.stats().requests());
+
+ // Throttled requests are retried. Concurrent operations to same ID (only) are serialised.
+ now.set(2000);
+ SimpleHttpResponse throttled = new SimpleHttpResponse(429);
+ AtomicInteger count = new AtomicInteger(3);
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference<CompletableFuture<SimpleHttpResponse>> completion = new AtomicReference<>();
+ cluster.expect((req, vessel) -> {
+ if (req == request) {
+ if (count.decrementAndGet() > 0)
+ vessel.complete(throttled);
+ else {
+ completion.set(vessel);
+ latch.countDown();
+ }
+ }
+ else vessel.complete(success);
+ });
+ CompletableFuture<SimpleHttpResponse> delayed = strategy.enqueue(id1, request);
+ CompletableFuture<SimpleHttpResponse> serialised = strategy.enqueue(id1, new SimpleHttpRequest("PUT", "/"));
+ assertEquals(success, strategy.enqueue(id2, new SimpleHttpRequest("DELETE", "/")).get());
+ latch.await();
+ assertEquals(8, strategy.stats().requests()); // 3 attempts at throttled and one at id2.
+ now.set(4000);
+ assertEquals(CLOSED, breaker.state()); // Circuit not broken due to throttled requests.
+ completion.get().complete(success);
+ assertEquals(success, delayed.get());
+ assertEquals(success, serialised.get());
+
+ // Some error responses are retried.
+ SimpleHttpResponse serverError = new SimpleHttpResponse(500);
+ cluster.expect((__, vessel) -> vessel.complete(serverError));
+ assertEquals(serverError, strategy.enqueue(id1, request).get());
+ assertEquals(11, strategy.stats().requests());
+ assertEquals(CLOSED, breaker.state()); // Circuit not broken due to throttled requests.
+
+ // Error responses are not retried when not of appropriate type.
+ cluster.expect((__, vessel) -> vessel.complete(serverError));
+ assertEquals(serverError, strategy.enqueue(id1, new SimpleHttpRequest("PUT", "/")).get());
+ assertEquals(12, strategy.stats().requests());
+
+ // Some error responses are not retried.
+ SimpleHttpResponse badRequest = new SimpleHttpResponse(400);
+ cluster.expect((__, vessel) -> vessel.complete(badRequest));
+ assertEquals(badRequest, strategy.enqueue(id1, request).get());
+ assertEquals(13, strategy.stats().requests());
+
+ // Circuit breaker opens some time after starting to fail.
+ now.set(6000);
+ assertEquals(HALF_OPEN, breaker.state()); // Circuit broken due to failed requests.
+ now.set(605000);
+ assertEquals(OPEN, breaker.state()); // Circuit broken due to failed requests.
+
+ Map<Integer, Long> codes = new HashMap<>();
+ codes.put(200, 4L);
+ codes.put(400, 1L);
+ codes.put(429, 2L);
+ codes.put(500, 3L);
+ assertEquals(codes, strategy.stats().responsesByCode());
+ assertEquals(3, strategy.stats().exceptions());
+ }
+
+ static class MockCluster implements Cluster {
+
+ final AtomicReference<BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>>> dispatch = new AtomicReference<>();
+
+ void expect(BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>> expected) {
+ dispatch.set(expected);
+ }
+
+ @Override
+ public void dispatch(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel) {
+ dispatch.get().accept(request, vessel);
+ }
+
+ @Override
+ public void close() { }
+
+ @Override
+ public OperationStats stats() {
+ return null;
+ }
+
}
}