From 50b5723757156e0b3eaaba2bafe130355b265f99 Mon Sep 17 00:00:00 2001 From: Bjørn Christian Seime Date: Wed, 9 Jun 2021 11:34:09 +0200 Subject: Make feedMany async + introduce callback for each operation result --- .../main/java/ai/vespa/feed/client/JsonFeeder.java | 123 +++++++++++++-------- .../java/ai/vespa/feed/client/JsonFeederTest.java | 21 +++- 2 files changed, 95 insertions(+), 49 deletions(-) (limited to 'vespa-feed-client/src') diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java index 48fcc71494f..6dff1b06fc0 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java @@ -12,10 +12,12 @@ import java.io.InputStream; import java.io.InterruptedIOException; import java.io.UncheckedIOException; import java.time.Duration; -import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import static ai.vespa.feed.client.FeedClient.OperationType.PUT; import static ai.vespa.feed.client.FeedClient.OperationType.REMOVE; @@ -32,9 +34,15 @@ import static java.util.Objects.requireNonNull; /** * @author jonmv + * @author bjorncs */ public class JsonFeeder implements Closeable { + private final ExecutorService resultExecutor = Executors.newSingleThreadExecutor(r -> { + Thread t = new Thread(r, "json-feeder-result-executor"); + t.setDaemon(true); + return t; + }); private final FeedClient client; private final OperationParameters protoParameters; @@ -43,6 +51,27 @@ public class JsonFeeder implements Closeable { this.protoParameters = protoParameters; } + public interface ResultCallback { + /** + * Invoked after each operation has either completed successfully or failed + * + * @param result Non-null if operation completed successfully + * @param error Non-null if operation failed + */ + void onNextResult(Result result, Throwable error); + + /** + * Invoked if an unrecoverable error occurred during feed processing, + * after which no other {@link ResultCallback} methods are invoked. + */ + void onError(Throwable error); + + /** + * Invoked when all feed operations are either completed successfully or failed. + */ + void onComplete(); + } + public static Builder builder(FeedClient client) { return new Builder(client); } /** Feeds a stream containing a JSON array of feed operations on the form @@ -70,44 +99,61 @@ public class JsonFeeder implements Closeable { * * Note that {@code "id"} is an alias for the document put operation. */ - public void feedMany(InputStream jsonStream) throws IOException { - feedMany(jsonStream, 1 << 26, false); + public CompletableFuture feedMany(InputStream jsonStream, ResultCallback resultCallback) { + return feedMany(jsonStream, 1 << 26, resultCallback); } - BenchmarkResult benchmark(InputStream jsonStream) throws IOException { - return feedMany(jsonStream, 1 << 26, true).get(); - } - - Optional feedMany(InputStream jsonStream, int size, boolean benchmark) throws IOException { + CompletableFuture feedMany(InputStream jsonStream, int size, ResultCallback resultCallback) { RingBufferStream buffer = new RingBufferStream(jsonStream, size); - AtomicInteger okCount = new AtomicInteger(); - AtomicInteger failedCount = new AtomicInteger(); - long startTime = System.nanoTime(); + CompletableFuture overallResult = new CompletableFuture<>(); CompletableFuture result; - AtomicReference thrown = new AtomicReference<>(); - while ((result = buffer.next()) != null) { - result.whenComplete((r, t) -> { - if (t != null) { - failedCount.incrementAndGet(); - if (!benchmark) thrown.set(t); - } else - okCount.incrementAndGet(); - }); - if (thrown.get() != null) - sneakyThrow(thrown.get()); + AtomicInteger pending = new AtomicInteger(1); // The below dispatch loop itself is counted as a single pending operation + AtomicBoolean finalCallbackInvoked = new AtomicBoolean(); + try { + while ((result = buffer.next()) != null) { + pending.incrementAndGet(); + result.whenComplete((r, t) -> { + if (!finalCallbackInvoked.get()) { + resultExecutor.execute(() -> resultCallback.onNextResult(r, t)); + } + if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { + resultExecutor.execute(() -> { + resultCallback.onComplete(); + overallResult.complete(null); + }); + } + }); + } + if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { + resultExecutor.execute(() -> { + resultCallback.onComplete(); + overallResult.complete(null); + }); + } + } catch (Exception e) { + if (finalCallbackInvoked.compareAndSet(false, true)) { + resultExecutor.execute(() -> { + resultCallback.onError(e); + overallResult.completeExceptionally(e); + }); + } } - if (!benchmark) return Optional.empty(); - Duration duration = Duration.ofNanos(System.nanoTime() - startTime); - double throughPut = (double)okCount.get() / duration.toMillis() * 1000D; - return Optional.of(new BenchmarkResult(okCount.get(), failedCount.get(), duration, throughPut)); + return overallResult; } - @SuppressWarnings("unchecked") - static void sneakyThrow(Throwable thrown) throws T { throw (T) thrown; } - private static final JsonFactory factory = new JsonFactory(); - @Override public void close() throws IOException { client.close(); } + @Override public void close() throws IOException { + client.close(); + resultExecutor.shutdown(); + try { + if (!resultExecutor.awaitTermination(30, TimeUnit.SECONDS)) { + throw new IOException("Failed to close client in time"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } private class RingBufferStream extends InputStream { @@ -372,19 +418,4 @@ public class JsonFeeder implements Closeable { } } - - static class BenchmarkResult { - final int okCount; - final int errorCount; - final Duration duration; - final double throughput; - - BenchmarkResult(int okCount, int errorCount, Duration duration, double throughput) { - this.okCount = okCount; - this.errorCount = errorCount; - this.duration = duration; - this.throughput = throughput; - } - } - } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java index f50bd75bd4b..0f14f9ab4be 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java @@ -5,14 +5,19 @@ import org.junit.jupiter.api.Test; import java.io.ByteArrayInputStream; import java.io.IOException; -import java.util.HashSet; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.IntStream; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.stream.Collectors.joining; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; class JsonFeederTest { @@ -38,7 +43,10 @@ class JsonFeederTest { " }\n" + "]"; ByteArrayInputStream in = new ByteArrayInputStream(json.getBytes(UTF_8)); - Set ids = new HashSet<>(); + Set ids = new ConcurrentSkipListSet<>(); + AtomicInteger resultsReceived = new AtomicInteger(); + AtomicBoolean completedSuccessfully = new AtomicBoolean(); + AtomicReference exceptionThrow = new AtomicReference<>(); long startNanos = System.nanoTime(); JsonFeeder.builder(new FeedClient() { @@ -65,9 +73,16 @@ class JsonFeederTest { return CompletableFuture.completedFuture(new Result(Result.Type.success, documentId, "success", null)); } - }).build().feedMany(in, 1 << 7, false); // TODO: hangs when buffer is smaller than largest document + }).build().feedMany(in, 1 << 7, new JsonFeeder.ResultCallback() { + @Override public void onNextResult(Result result, Throwable error) { resultsReceived.incrementAndGet(); } + @Override public void onError(Throwable error) { exceptionThrow.set(error); } + @Override public void onComplete() { completedSuccessfully.set(true); } + }).join(); // TODO: hangs when buffer is smaller than largest document System.err.println((json.length() / 1048576.0) + " MB in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds"); assertEquals(docs + 1, ids.size()); + assertEquals(docs + 1, resultsReceived.get()); + assertTrue(completedSuccessfully.get()); + assertNull(exceptionThrow.get()); } } -- cgit v1.2.3