aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorJon Marius Venstad <jonmv@users.noreply.github.com>2021-06-25 14:16:17 +0200
committerGitHub <noreply@github.com>2021-06-25 14:16:17 +0200
commitb03e7869d363e46f1ff19a0e5365f830c638dc5d (patch)
treeb542ea09958b0295f5304a2242a1a838c9a6a932 /vespa-feed-client
parent5896921f79fa9cf5dd5ea6ad6c4b78e392ca43e5 (diff)
parent11a6745c68f2ae199bedd7aa06a13ea52e3dc8c1 (diff)
Merge pull request #18408 from vespa-engine/jonmv/vespa-feed-client
Jonmv/vespa feed client
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/abi-spec.json3
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java6
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java29
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java97
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/Result.java3
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java34
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java90
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java8
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java2
9 files changed, 192 insertions, 80 deletions
diff --git a/vespa-feed-client/abi-spec.json b/vespa-feed-client/abi-spec.json
index faf189f1008..5a473e05508 100644
--- a/vespa-feed-client/abi-spec.json
+++ b/vespa-feed-client/abi-spec.json
@@ -334,8 +334,7 @@
],
"fields": [
"public static final enum ai.vespa.feed.client.Result$Type success",
- "public static final enum ai.vespa.feed.client.Result$Type conditionNotMet",
- "public static final enum ai.vespa.feed.client.Result$Type failure"
+ "public static final enum ai.vespa.feed.client.Result$Type conditionNotMet"
]
},
"ai.vespa.feed.client.Result": {
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java
index 2c5c2dccf19..c319bfca252 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java
@@ -8,7 +8,7 @@ import java.util.function.LongSupplier;
import java.util.logging.Logger;
import static java.util.Objects.requireNonNull;
-import static java.util.logging.Level.INFO;
+import static java.util.logging.Level.FINE;
import static java.util.logging.Level.WARNING;
/**
@@ -48,7 +48,7 @@ public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker {
public void success() {
failingSinceMillis.set(NEVER);
if ( ! open.get() && halfOpen.compareAndSet(true, false))
- log.log(INFO, "Circuit breaker is now closed");
+ log.log(FINE, "Circuit breaker is now closed");
}
@Override
@@ -60,7 +60,7 @@ public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker {
public State state() {
long failingMillis = clock.getAsLong() - failingSinceMillis.get();
if (failingMillis > graceMillis && halfOpen.compareAndSet(false, true))
- log.log(INFO, "Circuit breaker is now half-open");
+ log.log(FINE, "Circuit breaker is now half-open");
if (failingMillis > doomMillis && open.compareAndSet(false, true))
log.log(WARNING, "Circuit breaker is now open");
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
index 2269c56cde4..90b5707c8a0 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
@@ -85,15 +85,25 @@ class HttpFeedClient implements FeedClient {
.thenApply(response -> toResult(request, response, documentId));
}
+ private enum Outcome { success, conditionNotMet, vespaFailure, transportFailure };
+
+ static Result.Type toResultType(Outcome outcome) {
+ switch (outcome) {
+ case success: return Result.Type.success;
+ case conditionNotMet: return Result.Type.conditionNotMet;
+ default: throw new IllegalArgumentException("No corresponding result type for '" + outcome + "'");
+ }
+ }
+
static Result toResult(HttpRequest request, HttpResponse response, DocumentId documentId) {
- Result.Type type;
+ Outcome outcome;
switch (response.code()) {
- case 200: type = Result.Type.success; break;
- case 412: type = Result.Type.conditionNotMet; break;
+ case 200: outcome = Outcome.success; break;
+ case 412: outcome = Outcome.conditionNotMet; break;
case 502:
case 504:
- case 507: type = Result.Type.failure; break;
- default: type = null;
+ case 507: outcome = Outcome.vespaFailure; break;
+ default: outcome = Outcome.transportFailure;
}
String message = null;
@@ -125,13 +135,16 @@ class HttpFeedClient implements FeedClient {
throw new ResultParseException(documentId, e);
}
- if (type == null) // Not a Vespa response, but a failure in the HTTP layer.
- throw new ResultParseException(
+ if (outcome == Outcome.transportFailure) // Not a Vespa response, but a failure in the HTTP layer.
+ throw new FeedException(
documentId,
"Status " + response.code() + " executing '" + request + "': "
+ (message == null ? new String(response.body(), UTF_8) : message));
- return new Result(type, documentId, message, trace);
+ if (outcome == Outcome.vespaFailure)
+ throw new ResultException(documentId, message, trace);
+
+ return new Result(toResultType(outcome), documentId, message, trace);
}
static String getPath(DocumentId documentId) {
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
index ddfc509738f..2480c9a7367 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
@@ -16,6 +16,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -24,18 +25,15 @@ import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN;
import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.logging.Level.FINE;
+import static java.util.logging.Level.FINER;
import static java.util.logging.Level.FINEST;
-import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
-// TODO: update doc
/**
- * Controls request execution and retries:
- * <ul>
- * <li>Whenever throttled (429, 503), set target inflight to 0.9 * current, and retry over a different connection;</li>
- * <li>retry other transient errors (500, 502 and IOException) a specified number of times, for specified operation types;</li>
- * <li>and on every successful response, increase target inflight by 0.1.</li>
- * </ul>
+ * Controls request execution and retries.
+ *
+ * This class has all control flow for throttling and dispatching HTTP requests to an injected
+ * HTTP cluster, including error handling and retries, and a circuit breaker mechanism.
*
* @author jonmv
*/
@@ -44,10 +42,10 @@ class HttpRequestStrategy implements RequestStrategy {
private static final Logger log = Logger.getLogger(HttpRequestStrategy.class.getName());
private final Cluster cluster;
- private final Map<DocumentId, CompletableFuture<?>> inflightById = new ConcurrentHashMap<>();
+ private final Map<DocumentId, RetriableFuture<HttpResponse>> inflightById = new ConcurrentHashMap<>();
private final RetryStrategy strategy;
private final CircuitBreaker breaker;
- final FeedClient.Throttler throttler;
+ private final FeedClient.Throttler throttler;
private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
private final AtomicLong inflight = new AtomicLong(0);
private final AtomicBoolean destroyed = new AtomicBoolean(false);
@@ -88,21 +86,50 @@ class HttpRequestStrategy implements RequestStrategy {
while (breaker.state() != OPEN && ! destroyed.get()) {
while ( ! isInExcess() && poll() && breaker.state() == CLOSED);
// Sleep when circuit is half-open, nap when queue is empty, or we are throttled.
- Thread.sleep(breaker.state() == HALF_OPEN ? 1000 : 10); // TODO: Reduce throughput when turning half-open?
+ Thread.sleep(breaker.state() == HALF_OPEN ? 1000 : 10);
}
}
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- log.log(WARNING, "Dispatch thread interrupted; shutting down");
+ catch (Throwable t) {
+ log.log(WARNING, "Dispatch thread threw; shutting down", t);
}
destroy();
}
private void offer(HttpRequest request, CompletableFuture<HttpResponse> vessel) {
delayedCount.incrementAndGet();
- queue.offer(() -> {
- cluster.dispatch(request, vessel);
- });
+ queue.offer(() -> cluster.dispatch(request, vessel));
+ }
+
+
+ /** A completable future which stores a temporary failure result to return upon abortion. */
+ private static class RetriableFuture<T> extends CompletableFuture<T> {
+
+ private final AtomicReference<Runnable> completion = new AtomicReference<>();
+ private final AtomicReference<RetriableFuture<T>> dependency = new AtomicReference<>();
+
+ private RetriableFuture() {
+ completion.set(() -> completeExceptionally(new FeedException("Operation aborted")));
+ }
+
+ /** Complete now with the last result or error. */
+ void complete() {
+ completion.get().run();
+ RetriableFuture<T> toComplete = dependency.getAndSet(null);
+ if (toComplete != null) toComplete.complete();
+ }
+
+ /** Ensures the dependency is completed whenever this is. */
+ void dependOn(RetriableFuture<T> dependency) {
+ this.dependency.set(dependency);
+ if (isDone()) dependency.complete();
+ }
+
+ /** Set the result of the last attempt at completing the computation represented by this. */
+ void set(T result, Throwable thrown) {
+ completion.set(thrown != null ? () -> completeExceptionally(thrown)
+ : () -> complete(result));
+ }
+
}
private boolean poll() {
@@ -139,11 +166,11 @@ class HttpRequestStrategy implements RequestStrategy {
if ( (thrown instanceof IOException) // General IO problems.
|| (thrown instanceof CancellationException) // TLS session disconnect.
|| (thrown instanceof CancelledKeyException)) { // Selection cancelled.
- log.log(INFO, thrown, () -> "Failed attempt " + attempt + " at " + request);
+ log.log(FINER, thrown, () -> "Failed attempt " + attempt + " at " + request);
return retry(request, attempt);
}
- log.log(WARNING, thrown, () -> "Failed attempt " + attempt + " at " + request);
+ log.log(FINE, thrown, () -> "Failed attempt " + attempt + " at " + request);
return false;
}
@@ -158,18 +185,17 @@ class HttpRequestStrategy implements RequestStrategy {
if (response.code() == 429 || response.code() == 503) { // Throttling; reduce target inflight.
- logResponse(FINE, response, request, attempt);
+ logResponse(FINER, response, request, attempt);
throttler.throttled((inflight.get() - delayedCount.get()));
return true;
}
breaker.failure();
+ logResponse(FINE, response, request, attempt);
if (response.code() == 500 || response.code() == 502 || response.code() == 504) { // Hopefully temporary errors.
- logResponse(INFO, response, request, attempt);
return retry(request, attempt);
}
- logResponse(WARNING, response, request, attempt);
return false;
}
@@ -208,11 +234,11 @@ class HttpRequestStrategy implements RequestStrategy {
@Override
public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) {
- CompletableFuture<HttpResponse> result = new CompletableFuture<>(); // Carries the aggregate result of the operation, including retries.
+ RetriableFuture<HttpResponse> result = new RetriableFuture<>(); // Carries the aggregate result of the operation, including retries.
CompletableFuture<HttpResponse> vessel = new CompletableFuture<>(); // Holds the computation of a single dispatch to the HTTP client.
- CompletableFuture<?> previous = inflightById.put(documentId, result);
+ RetriableFuture<HttpResponse> previous = inflightById.put(documentId, result);
if (destroyed.get()) {
- result.cancel(true);
+ result.complete();
return result;
}
@@ -221,13 +247,15 @@ class HttpRequestStrategy implements RequestStrategy {
offer(request, vessel);
throttler.sent(inflight.get(), result);
}
- else
+ else {
+ result.dependOn(previous); // In case result is aborted, also abort the previous if still inflight.
previous.whenComplete((__, ___) -> offer(request, vessel));
+ }
handleAttempt(vessel, request, result, 1);
return result.handle((response, error) -> {
- if (inflightById.compute(documentId, (____, current) -> current == result ? null : current) == null)
+ if (inflightById.compute(documentId, (__, current) -> current == result ? null : current) == null)
releaseSlot();
if (error != null) {
@@ -239,29 +267,26 @@ class HttpRequestStrategy implements RequestStrategy {
}
/** Handles the result of one attempt at the given operation, retrying if necessary. */
- private void handleAttempt(CompletableFuture<HttpResponse> vessel, HttpRequest request, CompletableFuture<HttpResponse> result, int attempt) {
+ private void handleAttempt(CompletableFuture<HttpResponse> vessel, HttpRequest request, RetriableFuture<HttpResponse> result, int attempt) {
vessel.whenCompleteAsync((response, thrown) -> {
+ result.set(response, thrown);
// Retry the operation if it failed with a transient error ...
if (thrown != null ? retry(request, thrown, attempt)
: retry(request, response, attempt)) {
- CircuitBreaker.State state = breaker.state();
CompletableFuture<HttpResponse> retry = new CompletableFuture<>();
offer(request, retry);
- handleAttempt(retry, request, result, attempt + (state == HALF_OPEN ? 0 : 1));
+ handleAttempt(retry, request, result, attempt + (breaker.state() == HALF_OPEN ? 0 : 1));
}
// ... or accept the outcome and mark the operation as complete.
- else {
- if (thrown == null) result.complete(response);
- else result.completeExceptionally(thrown);
- }
+ else result.complete();
},
resultExecutor);
}
@Override
public void destroy() {
- if ( ! destroyed.getAndSet(true)) {
- inflightById.values().forEach(result -> result.cancel(true)); // TODO: More informative exception.
+ if (destroyed.compareAndSet(false, true)) {
+ inflightById.values().forEach(RetriableFuture::complete);
cluster.close();
resultExecutor.shutdown();
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Result.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Result.java
index 0a036c6c1b0..7be7aadc188 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Result.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Result.java
@@ -25,8 +25,7 @@ public class Result {
public enum Type {
success,
- conditionNotMet,
- failure
+ conditionNotMet
}
public Type type() { return type; }
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java
index d8090549420..6aa0de2160c 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java
@@ -13,6 +13,7 @@ import java.util.function.BiFunction;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* @author jonmv
@@ -55,16 +56,19 @@ class HttpFeedClientTest {
return failed;
}
});
- Result result = client.put(id,
- "json",
- OperationParameters.empty()
- .createIfNonExistent(true)
- .testAndSetCondition("false")
- .route("route")
- .timeout(Duration.ofSeconds(5)))
- .get();
- assertEquals("Ooops! ... I did it again.", result.resultMessage().get());
- assertEquals("I played with your heart. Got lost in the game.", result.traceMessage().get());
+ ExecutionException expected = assertThrows(ExecutionException.class,
+ () -> client.put(id,
+ "json",
+ OperationParameters.empty()
+ .createIfNonExistent(true)
+ .testAndSetCondition("false")
+ .route("route")
+ .timeout(Duration.ofSeconds(5)))
+ .get());
+ assertTrue(expected.getCause() instanceof ResultException);
+ ResultException result = (ResultException) expected.getCause();
+ assertEquals("Ooops! ... I did it again.", result.getMessage());
+ assertEquals("I played with your heart. Got lost in the game.", result.getTrace().get());
// Handler error is a FeedException.
@@ -90,11 +94,11 @@ class HttpFeedClientTest {
return failed;
}
});
- ExecutionException expected = assertThrows(ExecutionException.class,
- () -> client.put(id,
- "json",
- OperationParameters.empty())
- .get());
+ expected = assertThrows(ExecutionException.class,
+ () -> client.put(id,
+ "json",
+ OperationParameters.empty())
+ .get());
assertEquals("Status 500 executing 'POST /document/v1/ns/type/docid/0': Alla ska i jorden.", expected.getCause().getMessage());
}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java
index 21ab6889e6e..02175150fed 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java
@@ -15,6 +15,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -27,7 +28,9 @@ import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN;
import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
class HttpRequestStrategyTest {
@@ -46,11 +49,8 @@ class HttpRequestStrategyTest {
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
try {
- while ( ! latch.await(1, TimeUnit.SECONDS)) {
+ while ( ! latch.await(1, TimeUnit.SECONDS))
System.err.println(cluster.stats().inflight());
- System.err.println(strategy.throttler.targetInflight());
- System.err.println();
- }
}
catch (InterruptedException ignored) { }
}).start();
@@ -77,7 +77,7 @@ class HttpRequestStrategyTest {
}
@Test
- void testLogic() throws ExecutionException, InterruptedException {
+ void testRetries() throws ExecutionException, InterruptedException {
int minStreams = 16; // Hard limit for minimum number of streams per connection.
MockCluster cluster = new MockCluster();
AtomicLong now = new AtomicLong(0);
@@ -97,10 +97,11 @@ class HttpRequestStrategyTest {
HttpRequest request = new HttpRequest("POST", "/", null, null);
// Runtime exception is not retried.
- cluster.expect((__, vessel) -> vessel.completeExceptionally(new FeedException("boom")));
+ cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom")));
ExecutionException expected = assertThrows(ExecutionException.class,
() -> strategy.enqueue(id1, request).get());
- assertEquals("boom", expected.getCause().getMessage());
+ assertTrue(expected.getCause() instanceof FeedException);
+ assertEquals("java.lang.RuntimeException: boom", expected.getCause().getMessage());
assertEquals(1, strategy.stats().requests());
// IOException is retried.
@@ -177,6 +178,81 @@ class HttpRequestStrategyTest {
assertEquals(3, strategy.stats().exceptions());
}
+ @Test
+ void testShutdown() {
+ MockCluster cluster = new MockCluster();
+ AtomicLong now = new AtomicLong(0);
+ CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10));
+ HttpRequestStrategy strategy = new HttpRequestStrategy(FeedClientBuilder.create(URI.create("https://dummy.com:123"))
+ .setRetryStrategy(new FeedClient.RetryStrategy() {
+ @Override public int retries() { return 1; }
+ })
+ .setCircuitBreaker(breaker)
+ .setConnectionsPerEndpoint(1),
+ new BenchmarkingCluster(cluster));
+
+ DocumentId id1 = DocumentId.of("ns", "type", "1");
+ DocumentId id2 = DocumentId.of("ns", "type", "2");
+ DocumentId id3 = DocumentId.of("ns", "type", "3");
+ DocumentId id4 = DocumentId.of("ns", "type", "4");
+ HttpRequest failing = new HttpRequest("POST", "/", null, null);
+ HttpRequest request = new HttpRequest("POST", "/", null, null);
+ HttpRequest blocking = new HttpRequest("POST", "/", null, null);
+
+ // Enqueue some operations to the same id, which are serialised, and then shut down while operations are in flight.
+ Phaser phaser = new Phaser(2);
+ Phaser blocker = new Phaser(2);
+ AtomicReference<CompletableFuture<HttpResponse>> completion = new AtomicReference<>();
+ cluster.expect((req, vessel) -> {
+ if (req == blocking) {
+ phaser.arriveAndAwaitAdvance(); // Synchronise with tst main thread, and then ...
+ blocker.arriveAndAwaitAdvance(); // ... block dispatch thread, so we get something in the queue.
+ throw new RuntimeException("armageddon"); // Dispatch thread should die, tearing down everything.
+ }
+ else if (req == failing) {
+ phaser.arriveAndAwaitAdvance(); // Let test thread enqueue more ops before failing (and retrying) this.
+ vessel.completeExceptionally(new IOException("failed"));
+ }
+ else phaser.arriveAndAwaitAdvance(); // Don't complete from mock cluster, but require destruction to do this.
+ });
+ CompletableFuture<HttpResponse> inflight = strategy.enqueue(id1, request);
+ CompletableFuture<HttpResponse> serialised1 = strategy.enqueue(id1, request);
+ CompletableFuture<HttpResponse> serialised2 = strategy.enqueue(id1, request);
+ CompletableFuture<HttpResponse> failed = strategy.enqueue(id2, failing);
+ CompletableFuture<HttpResponse> blocked = strategy.enqueue(id3, blocking);
+ CompletableFuture<HttpResponse> delayed = strategy.enqueue(id4, request);
+ phaser.arriveAndAwaitAdvance(); // inflight completes dispatch, but causes no response.
+ phaser.arriveAndAwaitAdvance(); // failed completes dispatch, and a retry is enqueued.
+ phaser.arriveAndAwaitAdvance(); // blocked starts dispatch, and hangs, blocking dispatch thread.
+
+ // Current state: inflight is "inflight to cluster", serialised1/2 are waiting completion of it;
+ // blocked is blocking dispatch, delayed is enqueued, waiting for dispatch;
+ // failed has a partial result, and has a retry in the dispatch queue.
+ assertFalse(inflight.isDone());
+ assertFalse(serialised1.isDone());
+ assertFalse(serialised2.isDone());
+ assertFalse(failed.isDone());
+ assertFalse(blocked.isDone());
+ assertFalse(delayed.isDone());
+
+ // Kill dispatch thread, and see that all enqueued operations, and new ones, complete.
+ blocker.arriveAndAwaitAdvance();
+ assertEquals("ai.vespa.feed.client.FeedException: Operation aborted",
+ assertThrows(ExecutionException.class, inflight::get).getMessage());
+ assertEquals("ai.vespa.feed.client.FeedException: Operation aborted",
+ assertThrows(ExecutionException.class, serialised1::get).getMessage());
+ assertEquals("ai.vespa.feed.client.FeedException: Operation aborted",
+ assertThrows(ExecutionException.class, serialised2::get).getMessage());
+ assertEquals("ai.vespa.feed.client.FeedException: Operation aborted",
+ assertThrows(ExecutionException.class, blocked::get).getMessage());
+ assertEquals("ai.vespa.feed.client.FeedException: Operation aborted",
+ assertThrows(ExecutionException.class, delayed::get).getMessage());
+ assertEquals("ai.vespa.feed.client.FeedException: java.io.IOException: failed",
+ assertThrows(ExecutionException.class, failed::get).getMessage());
+ assertEquals("ai.vespa.feed.client.FeedException: Operation aborted",
+ assertThrows(ExecutionException.class, strategy.enqueue(id1, request)::get).getMessage());
+ }
+
static class MockCluster implements Cluster {
final AtomicReference<BiConsumer<HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>();
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java
index 1e616f2625a..3b633c38132 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java
@@ -38,17 +38,15 @@ class JsonFileFeederExample implements Closeable {
resultsReceived.incrementAndGet();
if (error != null) {
log.warning("Problems with feeding document "
- + error.documentId().map(DocumentId::toString).orElse("<unknown>"));
- errorsReceived.incrementAndGet();
- } else if (result.type() == Result.Type.failure) {
- log.warning("Problems with docID " + result.documentId() + ":" + error);
+ + error.documentId().map(DocumentId::toString).orElse("<unknown>")
+ + ": " + error);
errorsReceived.incrementAndGet();
}
}
@Override
public void onError(FeedException error) {
- log.severe("Feeding failed for d: " + error.getMessage());
+ log.severe("Feeding failed fatally: " + error.getMessage());
}
@Override
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java
index 5cee776b244..cbe0e213907 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java
@@ -100,8 +100,6 @@ class JsonStreamFeederExample extends Thread implements AutoCloseable {
if (throwable != null) {
System.err.printf("Failure for '%s': %s", docId, throwable);
throwable.printStackTrace();
- } else if (result.type() == Result.Type.failure) {
- System.err.printf("Failure for '%s': %s", docId, result.resultMessage().orElse("<no messsage>"));
}
});
} catch (InterruptedException e) {