From 72036953cbebf1c90d881ed0f555a93d32b7189a Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Thu, 24 Jun 2021 20:44:27 +0200 Subject: All exceptional failures are now exceptions --- .../ai/vespa/feed/client/HttpFeedClientTest.java | 34 ++++++++++++---------- .../client/examples/JsonFileFeederExample.java | 8 ++--- .../client/examples/JsonStreamFeederExample.java | 2 -- 3 files changed, 22 insertions(+), 22 deletions(-) (limited to 'vespa-feed-client/src/test/java') diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java index d8090549420..6aa0de2160c 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java @@ -13,6 +13,7 @@ import java.util.function.BiFunction; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * @author jonmv @@ -55,16 +56,19 @@ class HttpFeedClientTest { return failed; } }); - Result result = client.put(id, - "json", - OperationParameters.empty() - .createIfNonExistent(true) - .testAndSetCondition("false") - .route("route") - .timeout(Duration.ofSeconds(5))) - .get(); - assertEquals("Ooops! ... I did it again.", result.resultMessage().get()); - assertEquals("I played with your heart. Got lost in the game.", result.traceMessage().get()); + ExecutionException expected = assertThrows(ExecutionException.class, + () -> client.put(id, + "json", + OperationParameters.empty() + .createIfNonExistent(true) + .testAndSetCondition("false") + .route("route") + .timeout(Duration.ofSeconds(5))) + .get()); + assertTrue(expected.getCause() instanceof ResultException); + ResultException result = (ResultException) expected.getCause(); + assertEquals("Ooops! ... I did it again.", result.getMessage()); + assertEquals("I played with your heart. Got lost in the game.", result.getTrace().get()); // Handler error is a FeedException. @@ -90,11 +94,11 @@ class HttpFeedClientTest { return failed; } }); - ExecutionException expected = assertThrows(ExecutionException.class, - () -> client.put(id, - "json", - OperationParameters.empty()) - .get()); + expected = assertThrows(ExecutionException.class, + () -> client.put(id, + "json", + OperationParameters.empty()) + .get()); assertEquals("Status 500 executing 'POST /document/v1/ns/type/docid/0': Alla ska i jorden.", expected.getCause().getMessage()); } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java index 1e616f2625a..3b633c38132 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java @@ -38,17 +38,15 @@ class JsonFileFeederExample implements Closeable { resultsReceived.incrementAndGet(); if (error != null) { log.warning("Problems with feeding document " - + error.documentId().map(DocumentId::toString).orElse("")); - errorsReceived.incrementAndGet(); - } else if (result.type() == Result.Type.failure) { - log.warning("Problems with docID " + result.documentId() + ":" + error); + + error.documentId().map(DocumentId::toString).orElse("") + + ": " + error); errorsReceived.incrementAndGet(); } } @Override public void onError(FeedException error) { - log.severe("Feeding failed for d: " + error.getMessage()); + log.severe("Feeding failed fatally: " + error.getMessage()); } @Override diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java index 5cee776b244..cbe0e213907 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java @@ -100,8 +100,6 @@ class JsonStreamFeederExample extends Thread implements AutoCloseable { if (throwable != null) { System.err.printf("Failure for '%s': %s", docId, throwable); throwable.printStackTrace(); - } else if (result.type() == Result.Type.failure) { - System.err.printf("Failure for '%s': %s", docId, result.resultMessage().orElse("")); } }); } catch (InterruptedException e) { -- cgit v1.2.3 From 373b66234f10a0bb4a34fe6032efcc9c49b0dd0a Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Fri, 25 Jun 2021 09:24:38 +0200 Subject: Lower log level in HttpRequestStrategy --- .../java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java | 6 +++--- .../main/java/ai/vespa/feed/client/HttpRequestStrategy.java | 10 +++++----- .../java/ai/vespa/feed/client/HttpRequestStrategyTest.java | 1 + 3 files changed, 9 insertions(+), 8 deletions(-) (limited to 'vespa-feed-client/src/test/java') diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java index 2c5c2dccf19..c319bfca252 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java @@ -8,7 +8,7 @@ import java.util.function.LongSupplier; import java.util.logging.Logger; import static java.util.Objects.requireNonNull; -import static java.util.logging.Level.INFO; +import static java.util.logging.Level.FINE; import static java.util.logging.Level.WARNING; /** @@ -48,7 +48,7 @@ public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker { public void success() { failingSinceMillis.set(NEVER); if ( ! open.get() && halfOpen.compareAndSet(true, false)) - log.log(INFO, "Circuit breaker is now closed"); + log.log(FINE, "Circuit breaker is now closed"); } @Override @@ -60,7 +60,7 @@ public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker { public State state() { long failingMillis = clock.getAsLong() - failingSinceMillis.get(); if (failingMillis > graceMillis && halfOpen.compareAndSet(false, true)) - log.log(INFO, "Circuit breaker is now half-open"); + log.log(FINE, "Circuit breaker is now half-open"); if (failingMillis > doomMillis && open.compareAndSet(false, true)) log.log(WARNING, "Circuit breaker is now open"); diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java index ddfc509738f..71de6ff2065 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java @@ -24,6 +24,7 @@ import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN; import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.logging.Level.FINE; +import static java.util.logging.Level.FINER; import static java.util.logging.Level.FINEST; import static java.util.logging.Level.INFO; import static java.util.logging.Level.WARNING; @@ -139,11 +140,11 @@ class HttpRequestStrategy implements RequestStrategy { if ( (thrown instanceof IOException) // General IO problems. || (thrown instanceof CancellationException) // TLS session disconnect. || (thrown instanceof CancelledKeyException)) { // Selection cancelled. - log.log(INFO, thrown, () -> "Failed attempt " + attempt + " at " + request); + log.log(FINER, thrown, () -> "Failed attempt " + attempt + " at " + request); return retry(request, attempt); } - log.log(WARNING, thrown, () -> "Failed attempt " + attempt + " at " + request); + log.log(FINE, thrown, () -> "Failed attempt " + attempt + " at " + request); return false; } @@ -158,18 +159,17 @@ class HttpRequestStrategy implements RequestStrategy { if (response.code() == 429 || response.code() == 503) { // Throttling; reduce target inflight. - logResponse(FINE, response, request, attempt); + logResponse(FINER, response, request, attempt); throttler.throttled((inflight.get() - delayedCount.get())); return true; } breaker.failure(); + logResponse(FINE, response, request, attempt); if (response.code() == 500 || response.code() == 502 || response.code() == 504) { // Hopefully temporary errors. - logResponse(INFO, response, request, attempt); return retry(request, attempt); } - logResponse(WARNING, response, request, attempt); return false; } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java index 21ab6889e6e..6b2087b1828 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java @@ -28,6 +28,7 @@ import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; class HttpRequestStrategyTest { -- cgit v1.2.3 From ce27d7104be34f11e9d6966036b3bdfd7928dcc8 Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Fri, 25 Jun 2021 13:51:31 +0200 Subject: Propagate last retried result when aborting execution (and fix shutdown) --- .../ai/vespa/feed/client/HttpRequestStrategy.java | 86 +++++++++++++-------- .../vespa/feed/client/HttpRequestStrategyTest.java | 89 ++++++++++++++++++++-- 2 files changed, 137 insertions(+), 38 deletions(-) (limited to 'vespa-feed-client/src/test/java') diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java index 71de6ff2065..dc01f01b2e8 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java @@ -16,6 +16,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -26,17 +27,13 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.logging.Level.FINE; import static java.util.logging.Level.FINER; import static java.util.logging.Level.FINEST; -import static java.util.logging.Level.INFO; import static java.util.logging.Level.WARNING; -// TODO: update doc /** - * Controls request execution and retries: - *
    - *
  • Whenever throttled (429, 503), set target inflight to 0.9 * current, and retry over a different connection;
  • - *
  • retry other transient errors (500, 502 and IOException) a specified number of times, for specified operation types;
  • - *
  • and on every successful response, increase target inflight by 0.1.
  • - *
