summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client-api
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@verizonmedia.com>2021-12-16 17:24:15 +0100
committerBjørn Christian Seime <bjorncs@verizonmedia.com>2021-12-16 17:24:15 +0100
commit2f68e3741a0ec2317bd2b2a39e0eda00ce6ef281 (patch)
tree1031cdd3a75d8029fd9380222dee0c81aa4dec7c /vespa-feed-client-api
parent88932e068abbbcbd7b78bb4c5d75b722569fde6a (diff)
Add helper method for synchronous use of feed client
Diffstat (limited to 'vespa-feed-client-api')
-rw-r--r--vespa-feed-client-api/abi-spec.json15
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java19
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/Helper.java42
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/MultiFeedException.java38
-rw-r--r--vespa-feed-client-api/src/test/java/ai/vespa/feed/client/FeedClientTest.java105
5 files changed, 219 insertions, 0 deletions
diff --git a/vespa-feed-client-api/abi-spec.json b/vespa-feed-client-api/abi-spec.json
index a9047365a7a..8af7798984f 100644
--- a/vespa-feed-client-api/abi-spec.json
+++ b/vespa-feed-client-api/abi-spec.json
@@ -103,6 +103,8 @@
"public abstract java.util.concurrent.CompletableFuture put(ai.vespa.feed.client.DocumentId, java.lang.String, ai.vespa.feed.client.OperationParameters)",
"public abstract java.util.concurrent.CompletableFuture update(ai.vespa.feed.client.DocumentId, java.lang.String, ai.vespa.feed.client.OperationParameters)",
"public abstract java.util.concurrent.CompletableFuture remove(ai.vespa.feed.client.DocumentId, ai.vespa.feed.client.OperationParameters)",
+ "public static java.util.List await(java.util.List)",
+ "public static varargs java.util.List await(java.util.concurrent.CompletableFuture[])",
"public abstract ai.vespa.feed.client.OperationStats stats()",
"public abstract ai.vespa.feed.client.FeedClient$CircuitBreaker$State circuitBreakerState()",
"public abstract void close(boolean)",
@@ -221,6 +223,19 @@
],
"fields": []
},
+ "ai.vespa.feed.client.MultiFeedException": {
+ "superClass": "java.lang.RuntimeException",
+ "interfaces": [],
+ "attributes": [
+ "public"
+ ],
+ "methods": [
+ "public void <init>(java.util.Collection)",
+ "public java.util.Collection feedExceptions()",
+ "public java.util.Set documentIds()"
+ ],
+ "fields": []
+ },
"ai.vespa.feed.client.OperationParameters": {
"superClass": "java.lang.Object",
"interfaces": [],
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java
index d463c611d6a..5e95990a078 100644
--- a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java
@@ -2,6 +2,7 @@
package ai.vespa.feed.client;
import java.io.Closeable;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
@@ -37,6 +38,24 @@ public interface FeedClient extends Closeable {
*/
CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params);
+ /**
+ * Waits for all feed operations to complete, either successfully or with exception.
+ * @throws MultiFeedException if any operation fails
+ * @return list of results with the same ordering as the {@code promises} parameter
+ * */
+ static List<Result> await(List<CompletableFuture<Result>> promises) throws MultiFeedException {
+ return Helper.await(promises);
+ }
+
+ /**
+ * Same as {@link #await(List)} except {@code promises} parameter is a vararg
+ * @see #await(List)
+ */
+ @SafeVarargs
+ static List<Result> await(CompletableFuture<Result>... promises) throws MultiFeedException {
+ return Helper.await(promises);
+ }
+
/** Returns a snapshot of the stats for this feed client, such as requests made, and responses by status. */
OperationStats stats();
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/Helper.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/Helper.java
new file mode 100644
index 00000000000..59c12077bef
--- /dev/null
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/Helper.java
@@ -0,0 +1,42 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.stream.Collectors;
+
+/**
+ * @author bjorncs
+ */
+class Helper {
+
+ private Helper() {}
+
+ @SafeVarargs
+ static List<Result> await(CompletableFuture<Result>... promises) throws MultiFeedException {
+ List<CompletableFuture<Result>> list = new ArrayList<>();
+ for (CompletableFuture<Result> p : promises) list.add(p);
+ return await(list);
+ }
+
+ static List<Result> await(List<CompletableFuture<Result>> promises) throws MultiFeedException {
+ try {
+ CompletableFuture.allOf(promises.toArray(new CompletableFuture<?>[0])).join();
+ return promises.stream()
+ .map(p -> Objects.requireNonNull(p.getNow(null)))
+ .collect(Collectors.toList());
+ } catch (CompletionException e) {
+ List<FeedException> exceptions = new ArrayList<>();
+ for (CompletableFuture<Result> promise : promises) {
+ if (promise.isCompletedExceptionally()) {
+ // Lambda is executed on this thread since the future is already completed
+ promise.whenComplete((__, error) -> exceptions.add((FeedException) error));
+ }
+ }
+ throw new MultiFeedException(exceptions);
+ }
+ }
+}
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/MultiFeedException.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/MultiFeedException.java
new file mode 100644
index 00000000000..5db687b49ff
--- /dev/null
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/MultiFeedException.java
@@ -0,0 +1,38 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Aggregates multiple instances of {@link FeedException}
+ *
+ * @author bjorncs
+ */
+public class MultiFeedException extends RuntimeException {
+
+ private final List<FeedException> exceptions;
+
+ public MultiFeedException(Collection<FeedException> exceptions) {
+ super(toMessage(exceptions));
+ this.exceptions = Collections.unmodifiableList(new ArrayList<>(exceptions));
+ }
+
+ public Collection<FeedException> feedExceptions() { return exceptions; }
+
+ public Set<DocumentId> documentIds() {
+ return exceptions.stream()
+ .filter(e -> e.documentId().isPresent())
+ .map(e -> e.documentId().get())
+ .collect(Collectors.toSet());
+ }
+
+ private static String toMessage(Collection<FeedException> exceptions) {
+ return String.format("%d feed operations failed", exceptions.size());
+ }
+
+}
diff --git a/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/FeedClientTest.java b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/FeedClientTest.java
new file mode 100644
index 00000000000..688f311bb05
--- /dev/null
+++ b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/FeedClientTest.java
@@ -0,0 +1,105 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * @author bjorncs
+ */
+class FeedClientTest {
+
+ private ExecutorService executor;
+
+ @BeforeEach
+ void setUp() {
+ executor = Executors.newSingleThreadExecutor();
+ }
+
+ @AfterEach
+ void tearDown() throws InterruptedException {
+ executor.shutdown();
+ assertTrue(executor.awaitTermination(60, TimeUnit.SECONDS));
+ }
+
+ @Test
+ void await_returns_list_of_result_on_success() {
+ MyResult r1 = new MyResult();
+ CompletableFuture<Result> f1 = CompletableFuture.completedFuture(r1);
+ MyResult r2 = new MyResult();
+ CompletableFuture<Result> f2 = CompletableFuture.completedFuture(r2);
+ MyResult r3 = new MyResult();
+ CompletableFuture<Result> f3 = CompletableFuture.completedFuture(r3);
+
+ List<Result> aggregated = FeedClient.await(f1, f2, f3);
+ assertEquals(3, aggregated.size());
+ assertEquals(r1, aggregated.get(0));
+ assertEquals(r2, aggregated.get(1));
+ assertEquals(r3, aggregated.get(2));
+ }
+
+ @Test
+ void await_handles_async_completion_with_success() throws ExecutionException, InterruptedException {
+ CompletableFuture<Result> f1 = new CompletableFuture<>();
+ CompletableFuture<Result> f2 = new CompletableFuture<>();
+ CompletableFuture<Result> f3 = new CompletableFuture<>();
+
+ CompletableFuture<List<Result>> awaitPromise = CompletableFuture.supplyAsync(() -> FeedClient.await(f1, f2, f3), executor);
+ // Completed in reverse order
+ MyResult r3 = new MyResult();
+ f3.complete(r3);
+ MyResult r2 = new MyResult();
+ f2.complete(r2);
+ MyResult r1 = new MyResult();
+ f1.complete(r1);
+
+ List<Result> aggregated = awaitPromise.get();
+ assertEquals(3, aggregated.size());
+ assertEquals(r1, aggregated.get(0));
+ assertEquals(r2, aggregated.get(1));
+ assertEquals(r3, aggregated.get(2));
+ }
+
+ @Test
+ void await_throws_when_some_results_completes_exceptionally() {
+ CompletableFuture<Result> f1 = new CompletableFuture<>();
+ DocumentId docId1 = DocumentId.of("music", "music", "doc1");
+ FeedException exceptionDoc1 = new FeedException(docId1, "Doc1 failed");
+ f1.completeExceptionally(exceptionDoc1);
+ CompletableFuture<Result> f2 = new CompletableFuture<>();
+ DocumentId docId2 = DocumentId.of("music", "music", "doc2");
+ FeedException exceptionDoc2 = new FeedException(docId2, "Doc2 failed");
+ f2.completeExceptionally(exceptionDoc2);
+ CompletableFuture<Result> f3 = CompletableFuture.completedFuture(new MyResult());
+
+ MultiFeedException multiException = assertThrows(MultiFeedException.class, () -> FeedClient.await(f1, f2, f3));
+ Set<DocumentId> expectedDocsIds = new HashSet<>(Arrays.asList(docId1, docId2));
+ assertEquals(expectedDocsIds, new HashSet<>(multiException.documentIds()));
+ Set<FeedException> expectedExceptions = new HashSet<>(Arrays.asList(exceptionDoc1, exceptionDoc2));
+ assertEquals(expectedExceptions, new HashSet<>(multiException.feedExceptions()));
+ assertEquals("2 feed operations failed", multiException.getMessage());
+ }
+
+ static class MyResult implements Result {
+ @Override public Type type() { return null; }
+ @Override public DocumentId documentId() { return null; }
+ @Override public Optional<String> resultMessage() { return Optional.empty(); }
+ @Override public Optional<String> traceMessage() { return Optional.empty(); }
+ }
+}