diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-06-09 16:43:52 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-06-09 16:43:52 +0200 |
commit | 8967f105924c1565d7519de35e976f18740e679a (patch) | |
tree | 54cba71e83be218e327664150e6745b05c95fbde /vespa-feed-client/src | |
parent | 481fc1c196282c105d96e2e38b620920756059c9 (diff) |
Add teest for HttpRequestStrategy and HttpFeedClient, and fix minor bugs
Diffstat (limited to 'vespa-feed-client/src')
5 files changed, 213 insertions, 6 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java index 3e70bd94648..1ae8ae1d490 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java @@ -31,7 +31,7 @@ public class BenchmarkingCluster implements Cluster { private final long[] responsesByCode = new long[600]; private long exceptions = 0; private long totalLatencyMillis = 0; - private long minLatencyMillis = 0; + private long minLatencyMillis = Long.MAX_VALUE; private long maxLatencyMillis = 0; private long bytesSent = 0; private long bytesReceived = 0; @@ -88,8 +88,8 @@ public class BenchmarkingCluster implements Cluster { return new OperationStats(requests.get(), responses, exceptions, - requests.get() - results, - totalLatencyMillis / this.responses, + requests.get() - results, + this.responses == 0 ? 0 : totalLatencyMillis / this.responses, minLatencyMillis, maxLatencyMillis, bytesSent, diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java index da7d04830ad..bcf1c4ae107 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java @@ -13,7 +13,7 @@ import java.util.concurrent.CompletableFuture; */ interface Cluster extends Closeable { - /** Dispatch the request to the cluster, causing the response vessel to complete at a later time. */ + /** Dispatch the request to the cluster, causing the response vessel to complete at a later time. May not throw. */ void dispatch(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel); @Override diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java index 0d985376e91..408488cbaec 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java @@ -113,7 +113,7 @@ class HttpRequestStrategy implements RequestStrategy { } private boolean retry(SimpleHttpRequest request, int attempt) { - if (attempt >= strategy.retries()) + if (attempt > strategy.retries()) return false; switch (request.getMethod().toUpperCase()) { diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java new file mode 100644 index 00000000000..65fbcb12204 --- /dev/null +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java @@ -0,0 +1,71 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.core5.http.ContentType; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.net.URI; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.BiConsumer; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * @author jonmv + */ +class HttpFeedClientTest { + + @Test + void testRequestGeneration() throws IOException, ExecutionException, InterruptedException { + DocumentId id = DocumentId.of("ns", "type", "0"); + class MockRequestStrategy implements RequestStrategy { + @Override public OperationStats stats() { throw new UnsupportedOperationException(); } + @Override public boolean hasFailed() { return false; } + @Override public void destroy() { throw new UnsupportedOperationException(); } + @Override public void await() { throw new UnsupportedOperationException(); } + @Override public CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, SimpleHttpRequest request) { + try { + assertEquals(id, documentId); + assertEquals("/document/v1/ns/type/docid/0?create=true&condition=false&timeout=5000ms&route=route", + request.getUri().toString()); + assertEquals("json", request.getBodyText()); + + SimpleHttpResponse response = new SimpleHttpResponse(502); + response.setBody("{\n" + + " \"pathId\": \"/document/v1/ns/type/docid/0\",\n" + + " \"id\": \"id:ns:type::0\",\n" + + " \"message\": \"Ooops! ... I did it again.\",\n" + + " \"trace\": \"I played with your heart. Got lost in the game.\"\n" + + "}", + ContentType.APPLICATION_JSON); + return CompletableFuture.completedFuture(response); + } + catch (Throwable thrown) { + CompletableFuture<SimpleHttpResponse> failed = new CompletableFuture<>(); + failed.completeExceptionally(thrown); + return failed; + } + } + + } + Result result = new HttpFeedClient(FeedClientBuilder.create(URI.create("https://dummy:123")), + new MockRequestStrategy()) + .put(id, + "json", + OperationParameters.empty() + .createIfNonExistent(true) + .testAndSetCondition("false") + .route("route") + .timeout(Duration.ofSeconds(5))) + .get(); + assertEquals("Ooops! ... I did it again.", result.resultMessage().get()); + assertEquals("I played with your heart. Got lost in the game.", result.traceMessage().get()); + } + +} diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java index 4cc15465bd5..7411f4124e5 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java @@ -1,18 +1,34 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.feed.client; +import ai.vespa.feed.client.FeedClient.CircuitBreaker; import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; import org.apache.hc.core5.http.ContentType; import org.junit.jupiter.api.Test; +import java.io.IOException; import java.net.URI; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.CLOSED; +import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN; +import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; class HttpRequestStrategyTest { @@ -51,7 +67,127 @@ class HttpRequestStrategyTest { } @Test - void test() { + void testLogic() throws ExecutionException, InterruptedException { + int minStreams = 16; // Hard limit for minimum number of streams per connection. + MockCluster cluster = new MockCluster(); + AtomicLong now = new AtomicLong(0); + CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10)); + HttpRequestStrategy strategy = new HttpRequestStrategy(FeedClientBuilder.create(URI.create("https://dummy.com:123")) + .setRetryStrategy(new FeedClient.RetryStrategy() { + @Override public boolean retry(FeedClient.OperationType type) { return type == FeedClient.OperationType.PUT; } + @Override public int retries() { return 1; } + }) + .setCircuitBreaker(breaker) + .setConnectionsPerEndpoint(1) + .setMaxStreamPerConnection(minStreams), + new BenchmarkingCluster(cluster)); + + DocumentId id1 = DocumentId.of("ns", "type", "1"); + DocumentId id2 = DocumentId.of("ns", "type", "2"); + SimpleHttpRequest request = new SimpleHttpRequest("POST", "/"); + + // Runtime exception is not retried. + cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom"))); + ExecutionException expected = assertThrows(ExecutionException.class, + () -> strategy.enqueue(id1, request).get()); + assertEquals("boom", expected.getCause().getMessage()); + assertEquals(1, strategy.stats().requests()); + + // IOException is retried. + cluster.expect((__, vessel) -> vessel.completeExceptionally(new IOException("retry me"))); + expected = assertThrows(ExecutionException.class, + () -> strategy.enqueue(id1, request).get()); + assertEquals("retry me", expected.getCause().getMessage()); + assertEquals(3, strategy.stats().requests()); + + // Successful response is returned + SimpleHttpResponse success = new SimpleHttpResponse(200); + cluster.expect((__, vessel) -> vessel.complete(success)); + assertEquals(success, strategy.enqueue(id1, request).get()); + assertEquals(4, strategy.stats().requests()); + + // Throttled requests are retried. Concurrent operations to same ID (only) are serialised. + now.set(2000); + SimpleHttpResponse throttled = new SimpleHttpResponse(429); + AtomicInteger count = new AtomicInteger(3); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference<CompletableFuture<SimpleHttpResponse>> completion = new AtomicReference<>(); + cluster.expect((req, vessel) -> { + if (req == request) { + if (count.decrementAndGet() > 0) + vessel.complete(throttled); + else { + completion.set(vessel); + latch.countDown(); + } + } + else vessel.complete(success); + }); + CompletableFuture<SimpleHttpResponse> delayed = strategy.enqueue(id1, request); + CompletableFuture<SimpleHttpResponse> serialised = strategy.enqueue(id1, new SimpleHttpRequest("PUT", "/")); + assertEquals(success, strategy.enqueue(id2, new SimpleHttpRequest("DELETE", "/")).get()); + latch.await(); + assertEquals(8, strategy.stats().requests()); // 3 attempts at throttled and one at id2. + now.set(4000); + assertEquals(CLOSED, breaker.state()); // Circuit not broken due to throttled requests. + completion.get().complete(success); + assertEquals(success, delayed.get()); + assertEquals(success, serialised.get()); + + // Some error responses are retried. + SimpleHttpResponse serverError = new SimpleHttpResponse(500); + cluster.expect((__, vessel) -> vessel.complete(serverError)); + assertEquals(serverError, strategy.enqueue(id1, request).get()); + assertEquals(11, strategy.stats().requests()); + assertEquals(CLOSED, breaker.state()); // Circuit not broken due to throttled requests. + + // Error responses are not retried when not of appropriate type. + cluster.expect((__, vessel) -> vessel.complete(serverError)); + assertEquals(serverError, strategy.enqueue(id1, new SimpleHttpRequest("PUT", "/")).get()); + assertEquals(12, strategy.stats().requests()); + + // Some error responses are not retried. + SimpleHttpResponse badRequest = new SimpleHttpResponse(400); + cluster.expect((__, vessel) -> vessel.complete(badRequest)); + assertEquals(badRequest, strategy.enqueue(id1, request).get()); + assertEquals(13, strategy.stats().requests()); + + // Circuit breaker opens some time after starting to fail. + now.set(6000); + assertEquals(HALF_OPEN, breaker.state()); // Circuit broken due to failed requests. + now.set(605000); + assertEquals(OPEN, breaker.state()); // Circuit broken due to failed requests. + + Map<Integer, Long> codes = new HashMap<>(); + codes.put(200, 4L); + codes.put(400, 1L); + codes.put(429, 2L); + codes.put(500, 3L); + assertEquals(codes, strategy.stats().responsesByCode()); + assertEquals(3, strategy.stats().exceptions()); + } + + static class MockCluster implements Cluster { + + final AtomicReference<BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>>> dispatch = new AtomicReference<>(); + + void expect(BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>> expected) { + dispatch.set(expected); + } + + @Override + public void dispatch(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel) { + dispatch.get().accept(request, vessel); + } + + @Override + public void close() { } + + @Override + public OperationStats stats() { + return null; + } + } } |