summaryrefslogtreecommitdiffstats
path: root/vespajlib
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@verizonmedia.com>2020-12-01 16:57:23 +0100
committerBjørn Christian Seime <bjorncs@verizonmedia.com>2020-12-01 17:49:22 +0100
commitf2599280846073b144c4065bf25478138cc38b67 (patch)
treedb67a6227e93f8f7057d0f6bdc3fcb2ce2d4cac5 /vespajlib
parent14bf2d67787ac11c375117c4271b2ab783ae5612 (diff)
Add helper for combining multiple completable futures
Diffstat (limited to 'vespajlib')
-rw-r--r--vespajlib/pom.xml5
-rw-r--r--vespajlib/src/main/java/com/yahoo/concurrent/CompletableFutures.java67
-rw-r--r--vespajlib/src/test/java/com/yahoo/concurrent/CompletableFuturesTest.java66
3 files changed, 138 insertions, 0 deletions
diff --git a/vespajlib/pom.xml b/vespajlib/pom.xml
index 302a3c6f5bf..68639d30ab2 100644
--- a/vespajlib/pom.xml
+++ b/vespajlib/pom.xml
@@ -78,6 +78,11 @@
<artifactId>jackson-databind</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/CompletableFutures.java b/vespajlib/src/main/java/com/yahoo/concurrent/CompletableFutures.java
new file mode 100644
index 00000000000..b1fa6a9438d
--- /dev/null
+++ b/vespajlib/src/main/java/com/yahoo/concurrent/CompletableFutures.java
@@ -0,0 +1,67 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.concurrent;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Helper for {@link java.util.concurrent.CompletableFuture} / {@link java.util.concurrent.CompletionStage}.
+ *
+ * @author bjorncs
+ */
+public class CompletableFutures {
+
+ private CompletableFutures() {}
+
+ /**
+ * Returns a new completable future that is either
+ * - completed when any of the provided futures complete without exception
+ * - completed exceptionally once all provided futures complete exceptionally
+ */
+ public static <T> CompletableFuture<T> firstOf(List<CompletableFuture<T>> futures) {
+ class Combiner {
+ final Object monitor = new Object();
+ final CompletableFuture<T> combined = new CompletableFuture<>();
+ final int futuresCount;
+
+ Throwable error = null;
+ int exceptionCount = 0;
+
+ Combiner(int futuresCount) { this.futuresCount = futuresCount; }
+
+ void onCompletion(T value, Throwable error) {
+ if (combined.isDone()) return;
+ T valueToComplete = null;
+ Throwable exceptionToComplete = null;
+
+ synchronized (monitor) {
+ if (value != null) {
+ valueToComplete = value;
+ } else {
+ if (this.error == null) {
+ this.error = error;
+ } else {
+ this.error.addSuppressed(error);
+ }
+ if (++exceptionCount == futuresCount) {
+ exceptionToComplete = this.error;
+ }
+ }
+ }
+ if (valueToComplete != null) {
+ combined.complete(value);
+ } else if (exceptionToComplete != null) {
+ combined.completeExceptionally(exceptionToComplete);
+ }
+ }
+ }
+
+ int size = futures.size();
+ if (size == 0) throw new IllegalArgumentException();
+ if (size == 1) return futures.get(0);
+ Combiner combiner = new Combiner(size);
+ futures.forEach(future -> future.whenComplete(combiner::onCompletion));
+ return combiner.combined;
+ }
+
+}
diff --git a/vespajlib/src/test/java/com/yahoo/concurrent/CompletableFuturesTest.java b/vespajlib/src/test/java/com/yahoo/concurrent/CompletableFuturesTest.java
new file mode 100644
index 00000000000..cf9c36537d9
--- /dev/null
+++ b/vespajlib/src/test/java/com/yahoo/concurrent/CompletableFuturesTest.java
@@ -0,0 +1,66 @@
+package com.yahoo.concurrent;// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * @author bjorncs
+ */
+class CompletableFuturesTest {
+
+ @Test
+ public void firstof_completes_when_first_futures_has_completed() {
+ CompletableFuture<String> f1 = new CompletableFuture<>();
+ CompletableFuture<String> f2 = new CompletableFuture<>();
+ CompletableFuture<String> f3 = new CompletableFuture<>();
+ CompletableFuture<String> result = CompletableFutures.firstOf(List.of(f1, f2, f3));
+ f1.complete("success");
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+ assertEquals("success", result.join());
+ }
+
+ @Test
+ public void firstof_completes_if_any_futures_completes() {
+ CompletableFuture<String> f1 = new CompletableFuture<>();
+ CompletableFuture<String> f2 = new CompletableFuture<>();
+ CompletableFuture<String> f3 = new CompletableFuture<>();
+ CompletableFuture<String> result = CompletableFutures.firstOf(List.of(f1, f2, f3));
+ f1.completeExceptionally(new Throwable("t1"));
+ f2.completeExceptionally(new Throwable("t2"));
+ f3.complete("success");
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+ assertEquals("success", result.join());
+ }
+
+ @Test
+ public void firstof_completes_exceptionally_when_all_futures_have_complete_exceptionally() {
+ CompletableFuture<String> f1 = new CompletableFuture<>();
+ CompletableFuture<String> f2 = new CompletableFuture<>();
+ CompletableFuture<String> f3 = new CompletableFuture<>();
+ CompletableFuture<String> result = CompletableFutures.firstOf(List.of(f1, f2, f3));
+ f1.completeExceptionally(new Throwable("t1"));
+ f2.completeExceptionally(new Throwable("t2"));
+ f3.completeExceptionally(new Throwable("t3"));
+ assertTrue(result.isDone());
+ assertTrue(result.isCompletedExceptionally());
+ try {
+ result.join();
+ fail("Exception expected");
+ } catch (CompletionException e) {
+ Throwable cause = e.getCause();
+ assertEquals("t1", cause.getMessage());
+ assertEquals(2, cause.getSuppressed().length);
+ }
+ }
+
+} \ No newline at end of file