aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@verizonmedia.com>2021-06-09 11:34:09 +0200
committerBjørn Christian Seime <bjorncs@verizonmedia.com>2021-06-09 11:34:09 +0200
commit50b5723757156e0b3eaaba2bafe130355b265f99 (patch)
treed5437daaff2cb2332b73dd2f1baa8d7a722445e4 /vespa-feed-client
parentd29f603291096e0df578fca57a23c603124ddd03 (diff)
Make feedMany async + introduce callback for each operation result
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java123
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java21
2 files changed, 95 insertions, 49 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java
index 48fcc71494f..6dff1b06fc0 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java
@@ -12,10 +12,12 @@ import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.time.Duration;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
import static ai.vespa.feed.client.FeedClient.OperationType.PUT;
import static ai.vespa.feed.client.FeedClient.OperationType.REMOVE;
@@ -32,9 +34,15 @@ import static java.util.Objects.requireNonNull;
/**
* @author jonmv
+ * @author bjorncs
*/
public class JsonFeeder implements Closeable {
+ private final ExecutorService resultExecutor = Executors.newSingleThreadExecutor(r -> {
+ Thread t = new Thread(r, "json-feeder-result-executor");
+ t.setDaemon(true);
+ return t;
+ });
private final FeedClient client;
private final OperationParameters protoParameters;
@@ -43,6 +51,27 @@ public class JsonFeeder implements Closeable {
this.protoParameters = protoParameters;
}
+ public interface ResultCallback {
+ /**
+ * Invoked after each operation has either completed successfully or failed
+ *
+ * @param result Non-null if operation completed successfully
+ * @param error Non-null if operation failed
+ */
+ void onNextResult(Result result, Throwable error);
+
+ /**
+ * Invoked if an unrecoverable error occurred during feed processing,
+ * after which no other {@link ResultCallback} methods are invoked.
+ */
+ void onError(Throwable error);
+
+ /**
+ * Invoked when all feed operations are either completed successfully or failed.
+ */
+ void onComplete();
+ }
+
public static Builder builder(FeedClient client) { return new Builder(client); }
/** Feeds a stream containing a JSON array of feed operations on the form
@@ -70,44 +99,61 @@ public class JsonFeeder implements Closeable {
* </pre>
* Note that {@code "id"} is an alias for the document put operation.
*/
- public void feedMany(InputStream jsonStream) throws IOException {
- feedMany(jsonStream, 1 << 26, false);
+ public CompletableFuture<Void> feedMany(InputStream jsonStream, ResultCallback resultCallback) {
+ return feedMany(jsonStream, 1 << 26, resultCallback);
}
- BenchmarkResult benchmark(InputStream jsonStream) throws IOException {
- return feedMany(jsonStream, 1 << 26, true).get();
- }
-
- Optional<BenchmarkResult> feedMany(InputStream jsonStream, int size, boolean benchmark) throws IOException {
+ CompletableFuture<Void> feedMany(InputStream jsonStream, int size, ResultCallback resultCallback) {
RingBufferStream buffer = new RingBufferStream(jsonStream, size);
- AtomicInteger okCount = new AtomicInteger();
- AtomicInteger failedCount = new AtomicInteger();
- long startTime = System.nanoTime();
+ CompletableFuture<Void> overallResult = new CompletableFuture<>();
CompletableFuture<Result> result;
- AtomicReference<Throwable> thrown = new AtomicReference<>();
- while ((result = buffer.next()) != null) {
- result.whenComplete((r, t) -> {
- if (t != null) {
- failedCount.incrementAndGet();
- if (!benchmark) thrown.set(t);
- } else
- okCount.incrementAndGet();
- });
- if (thrown.get() != null)
- sneakyThrow(thrown.get());
+ AtomicInteger pending = new AtomicInteger(1); // The below dispatch loop itself is counted as a single pending operation
+ AtomicBoolean finalCallbackInvoked = new AtomicBoolean();
+ try {
+ while ((result = buffer.next()) != null) {
+ pending.incrementAndGet();
+ result.whenComplete((r, t) -> {
+ if (!finalCallbackInvoked.get()) {
+ resultExecutor.execute(() -> resultCallback.onNextResult(r, t));
+ }
+ if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) {
+ resultExecutor.execute(() -> {
+ resultCallback.onComplete();
+ overallResult.complete(null);
+ });
+ }
+ });
+ }
+ if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) {
+ resultExecutor.execute(() -> {
+ resultCallback.onComplete();
+ overallResult.complete(null);
+ });
+ }
+ } catch (Exception e) {
+ if (finalCallbackInvoked.compareAndSet(false, true)) {
+ resultExecutor.execute(() -> {
+ resultCallback.onError(e);
+ overallResult.completeExceptionally(e);
+ });
+ }
}
- if (!benchmark) return Optional.empty();
- Duration duration = Duration.ofNanos(System.nanoTime() - startTime);
- double throughPut = (double)okCount.get() / duration.toMillis() * 1000D;
- return Optional.of(new BenchmarkResult(okCount.get(), failedCount.get(), duration, throughPut));
+ return overallResult;
}
- @SuppressWarnings("unchecked")
- static <T extends Throwable> void sneakyThrow(Throwable thrown) throws T { throw (T) thrown; }
-
private static final JsonFactory factory = new JsonFactory();
- @Override public void close() throws IOException { client.close(); }
+ @Override public void close() throws IOException {
+ client.close();
+ resultExecutor.shutdown();
+ try {
+ if (!resultExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
+ throw new IOException("Failed to close client in time");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
private class RingBufferStream extends InputStream {
@@ -372,19 +418,4 @@ public class JsonFeeder implements Closeable {
}
}
-
- static class BenchmarkResult {
- final int okCount;
- final int errorCount;
- final Duration duration;
- final double throughput;
-
- BenchmarkResult(int okCount, int errorCount, Duration duration, double throughput) {
- this.okCount = okCount;
- this.errorCount = errorCount;
- this.duration = duration;
- this.throughput = throughput;
- }
- }
-
}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java
index f50bd75bd4b..0f14f9ab4be 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java
@@ -5,14 +5,19 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.IOException;
-import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.stream.Collectors.joining;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
class JsonFeederTest {
@@ -38,7 +43,10 @@ class JsonFeederTest {
" }\n" +
"]";
ByteArrayInputStream in = new ByteArrayInputStream(json.getBytes(UTF_8));
- Set<String> ids = new HashSet<>();
+ Set<String> ids = new ConcurrentSkipListSet<>();
+ AtomicInteger resultsReceived = new AtomicInteger();
+ AtomicBoolean completedSuccessfully = new AtomicBoolean();
+ AtomicReference<Throwable> exceptionThrow = new AtomicReference<>();
long startNanos = System.nanoTime();
JsonFeeder.builder(new FeedClient() {
@@ -65,9 +73,16 @@ class JsonFeederTest {
return CompletableFuture.completedFuture(new Result(Result.Type.success, documentId, "success", null));
}
- }).build().feedMany(in, 1 << 7, false); // TODO: hangs when buffer is smaller than largest document
+ }).build().feedMany(in, 1 << 7, new JsonFeeder.ResultCallback() {
+ @Override public void onNextResult(Result result, Throwable error) { resultsReceived.incrementAndGet(); }
+ @Override public void onError(Throwable error) { exceptionThrow.set(error); }
+ @Override public void onComplete() { completedSuccessfully.set(true); }
+ }).join(); // TODO: hangs when buffer is smaller than largest document
System.err.println((json.length() / 1048576.0) + " MB in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds");
assertEquals(docs + 1, ids.size());
+ assertEquals(docs + 1, resultsReceived.get());
+ assertTrue(completedSuccessfully.get());
+ assertNull(exceptionThrow.get());
}
}