diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-06-25 13:51:31 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-06-25 13:51:31 +0200 |
commit | ce27d7104be34f11e9d6966036b3bdfd7928dcc8 (patch) | |
tree | 9c3ec70aa249ab7d9e84208dec0bffbb2d324cec /vespa-feed-client | |
parent | e39a7eb17d4e7f1fad56fa9ff5873ba4eadbc523 (diff) |
Propagate last retried result when aborting execution (and fix shutdown)
Diffstat (limited to 'vespa-feed-client')
-rw-r--r-- | vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java | 86 | ||||
-rw-r--r-- | vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java | 89 |
2 files changed, 137 insertions, 38 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java index 71de6ff2065..dc01f01b2e8 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java @@ -16,6 +16,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -26,17 +27,13 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.logging.Level.FINE; import static java.util.logging.Level.FINER; import static java.util.logging.Level.FINEST; -import static java.util.logging.Level.INFO; import static java.util.logging.Level.WARNING; -// TODO: update doc /** - * Controls request execution and retries: - * <ul> - * <li>Whenever throttled (429, 503), set target inflight to 0.9 * current, and retry over a different connection;</li> - * <li>retry other transient errors (500, 502 and IOException) a specified number of times, for specified operation types;</li> - * <li>and on every successful response, increase target inflight by 0.1.</li> - * </ul> + * Controls request execution and retries. + * + * This class has all control flow for throttling and dispatching HTTP requests to an injected + * HTTP cluster, including error handling and retries, and a circuit breaker mechanism. 
* * @author jonmv */ @@ -45,10 +42,10 @@ class HttpRequestStrategy implements RequestStrategy { private static final Logger log = Logger.getLogger(HttpRequestStrategy.class.getName()); private final Cluster cluster; - private final Map<DocumentId, CompletableFuture<?>> inflightById = new ConcurrentHashMap<>(); + private final Map<DocumentId, RetriableFuture<HttpResponse>> inflightById = new ConcurrentHashMap<>(); private final RetryStrategy strategy; private final CircuitBreaker breaker; - final FeedClient.Throttler throttler; + private final FeedClient.Throttler throttler; private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>(); private final AtomicLong inflight = new AtomicLong(0); private final AtomicBoolean destroyed = new AtomicBoolean(false); @@ -89,21 +86,49 @@ class HttpRequestStrategy implements RequestStrategy { while (breaker.state() != OPEN && ! destroyed.get()) { while ( ! isInExcess() && poll() && breaker.state() == CLOSED); // Sleep when circuit is half-open, nap when queue is empty, or we are throttled. - Thread.sleep(breaker.state() == HALF_OPEN ? 1000 : 10); // TODO: Reduce throughput when turning half-open? + Thread.sleep(breaker.state() == HALF_OPEN ? 1000 : 10); } } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.log(WARNING, "Dispatch thread interrupted; shutting down"); + catch (Throwable t) { + log.log(WARNING, "Dispatch thread threw; shutting down", t); } destroy(); } private void offer(HttpRequest request, CompletableFuture<HttpResponse> vessel) { delayedCount.incrementAndGet(); - queue.offer(() -> { - cluster.dispatch(request, vessel); - }); + queue.offer(() -> cluster.dispatch(request, vessel)); + } + + + /** A completable future which stores a temporary failure result to return upon abortion. 
*/ + private static class RetriableFuture<T> extends CompletableFuture<T> { + + private final AtomicReference<Runnable> completion = new AtomicReference<>(); + private final AtomicReference<RetriableFuture<T>> dependency = new AtomicReference<>(); + + private RetriableFuture() { + completion.set(() -> completeExceptionally(new FeedException("Operation aborted"))); + } + + /** Complete now with the last result or error. */ + void complete() { + completion.get().run(); + if (dependency.get() != null) dependency.getAndSet(null).complete(); + } + + /** Ensures the dependency is completed whenever this is. */ + void dependOn(RetriableFuture<T> dependency) { + this.dependency.set(dependency); + if (isDone()) dependency.complete(); + } + + /** Set the result of the last attempt at completing the computation represented by this. */ + void set(T result, Throwable thrown) { + completion.set(thrown != null ? () -> completeExceptionally(thrown) + : () -> complete(result)); + } + } private boolean poll() { @@ -208,11 +233,11 @@ class HttpRequestStrategy implements RequestStrategy { @Override public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { - CompletableFuture<HttpResponse> result = new CompletableFuture<>(); // Carries the aggregate result of the operation, including retries. + RetriableFuture<HttpResponse> result = new RetriableFuture<>(); // Carries the aggregate result of the operation, including retries. CompletableFuture<HttpResponse> vessel = new CompletableFuture<>(); // Holds the computation of a single dispatch to the HTTP client. 
- CompletableFuture<?> previous = inflightById.put(documentId, result); + RetriableFuture<HttpResponse> previous = inflightById.put(documentId, result); if (destroyed.get()) { - result.cancel(true); + result.complete(); return result; } @@ -221,13 +246,15 @@ class HttpRequestStrategy implements RequestStrategy { offer(request, vessel); throttler.sent(inflight.get(), result); } - else + else { + result.dependOn(previous); // In case result is aborted, also abort the previous if still inflight. previous.whenComplete((__, ___) -> offer(request, vessel)); + } handleAttempt(vessel, request, result, 1); return result.handle((response, error) -> { - if (inflightById.compute(documentId, (____, current) -> current == result ? null : current) == null) + if (inflightById.compute(documentId, (__, current) -> current == result ? null : current) == null) releaseSlot(); if (error != null) { @@ -239,29 +266,26 @@ class HttpRequestStrategy implements RequestStrategy { } /** Handles the result of one attempt at the given operation, retrying if necessary. */ - private void handleAttempt(CompletableFuture<HttpResponse> vessel, HttpRequest request, CompletableFuture<HttpResponse> result, int attempt) { + private void handleAttempt(CompletableFuture<HttpResponse> vessel, HttpRequest request, RetriableFuture<HttpResponse> result, int attempt) { vessel.whenCompleteAsync((response, thrown) -> { + result.set(response, thrown); // Retry the operation if it failed with a transient error ... if (thrown != null ? retry(request, thrown, attempt) : retry(request, response, attempt)) { - CircuitBreaker.State state = breaker.state(); CompletableFuture<HttpResponse> retry = new CompletableFuture<>(); offer(request, retry); - handleAttempt(retry, request, result, attempt + (state == HALF_OPEN ? 0 : 1)); + handleAttempt(retry, request, result, attempt + (breaker.state() == HALF_OPEN ? 0 : 1)); } // ... or accept the outcome and mark the operation as complete. 
- else { - if (thrown == null) result.complete(response); - else result.completeExceptionally(thrown); - } + else result.complete(); }, resultExecutor); } @Override public void destroy() { - if ( ! destroyed.getAndSet(true)) { - inflightById.values().forEach(result -> result.cancel(true)); // TODO: More informative exception. + if (destroyed.compareAndSet(false, true)) { + inflightById.values().forEach(RetriableFuture::complete); cluster.close(); resultExecutor.shutdown(); } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java index 6b2087b1828..02175150fed 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java @@ -15,6 +15,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Phaser; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -27,6 +28,7 @@ import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN; import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -47,11 +49,8 @@ class HttpRequestStrategyTest { CountDownLatch latch = new CountDownLatch(1); new Thread(() -> { try { - while ( ! latch.await(1, TimeUnit.SECONDS)) { + while ( ! 
latch.await(1, TimeUnit.SECONDS)) System.err.println(cluster.stats().inflight()); - System.err.println(strategy.throttler.targetInflight()); - System.err.println(); - } } catch (InterruptedException ignored) { } }).start(); @@ -78,7 +77,7 @@ class HttpRequestStrategyTest { } @Test - void testLogic() throws ExecutionException, InterruptedException { + void testRetries() throws ExecutionException, InterruptedException { int minStreams = 16; // Hard limit for minimum number of streams per connection. MockCluster cluster = new MockCluster(); AtomicLong now = new AtomicLong(0); @@ -98,10 +97,11 @@ class HttpRequestStrategyTest { HttpRequest request = new HttpRequest("POST", "/", null, null); // Runtime exception is not retried. - cluster.expect((__, vessel) -> vessel.completeExceptionally(new FeedException("boom"))); + cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom"))); ExecutionException expected = assertThrows(ExecutionException.class, () -> strategy.enqueue(id1, request).get()); - assertEquals("boom", expected.getCause().getMessage()); + assertTrue(expected.getCause() instanceof FeedException); + assertEquals("java.lang.RuntimeException: boom", expected.getCause().getMessage()); assertEquals(1, strategy.stats().requests()); // IOException is retried. 
@@ -178,6 +178,81 @@ class HttpRequestStrategyTest { assertEquals(3, strategy.stats().exceptions()); } + @Test + void testShutdown() { + MockCluster cluster = new MockCluster(); + AtomicLong now = new AtomicLong(0); + CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10)); + HttpRequestStrategy strategy = new HttpRequestStrategy(FeedClientBuilder.create(URI.create("https://dummy.com:123")) + .setRetryStrategy(new FeedClient.RetryStrategy() { + @Override public int retries() { return 1; } + }) + .setCircuitBreaker(breaker) + .setConnectionsPerEndpoint(1), + new BenchmarkingCluster(cluster)); + + DocumentId id1 = DocumentId.of("ns", "type", "1"); + DocumentId id2 = DocumentId.of("ns", "type", "2"); + DocumentId id3 = DocumentId.of("ns", "type", "3"); + DocumentId id4 = DocumentId.of("ns", "type", "4"); + HttpRequest failing = new HttpRequest("POST", "/", null, null); + HttpRequest request = new HttpRequest("POST", "/", null, null); + HttpRequest blocking = new HttpRequest("POST", "/", null, null); + + // Enqueue some operations to the same id, which are serialised, and then shut down while operations are in flight. + Phaser phaser = new Phaser(2); + Phaser blocker = new Phaser(2); + AtomicReference<CompletableFuture<HttpResponse>> completion = new AtomicReference<>(); + cluster.expect((req, vessel) -> { + if (req == blocking) { + phaser.arriveAndAwaitAdvance(); // Synchronise with test main thread, and then ... + blocker.arriveAndAwaitAdvance(); // ... block dispatch thread, so we get something in the queue. + throw new RuntimeException("armageddon"); // Dispatch thread should die, tearing down everything. + } + else if (req == failing) { + phaser.arriveAndAwaitAdvance(); // Let test thread enqueue more ops before failing (and retrying) this. 
+ vessel.completeExceptionally(new IOException("failed")); + } + else phaser.arriveAndAwaitAdvance(); // Don't complete from mock cluster, but require destruction to do this. + }); + CompletableFuture<HttpResponse> inflight = strategy.enqueue(id1, request); + CompletableFuture<HttpResponse> serialised1 = strategy.enqueue(id1, request); + CompletableFuture<HttpResponse> serialised2 = strategy.enqueue(id1, request); + CompletableFuture<HttpResponse> failed = strategy.enqueue(id2, failing); + CompletableFuture<HttpResponse> blocked = strategy.enqueue(id3, blocking); + CompletableFuture<HttpResponse> delayed = strategy.enqueue(id4, request); + phaser.arriveAndAwaitAdvance(); // inflight completes dispatch, but causes no response. + phaser.arriveAndAwaitAdvance(); // failed completes dispatch, and a retry is enqueued. + phaser.arriveAndAwaitAdvance(); // blocked starts dispatch, and hangs, blocking dispatch thread. + + // Current state: inflight is "inflight to cluster", serialised1/2 are waiting completion of it; + // blocked is blocking dispatch, delayed is enqueued, waiting for dispatch; + // failed has a partial result, and has a retry in the dispatch queue. + assertFalse(inflight.isDone()); + assertFalse(serialised1.isDone()); + assertFalse(serialised2.isDone()); + assertFalse(failed.isDone()); + assertFalse(blocked.isDone()); + assertFalse(delayed.isDone()); + + // Kill dispatch thread, and see that all enqueued operations, and new ones, complete. 
+ blocker.arriveAndAwaitAdvance(); + assertEquals("ai.vespa.feed.client.FeedException: Operation aborted", + assertThrows(ExecutionException.class, inflight::get).getMessage()); + assertEquals("ai.vespa.feed.client.FeedException: Operation aborted", + assertThrows(ExecutionException.class, serialised1::get).getMessage()); + assertEquals("ai.vespa.feed.client.FeedException: Operation aborted", + assertThrows(ExecutionException.class, serialised2::get).getMessage()); + assertEquals("ai.vespa.feed.client.FeedException: Operation aborted", + assertThrows(ExecutionException.class, blocked::get).getMessage()); + assertEquals("ai.vespa.feed.client.FeedException: Operation aborted", + assertThrows(ExecutionException.class, delayed::get).getMessage()); + assertEquals("ai.vespa.feed.client.FeedException: java.io.IOException: failed", + assertThrows(ExecutionException.class, failed::get).getMessage()); + assertEquals("ai.vespa.feed.client.FeedException: Operation aborted", + assertThrows(ExecutionException.class, strategy.enqueue(id1, request)::get).getMessage()); + } + static class MockCluster implements Cluster { final AtomicReference<BiConsumer<HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>(); |