+ * Controls request execution and retries. + * + * This class has all control flow for throttling and dispatching HTTP requests to an injected + * HTTP cluster, including error handling and retries, and a circuit breaker mechanism. * * @author jonmv */ @@ -45,10 +42,10 @@ class HttpRequestStrategy implements RequestStrategy { private static final Logger log = Logger.getLogger(HttpRequestStrategy.class.getName()); private final Cluster cluster; - private final Map> inflightById = new ConcurrentHashMap<>(); + private final Map> inflightById = new ConcurrentHashMap<>(); private final RetryStrategy strategy; private final CircuitBreaker breaker; - final FeedClient.Throttler throttler; + private final FeedClient.Throttler throttler; private final Queue queue = new ConcurrentLinkedQueue<>(); private final AtomicLong inflight = new AtomicLong(0); private final AtomicBoolean destroyed = new AtomicBoolean(false); @@ -89,21 +86,49 @@ class HttpRequestStrategy implements RequestStrategy { while (breaker.state() != OPEN && ! destroyed.get()) { while ( ! isInExcess() && poll() && breaker.state() == CLOSED); // Sleep when circuit is half-open, nap when queue is empty, or we are throttled. - Thread.sleep(breaker.state() == HALF_OPEN ? 1000 : 10); // TODO: Reduce throughput when turning half-open? + Thread.sleep(breaker.state() == HALF_OPEN ? 1000 : 10); } } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.log(WARNING, "Dispatch thread interrupted; shutting down"); + catch (Throwable t) { + log.log(WARNING, "Dispatch thread threw; shutting down", t); } destroy(); } private void offer(HttpRequest request, CompletableFuture vessel) { delayedCount.incrementAndGet(); - queue.offer(() -> { - cluster.dispatch(request, vessel); - }); + queue.offer(() -> cluster.dispatch(request, vessel)); + } + + + /** A completable future which stores a temporary failure result to return upon abortion. */ + private static class RetriableFuture extends CompletableFuture { + + private final AtomicReference completion = new AtomicReference<>(); + private final AtomicReference> dependency = new AtomicReference<>(); + + private RetriableFuture() { + completion.set(() -> completeExceptionally(new FeedException("Operation aborted"))); + } + + /** Complete now with the last result or error. */ + void complete() { + completion.get().run(); + if (dependency.get() != null) dependency.getAndSet(null).complete(); + } + + /** Ensures the dependency is completed whenever this is. */ + void dependOn(RetriableFuture dependency) { + this.dependency.set(dependency); + if (isDone()) dependency.complete(); + } + + /** Set the result of the last attempt at completing the computation represented by this. */ + void set(T result, Throwable thrown) { + completion.set(thrown != null ? () -> completeExceptionally(thrown) + : () -> complete(result)); + } + } private boolean poll() { @@ -208,11 +233,11 @@ class HttpRequestStrategy implements RequestStrategy { @Override public CompletableFuture enqueue(DocumentId documentId, HttpRequest request) { - CompletableFuture result = new CompletableFuture<>(); // Carries the aggregate result of the operation, including retries. + RetriableFuture result = new RetriableFuture<>(); // Carries the aggregate result of the operation, including retries. CompletableFuture vessel = new CompletableFuture<>(); // Holds the computation of a single dispatch to the HTTP client. - CompletableFuture previous = inflightById.put(documentId, result); + RetriableFuture previous = inflightById.put(documentId, result); if (destroyed.get()) { - result.cancel(true); + result.complete(); return result; } @@ -221,13 +246,15 @@ class HttpRequestStrategy implements RequestStrategy { offer(request, vessel); throttler.sent(inflight.get(), result); } - else + else { + result.dependOn(previous); // In case result is aborted, also abort the previous if still inflight. previous.whenComplete((__, ___) -> offer(request, vessel)); + } handleAttempt(vessel, request, result, 1); return result.handle((response, error) -> { - if (inflightById.compute(documentId, (____, current) -> current == result ? null : current) == null) + if (inflightById.compute(documentId, (__, current) -> current == result ? null : current) == null) releaseSlot(); if (error != null) { @@ -239,29 +266,26 @@ class HttpRequestStrategy implements RequestStrategy { } /** Handles the result of one attempt at the given operation, retrying if necessary. */ - private void handleAttempt(CompletableFuture vessel, HttpRequest request, CompletableFuture result, int attempt) { + private void handleAttempt(CompletableFuture vessel, HttpRequest request, RetriableFuture result, int attempt) { vessel.whenCompleteAsync((response, thrown) -> { + result.set(response, thrown); // Retry the operation if it failed with a transient error ... if (thrown != null ? retry(request, thrown, attempt) : retry(request, response, attempt)) { - CircuitBreaker.State state = breaker.state(); CompletableFuture retry = new CompletableFuture<>(); offer(request, retry); - handleAttempt(retry, request, result, attempt + (state == HALF_OPEN ? 0 : 1)); + handleAttempt(retry, request, result, attempt + (breaker.state() == HALF_OPEN ? 0 : 1)); } // ... or accept the outcome and mark the operation as complete. - else { - if (thrown == null) result.complete(response); - else result.completeExceptionally(thrown); - } + else result.complete(); }, resultExecutor); } @Override public void destroy() { - if ( ! destroyed.getAndSet(true)) { - inflightById.values().forEach(result -> result.cancel(true)); // TODO: More informative exception. + if (destroyed.compareAndSet(false, true)) { + inflightById.values().forEach(RetriableFuture::complete); cluster.close(); resultExecutor.shutdown(); } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java index 6b2087b1828..02175150fed 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java @@ -15,6 +15,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Phaser; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -27,6 +28,7 @@ import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN; import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -47,11 +49,8 @@ class HttpRequestStrategyTest { CountDownLatch latch = new CountDownLatch(1); new Thread(() -> { try { - while ( ! latch.await(1, TimeUnit.SECONDS)) { + while ( ! latch.await(1, TimeUnit.SECONDS)) System.err.println(cluster.stats().inflight()); - System.err.println(strategy.throttler.targetInflight()); - System.err.println(); - } } catch (InterruptedException ignored) { } }).start(); @@ -78,7 +77,7 @@ class HttpRequestStrategyTest { } @Test - void testLogic() throws ExecutionException, InterruptedException { + void testRetries() throws ExecutionException, InterruptedException { int minStreams = 16; // Hard limit for minimum number of streams per connection. MockCluster cluster = new MockCluster(); AtomicLong now = new AtomicLong(0); @@ -98,10 +97,11 @@ class HttpRequestStrategyTest { HttpRequest request = new HttpRequest("POST", "/", null, null); // Runtime exception is not retried. - cluster.expect((__, vessel) -> vessel.completeExceptionally(new FeedException("boom"))); + cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom"))); ExecutionException expected = assertThrows(ExecutionException.class, () -> strategy.enqueue(id1, request).get()); - assertEquals("boom", expected.getCause().getMessage()); + assertTrue(expected.getCause() instanceof FeedException); + assertEquals("java.lang.RuntimeException: boom", expected.getCause().getMessage()); assertEquals(1, strategy.stats().requests()); // IOException is retried. @@ -178,6 +178,81 @@ class HttpRequestStrategyTest { assertEquals(3, strategy.stats().exceptions()); } + @Test + void testShutdown() { + MockCluster cluster = new MockCluster(); + AtomicLong now = new AtomicLong(0); + CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10)); + HttpRequestStrategy strategy = new HttpRequestStrategy(FeedClientBuilder.create(URI.create("https://dummy.com:123")) + .setRetryStrategy(new FeedClient.RetryStrategy() { + @Override public int retries() { return 1; } + }) + .setCircuitBreaker(breaker) + .setConnectionsPerEndpoint(1), + new BenchmarkingCluster(cluster)); + + DocumentId id1 = DocumentId.of("ns", "type", "1"); + DocumentId id2 = DocumentId.of("ns", "type", "2"); + DocumentId id3 = DocumentId.of("ns", "type", "3"); + DocumentId id4 = DocumentId.of("ns", "type", "4"); + HttpRequest failing = new HttpRequest("POST", "/", null, null); + HttpRequest request = new HttpRequest("POST", "/", null, null); + HttpRequest blocking = new HttpRequest("POST", "/", null, null); + + // Enqueue some operations to the same id, which are serialised, and then shut down while operations are in flight. + Phaser phaser = new Phaser(2); + Phaser blocker = new Phaser(2); + AtomicReference> completion = new AtomicReference<>(); + cluster.expect((req, vessel) -> { + if (req == blocking) { + phaser.arriveAndAwaitAdvance(); // Synchronise with tst main thread, and then ... + blocker.arriveAndAwaitAdvance(); // ... block dispatch thread, so we get something in the queue. + throw new RuntimeException("armageddon"); // Dispatch thread should die, tearing down everything. + } + else if (req == failing) { + phaser.arriveAndAwaitAdvance(); // Let test thread enqueue more ops before failing (and retrying) this. + vessel.completeExceptionally(new IOException("failed")); + } + else phaser.arriveAndAwaitAdvance(); // Don't complete from mock cluster, but require destruction to do this. + }); + CompletableFuture inflight = strategy.enqueue(id1, request); + CompletableFuture serialised1 = strategy.enqueue(id1, request); + CompletableFuture serialised2 = strategy.enqueue(id1, request); + CompletableFuture failed = strategy.enqueue(id2, failing); + CompletableFuture blocked = strategy.enqueue(id3, blocking); + CompletableFuture delayed = strategy.enqueue(id4, request); + phaser.arriveAndAwaitAdvance(); // inflight completes dispatch, but causes no response. + phaser.arriveAndAwaitAdvance(); // failed completes dispatch, and a retry is enqueued. + phaser.arriveAndAwaitAdvance(); // blocked starts dispatch, and hangs, blocking dispatch thread. + + // Current state: inflight is "inflight to cluster", serialised1/2 are waiting completion of it; + // blocked is blocking dispatch, delayed is enqueued, waiting for dispatch; + // failed has a partial result, and has a retry in the dispatch queue. + assertFalse(inflight.isDone()); + assertFalse(serialised1.isDone()); + assertFalse(serialised2.isDone()); + assertFalse(failed.isDone()); + assertFalse(blocked.isDone()); + assertFalse(delayed.isDone()); + + // Kill dispatch thread, and see that all enqueued operations, and new ones, complete. + blocker.arriveAndAwaitAdvance(); + assertEquals("ai.vespa.feed.client.FeedException: Operation aborted", + assertThrows(ExecutionException.class, inflight::get).getMessage()); + assertEquals("ai.vespa.feed.client.FeedException: Operation aborted", + assertThrows(ExecutionException.class, serialised1::get).getMessage()); + assertEquals("ai.vespa.feed.client.FeedException: Operation aborted", + assertThrows(ExecutionException.class, serialised2::get).getMessage()); + assertEquals("ai.vespa.feed.client.FeedException: Operation aborted", + assertThrows(ExecutionException.class, blocked::get).getMessage()); + assertEquals("ai.vespa.feed.client.FeedException: Operation aborted", + assertThrows(ExecutionException.class, delayed::get).getMessage()); + assertEquals("ai.vespa.feed.client.FeedException: java.io.IOException: failed", + assertThrows(ExecutionException.class, failed::get).getMessage()); + assertEquals("ai.vespa.feed.client.FeedException: Operation aborted", + assertThrows(ExecutionException.class, strategy.enqueue(id1, request)::get).getMessage()); + } + static class MockCluster implements Cluster { final AtomicReference>> dispatch = new AtomicReference<>(); -- cgit v1.2.3