diff options
author | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2021-06-09 13:31:12 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-06-09 13:31:12 +0200 |
commit | 9294174b953b2092264cc8a9484368bc2cce5b7c (patch) | |
tree | 8b7a23edee82ad274443d6618279804e6dd3ea1b /vespa-feed-client/src | |
parent | 07f850744c8fb94c5babd1175401d30e6f98c868 (diff) | |
parent | d86c5c5989fd63e8e98afaab2b7d4e32d2fb5c56 (diff) |
Merge pull request #18172 from vespa-engine/bjorncs/json-feeder
Bjorncs/json feeder
Diffstat (limited to 'vespa-feed-client/src')
-rw-r--r-- | vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java (renamed from vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java) | 313 | ||||
-rw-r--r-- | vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java (renamed from vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java) | 35 |
2 files changed, 211 insertions, 137 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java index 99d05a4bae8..2a6d2e15747 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java @@ -12,14 +12,18 @@ import java.io.InputStream; import java.io.InterruptedIOException; import java.io.UncheckedIOException; import java.time.Duration; -import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import static ai.vespa.feed.client.FeedClient.OperationType.PUT; import static ai.vespa.feed.client.FeedClient.OperationType.REMOVE; import static ai.vespa.feed.client.FeedClient.OperationType.UPDATE; +import static com.fasterxml.jackson.core.JsonToken.END_ARRAY; +import static com.fasterxml.jackson.core.JsonToken.START_ARRAY; import static com.fasterxml.jackson.core.JsonToken.START_OBJECT; import static com.fasterxml.jackson.core.JsonToken.VALUE_FALSE; import static com.fasterxml.jackson.core.JsonToken.VALUE_STRING; @@ -30,17 +34,44 @@ import static java.util.Objects.requireNonNull; /** * @author jonmv + * @author bjorncs */ -public class JsonStreamFeeder implements Closeable { +public class JsonFeeder implements Closeable { + private final ExecutorService resultExecutor = Executors.newSingleThreadExecutor(r -> { + Thread t = new Thread(r, "json-feeder-result-executor"); + t.setDaemon(true); + return t; + }); private final FeedClient client; private final OperationParameters protoParameters; - private JsonStreamFeeder(FeedClient client, OperationParameters protoParameters) { + private JsonFeeder(FeedClient client, OperationParameters protoParameters) { this.client = client; this.protoParameters = protoParameters; } + public interface ResultCallback { + /** + * Invoked after each operation has either completed successfully or failed + * + * @param result Non-null if operation completed successfully + * @param error Non-null if operation failed + */ + void onNextResult(Result result, Throwable error); + + /** + * Invoked if an unrecoverable error occurred during feed processing, + * after which no other {@link ResultCallback} methods are invoked. + */ + void onError(Throwable error); + + /** + * Invoked when all feed operations are either completed successfully or failed. + */ + void onComplete(); + } + public static Builder builder(FeedClient client) { return new Builder(client); } /** Feeds a stream containing a JSON array of feed operations on the form @@ -68,45 +99,59 @@ public class JsonStreamFeeder implements Closeable { * </pre> * Note that {@code "id"} is an alias for the document put operation. */ - public void feed(InputStream jsonStream) throws IOException { - feed(jsonStream, 1 << 26, false); + public CompletableFuture<Void> feedMany(InputStream jsonStream, ResultCallback resultCallback) { + return feedMany(jsonStream, 1 << 26, resultCallback); } - BenchmarkResult benchmark(InputStream jsonStream) throws IOException { - return feed(jsonStream, 1 << 26, true).get(); - } - - Optional<BenchmarkResult> feed(InputStream jsonStream, int size, boolean benchmark) throws IOException { + CompletableFuture<Void> feedMany(InputStream jsonStream, int size, ResultCallback resultCallback) { RingBufferStream buffer = new RingBufferStream(jsonStream, size); - buffer.expect(JsonToken.START_ARRAY); - AtomicInteger okCount = new AtomicInteger(); - AtomicInteger failedCount = new AtomicInteger(); - long startTime = System.nanoTime(); + CompletableFuture<Void> overallResult = new CompletableFuture<>(); CompletableFuture<Result> result; - AtomicReference<Throwable> thrown = new AtomicReference<>(); - while ((result = buffer.next()) != null) { - result.whenComplete((r, t) -> { - if (t != null) { - failedCount.incrementAndGet(); - if (!benchmark) thrown.set(t); - } else - okCount.incrementAndGet(); - }); - if (thrown.get() != null) - sneakyThrow(thrown.get()); + AtomicInteger pending = new AtomicInteger(1); // The below dispatch loop itself is counted as a single pending operation + AtomicBoolean finalCallbackInvoked = new AtomicBoolean(); + try { + while ((result = buffer.next()) != null) { + pending.incrementAndGet(); + result.whenCompleteAsync((r, t) -> { + if (!finalCallbackInvoked.get()) { + resultCallback.onNextResult(r, t); + } + if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { + resultCallback.onComplete(); + overallResult.complete(null); + } + }, resultExecutor); + } + if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { + resultExecutor.execute(() -> { + resultCallback.onComplete(); + overallResult.complete(null); + }); + } + } catch (Exception e) { + if (finalCallbackInvoked.compareAndSet(false, true)) { + resultExecutor.execute(() -> { + resultCallback.onError(e); + overallResult.completeExceptionally(e); + }); + } } - if (!benchmark) return Optional.empty(); - Duration duration = Duration.ofNanos(System.nanoTime() - startTime); - double throughPut = (double)okCount.get() / duration.toMillis() * 1000D; - return Optional.of(new BenchmarkResult(okCount.get(), failedCount.get(), duration, throughPut)); + return overallResult; } - @SuppressWarnings("unchecked") - static <T extends Throwable> void sneakyThrow(Throwable thrown) throws T { throw (T) thrown; } - private static final JsonFactory factory = new JsonFactory(); - @Override public void close() throws IOException { client.close(); } + @Override public void close() throws IOException { + client.close(); + resultExecutor.shutdown(); + try { + if (!resultExecutor.awaitTermination(30, TimeUnit.SECONDS)) { + throw new IOException("Failed to close client in time"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } private class RingBufferStream extends InputStream { @@ -115,12 +160,12 @@ public class JsonStreamFeeder implements Closeable { private final byte[] data; private final int size; private final Object lock = new Object(); - private final JsonParser parser; private Throwable thrown = null; private long tail = 0; private long pos = 0; private long head = 0; private boolean done = false; + private final OperationParserAndExecutor parserAndExecutor; RingBufferStream(InputStream in, int size) { this.in = in; @@ -129,7 +174,7 @@ public class JsonStreamFeeder implements Closeable { new Thread(this::fill, "feed-reader").start(); - try { this.parser = factory.createParser(this); } + try { this.parserAndExecutor = new RingBufferBackedOperationParserAndExecutor(factory.createParser(this)); } catch (IOException e) { throw new UncheckedIOException(e); } } @@ -164,24 +209,104 @@ public class JsonStreamFeeder implements Closeable { } } - void expect(JsonToken token) throws IOException { - if (parser.nextToken() != token) - throw new IllegalArgumentException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() + - ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); + public CompletableFuture<Result> next() throws IOException { + return parserAndExecutor.next(); } - public CompletableFuture<Result> next() throws IOException { + private final byte[] prefix = "{\"fields\":".getBytes(UTF_8); + private byte[] copy(long start, long end) { + int length = (int) (end - start); + byte[] buffer = new byte[prefix.length + length + 1]; + System.arraycopy(prefix, 0, buffer, 0, prefix.length); + + int offset = (int) (start % size); + int toWrite = min(length, size - offset); + System.arraycopy(data, offset, buffer, prefix.length, toWrite); + if (toWrite < length) + System.arraycopy(data, 0, buffer, prefix.length + toWrite, length - toWrite); + + buffer[buffer.length - 1] = '}'; + return buffer; + } + + + @Override + public void close() throws IOException { + synchronized (lock) { + done = true; + lock.notifyAll(); + } + in.close(); + } + + private void fill() { + try { + while (true) { + int free; + synchronized (lock) { + while ((free = (int) (tail + size - head)) <= 0 && ! done) + lock.wait(); + } + if (done) break; + + int off = (int) (head % size); + int len = min(min(free, size - off), 1 << 13); + int read = in.read(data, off, len); + + synchronized (lock) { + if (read < 0) done = true; + else head += read; + lock.notify(); + } + } + } + catch (Throwable t) { + synchronized (lock) { + done = true; + thrown = t; + } + } + } + + private class RingBufferBackedOperationParserAndExecutor extends OperationParserAndExecutor { + + RingBufferBackedOperationParserAndExecutor(JsonParser parser) throws IOException { super(parser, true); } + + @Override + String getDocumentJson(long start, long end) { + String payload = new String(copy(start, end), UTF_8); + synchronized (lock) { + tail = end; + lock.notify(); + } + return payload; + } + } + } + + private abstract class OperationParserAndExecutor { + + private final JsonParser parser; + private final boolean multipleOperations; + + protected OperationParserAndExecutor(JsonParser parser, boolean multipleOperations) throws IOException { + this.parser = parser; + this.multipleOperations = multipleOperations; + if (multipleOperations) expect(START_ARRAY); + } + + abstract String getDocumentJson(long start, long end); + + CompletableFuture<Result> next() throws IOException { + JsonToken token = parser.nextToken(); + if (token == END_ARRAY && multipleOperations) return null; + else if (token == null && !multipleOperations) return null; + else if (token == START_OBJECT); + else throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " + parser.getTokenLocation().getByteOffset()); long start = 0, end = -1; OperationType type = null; DocumentId id = null; OperationParameters parameters = protoParameters; - switch (parser.nextToken()) { - case END_ARRAY: return null; - case START_OBJECT: break; - default: throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " + - parser.getTokenLocation().getByteOffset()); - } - loop: while (true) { switch (parser.nextToken()) { case FIELD_NAME: @@ -204,7 +329,7 @@ public class JsonStreamFeeder implements Closeable { break; } default: throw new IllegalArgumentException("Unexpected field name '" + parser.getText() + "' at offset " + - parser.getTokenLocation().getByteOffset()); + parser.getTokenLocation().getByteOffset()); } break; @@ -213,22 +338,15 @@ public class JsonStreamFeeder implements Closeable { default: throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " + - parser.getTokenLocation().getByteOffset()); + parser.getTokenLocation().getByteOffset()); } } - if (id == null) throw new IllegalArgumentException("No document id for document at offset " + start); if (end < start) throw new IllegalArgumentException("No 'fields' object for document at offset " + parser.getTokenLocation().getByteOffset()); - - String payload = new String(copy(start, end), UTF_8); - synchronized (lock) { - tail = end; - lock.notify(); - } - + String payload = getDocumentJson(start, end); switch (type) { case PUT: return client.put (id, payload, parameters); case UPDATE: return client.update(id, payload, parameters); @@ -237,27 +355,17 @@ public class JsonStreamFeeder implements Closeable { } } - private final byte[] prefix = "{\"fields\":".getBytes(UTF_8); - private byte[] copy(long start, long end) { - int length = (int) (end - start); - byte[] buffer = new byte[prefix.length + length + 1]; - System.arraycopy(prefix, 0, buffer, 0, prefix.length); - - int offset = (int) (start % size); - int toWrite = min(length, size - offset); - System.arraycopy(data, offset, buffer, prefix.length, toWrite); - if (toWrite < length) - System.arraycopy(data, 0, buffer, prefix.length + toWrite, length - toWrite); - - buffer[buffer.length - 1] = '}'; - return buffer; + void expect(JsonToken token) throws IOException { + if (parser.nextToken() != token) + throw new IllegalArgumentException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() + + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); } private String readString() throws IOException { String value = parser.nextTextValue(); if (value == null) throw new IllegalArgumentException("Expected '" + VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() + - ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); return value; } @@ -266,7 +374,7 @@ public class JsonStreamFeeder implements Closeable { Boolean value = parser.nextBooleanValue(); if (value == null) throw new IllegalArgumentException("Expected '" + VALUE_FALSE + "' or '" + VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() + - ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); return value; @@ -276,44 +384,6 @@ public class JsonStreamFeeder implements Closeable { return DocumentId.of(readString()); } - @Override - public void close() throws IOException { - synchronized (lock) { - done = true; - lock.notifyAll(); - } - in.close(); - } - - private void fill() { - try { - while (true) { - int free; - synchronized (lock) { - while ((free = (int) (tail + size - head)) <= 0 && ! done) - lock.wait(); - } - if (done) break; - - int off = (int) (head % size); - int len = min(min(free, size - off), 1 << 13); - int read = in.read(data, off, len); - - synchronized (lock) { - if (read < 0) done = true; - else head += read; - lock.notify(); - } - } - } - catch (Throwable t) { - synchronized (lock) { - done = true; - thrown = t; - } - } - } - } @@ -341,24 +411,9 @@ public class JsonStreamFeeder implements Closeable { return this; } - public JsonStreamFeeder build() { - return new JsonStreamFeeder(client, parameters); + public JsonFeeder build() { + return new JsonFeeder(client, parameters); } } - - static class BenchmarkResult { - final int okCount; - final int errorCount; - final Duration duration; - final double throughput; - - BenchmarkResult(int okCount, int errorCount, Duration duration, double throughput) { - this.okCount = okCount; - this.errorCount = errorCount; - this.duration = duration; - this.throughput = throughput; - } - } - } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java index 28a50b88396..0f14f9ab4be 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java @@ -5,16 +5,21 @@ import org.junit.jupiter.api.Test; import java.io.ByteArrayInputStream; import java.io.IOException; -import java.util.HashSet; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.IntStream; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.stream.Collectors.joining; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; -class JsonStreamFeederTest { +class JsonFeederTest { @Test void test() throws IOException { @@ -38,32 +43,46 @@ class JsonStreamFeederTest { " }\n" + "]"; ByteArrayInputStream in = new ByteArrayInputStream(json.getBytes(UTF_8)); - Set<String> ids = new HashSet<>(); + Set<String> ids = new ConcurrentSkipListSet<>(); + AtomicInteger resultsReceived = new AtomicInteger(); + AtomicBoolean completedSuccessfully = new AtomicBoolean(); + AtomicReference<Throwable> exceptionThrow = new AtomicReference<>(); long startNanos = System.nanoTime(); - JsonStreamFeeder.builder(new FeedClient() { + JsonFeeder.builder(new FeedClient() { @Override public CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params) { ids.add(documentId.userSpecific()); - return new CompletableFuture<>(); + return createSuccessResult(documentId); } @Override public CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params) { - return new CompletableFuture<>(); + return createSuccessResult(documentId); } @Override public CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params) { - return new CompletableFuture<>(); + return createSuccessResult(documentId); } @Override public void close(boolean graceful) { } - }).build().feed(in, 1 << 7, false); // TODO: hangs when buffer is smaller than largest document + private CompletableFuture<Result> createSuccessResult(DocumentId documentId) { + return CompletableFuture.completedFuture(new Result(Result.Type.success, documentId, "success", null)); + } + + }).build().feedMany(in, 1 << 7, new JsonFeeder.ResultCallback() { + @Override public void onNextResult(Result result, Throwable error) { resultsReceived.incrementAndGet(); } + @Override public void onError(Throwable error) { exceptionThrow.set(error); } + @Override public void onComplete() { completedSuccessfully.set(true); } + }).join(); // TODO: hangs when buffer is smaller than largest document System.err.println((json.length() / 1048576.0) + " MB in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds"); assertEquals(docs + 1, ids.size()); + assertEquals(docs + 1, resultsReceived.get()); + assertTrue(completedSuccessfully.get()); + assertNull(exceptionThrow.get()); } } |