diff options
5 files changed, 219 insertions, 0 deletions
diff --git a/vespa-feed-client-api/abi-spec.json b/vespa-feed-client-api/abi-spec.json index a9047365a7a..8af7798984f 100644 --- a/vespa-feed-client-api/abi-spec.json +++ b/vespa-feed-client-api/abi-spec.json @@ -103,6 +103,8 @@ "public abstract java.util.concurrent.CompletableFuture put(ai.vespa.feed.client.DocumentId, java.lang.String, ai.vespa.feed.client.OperationParameters)", "public abstract java.util.concurrent.CompletableFuture update(ai.vespa.feed.client.DocumentId, java.lang.String, ai.vespa.feed.client.OperationParameters)", "public abstract java.util.concurrent.CompletableFuture remove(ai.vespa.feed.client.DocumentId, ai.vespa.feed.client.OperationParameters)", + "public static java.util.List await(java.util.List)", + "public static varargs java.util.List await(java.util.concurrent.CompletableFuture[])", "public abstract ai.vespa.feed.client.OperationStats stats()", "public abstract ai.vespa.feed.client.FeedClient$CircuitBreaker$State circuitBreakerState()", "public abstract void close(boolean)", @@ -221,6 +223,19 @@ ], "fields": [] }, + "ai.vespa.feed.client.MultiFeedException": { + "superClass": "java.lang.RuntimeException", + "interfaces": [], + "attributes": [ + "public" + ], + "methods": [ + "public void <init>(java.util.Collection)", + "public java.util.Collection feedExceptions()", + "public java.util.Set documentIds()" + ], + "fields": [] + }, "ai.vespa.feed.client.OperationParameters": { "superClass": "java.lang.Object", "interfaces": [], diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java index d463c611d6a..5e95990a078 100644 --- a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java +++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java @@ -2,6 +2,7 @@ package ai.vespa.feed.client; import java.io.Closeable; +import java.util.List; import java.util.concurrent.CompletableFuture; /** @@ -37,6 +38,24 @@ public interface FeedClient extends Closeable { */ CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params); + /** + * Waits for all feed operations to complete, either successfully or with exception. + * @throws MultiFeedException if any operation fails + * @return list of results with the same ordering as the {@code promises} parameter + * */ + static List<Result> await(List<CompletableFuture<Result>> promises) throws MultiFeedException { + return Helper.await(promises); + } + + /** + * Same as {@link #await(List)} except {@code promises} parameter is a vararg + * @see #await(List) + */ + @SafeVarargs + static List<Result> await(CompletableFuture<Result>... promises) throws MultiFeedException { + return Helper.await(promises); + } + /** Returns a snapshot of the stats for this feed client, such as requests made, and responses by status. */ OperationStats stats(); diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/Helper.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/Helper.java new file mode 100644 index 00000000000..59c12077bef --- /dev/null +++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/Helper.java @@ -0,0 +1,42 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.stream.Collectors; + +/** + * @author bjorncs + */ +class Helper { + + private Helper() {} + + @SafeVarargs + static List<Result> await(CompletableFuture<Result>... promises) throws MultiFeedException { + List<CompletableFuture<Result>> list = new ArrayList<>(); + for (CompletableFuture<Result> p : promises) list.add(p); + return await(list); + } + + static List<Result> await(List<CompletableFuture<Result>> promises) throws MultiFeedException { + try { + CompletableFuture.allOf(promises.toArray(new CompletableFuture<?>[0])).join(); + return promises.stream() + .map(p -> Objects.requireNonNull(p.getNow(null))) + .collect(Collectors.toList()); + } catch (CompletionException e) { + List<FeedException> exceptions = new ArrayList<>(); + for (CompletableFuture<Result> promise : promises) { + if (promise.isCompletedExceptionally()) { + // Lambda is executed on this thread since the future is already completed + promise.whenComplete((__, error) -> exceptions.add((FeedException) error)); + } + } + throw new MultiFeedException(exceptions); + } + } +} diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/MultiFeedException.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/MultiFeedException.java new file mode 100644 index 00000000000..5db687b49ff --- /dev/null +++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/MultiFeedException.java @@ -0,0 +1,38 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Aggregates multiple instances of {@link FeedException} + * + * @author bjorncs + */ +public class MultiFeedException extends RuntimeException { + + private final List<FeedException> exceptions; + + public MultiFeedException(Collection<FeedException> exceptions) { + super(toMessage(exceptions)); + this.exceptions = Collections.unmodifiableList(new ArrayList<>(exceptions)); + } + + public Collection<FeedException> feedExceptions() { return exceptions; } + + public Set<DocumentId> documentIds() { + return exceptions.stream() + .filter(e -> e.documentId().isPresent()) + .map(e -> e.documentId().get()) + .collect(Collectors.toSet()); + } + + private static String toMessage(Collection<FeedException> exceptions) { + return String.format("%d feed operations failed", exceptions.size()); + } + +} diff --git a/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/FeedClientTest.java b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/FeedClientTest.java new file mode 100644 index 00000000000..688f311bb05 --- /dev/null +++ b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/FeedClientTest.java @@ -0,0 +1,105 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * @author bjorncs + */ +class FeedClientTest { + + private ExecutorService executor; + + @BeforeEach + void setUp() { + executor = Executors.newSingleThreadExecutor(); + } + + @AfterEach + void tearDown() throws InterruptedException { + executor.shutdown(); + assertTrue(executor.awaitTermination(60, TimeUnit.SECONDS)); + } + + @Test + void await_returns_list_of_result_on_success() { + MyResult r1 = new MyResult(); + CompletableFuture<Result> f1 = CompletableFuture.completedFuture(r1); + MyResult r2 = new MyResult(); + CompletableFuture<Result> f2 = CompletableFuture.completedFuture(r2); + MyResult r3 = new MyResult(); + CompletableFuture<Result> f3 = CompletableFuture.completedFuture(r3); + + List<Result> aggregated = FeedClient.await(f1, f2, f3); + assertEquals(3, aggregated.size()); + assertEquals(r1, aggregated.get(0)); + assertEquals(r2, aggregated.get(1)); + assertEquals(r3, aggregated.get(2)); + } + + @Test + void await_handles_async_completion_with_success() throws ExecutionException, InterruptedException { + CompletableFuture<Result> f1 = new CompletableFuture<>(); + CompletableFuture<Result> f2 = new CompletableFuture<>(); + CompletableFuture<Result> f3 = new CompletableFuture<>(); + + CompletableFuture<List<Result>> awaitPromise = CompletableFuture.supplyAsync(() -> FeedClient.await(f1, f2, f3), executor); + // Completed in reverse order + MyResult r3 = new MyResult(); + f3.complete(r3); + MyResult r2 = new MyResult(); + f2.complete(r2); + MyResult r1 = new MyResult(); + f1.complete(r1); + + List<Result> aggregated = awaitPromise.get(); + assertEquals(3, aggregated.size()); + assertEquals(r1, aggregated.get(0)); + assertEquals(r2, aggregated.get(1)); + assertEquals(r3, aggregated.get(2)); + } + + @Test + void await_throws_when_some_results_completes_exceptionally() { + CompletableFuture<Result> f1 = new CompletableFuture<>(); + DocumentId docId1 = DocumentId.of("music", "music", "doc1"); + FeedException exceptionDoc1 = new FeedException(docId1, "Doc1 failed"); + f1.completeExceptionally(exceptionDoc1); + CompletableFuture<Result> f2 = new CompletableFuture<>(); + DocumentId docId2 = DocumentId.of("music", "music", "doc2"); + FeedException exceptionDoc2 = new FeedException(docId2, "Doc2 failed"); + f2.completeExceptionally(exceptionDoc2); + CompletableFuture<Result> f3 = CompletableFuture.completedFuture(new MyResult()); + + MultiFeedException multiException = assertThrows(MultiFeedException.class, () -> FeedClient.await(f1, f2, f3)); + Set<DocumentId> expectedDocsIds = new HashSet<>(Arrays.asList(docId1, docId2)); + assertEquals(expectedDocsIds, new HashSet<>(multiException.documentIds())); + Set<FeedException> expectedExceptions = new HashSet<>(Arrays.asList(exceptionDoc1, exceptionDoc2)); + assertEquals(expectedExceptions, new HashSet<>(multiException.feedExceptions())); + assertEquals("2 feed operations failed", multiException.getMessage()); + } + + static class MyResult implements Result { + @Override public Type type() { return null; } + @Override public DocumentId documentId() { return null; } + @Override public Optional<String> resultMessage() { return Optional.empty(); } + @Override public Optional<String> traceMessage() { return Optional.empty(); } + } +} |