diff options
author | Bjørn Christian Seime <bjorn.christian@seime.no> | 2021-06-09 13:50:23 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-06-09 13:50:23 +0200 |
commit | e68933c235f1a43c5f8542c6e41838dbf2e9cd27 (patch) | |
tree | 7167024f0ea9fbdccf4f16d9e6740dfbed915621 | |
parent | 9294174b953b2092264cc8a9484368bc2cce5b7c (diff) |
Revert "Bjorncs/json feeder"
-rw-r--r-- | vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java | 10 | ||||
-rw-r--r-- | vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java (renamed from vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java) | 313 | ||||
-rw-r--r-- | vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java (renamed from vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java) | 35 |
3 files changed, 142 insertions, 216 deletions
diff --git a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java index 40c6ac56022..e3f726eaf11 100644 --- a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java +++ b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java @@ -55,11 +55,11 @@ public class CliClient { return 0; } try (InputStream in = createFeedInputStream(cliArgs); - JsonFeeder feeder = createJsonFeeder(cliArgs)) { + JsonStreamFeeder feeder = createJsonFeeder(cliArgs)) { if (cliArgs.benchmarkModeEnabled()) { printBenchmarkResult(feeder.benchmark(in)); } else { - feeder.feedMany(in); + feeder.feed(in); } } return 0; @@ -85,9 +85,9 @@ public class CliClient { return builder.build(); } - private static JsonFeeder createJsonFeeder(CliArguments cliArgs) throws CliArguments.CliArgumentsException, IOException { + private static JsonStreamFeeder createJsonFeeder(CliArguments cliArgs) throws CliArguments.CliArgumentsException, IOException { FeedClient feedClient = createFeedClient(cliArgs); - JsonFeeder.Builder builder = JsonFeeder.builder(feedClient); + JsonStreamFeeder.Builder builder = JsonStreamFeeder.builder(feedClient); cliArgs.timeout().ifPresent(builder::withTimeout); cliArgs.route().ifPresent(builder::withRoute); cliArgs.traceLevel().ifPresent(builder::withTracelevel); @@ -98,7 +98,7 @@ public class CliClient { return cliArgs.readFeedFromStandardInput() ? systemIn : Files.newInputStream(cliArgs.inputFile().get()); } - private void printBenchmarkResult(JsonFeeder.BenchmarkResult result) throws IOException { + private void printBenchmarkResult(JsonStreamFeeder.BenchmarkResult result) throws IOException { JsonFactory factory = new JsonFactory(); try (JsonGenerator generator = factory.createGenerator(systemOut).useDefaultPrettyPrinter()) { generator.writeStartObject(); diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java index 2a6d2e15747..99d05a4bae8 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java @@ -12,18 +12,14 @@ import java.io.InputStream; import java.io.InterruptedIOException; import java.io.UncheckedIOException; import java.time.Duration; +import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static ai.vespa.feed.client.FeedClient.OperationType.PUT; import static ai.vespa.feed.client.FeedClient.OperationType.REMOVE; import static ai.vespa.feed.client.FeedClient.OperationType.UPDATE; -import static com.fasterxml.jackson.core.JsonToken.END_ARRAY; -import static com.fasterxml.jackson.core.JsonToken.START_ARRAY; import static com.fasterxml.jackson.core.JsonToken.START_OBJECT; import static com.fasterxml.jackson.core.JsonToken.VALUE_FALSE; import static com.fasterxml.jackson.core.JsonToken.VALUE_STRING; @@ -34,44 +30,17 @@ import static java.util.Objects.requireNonNull; /** * @author jonmv - * @author bjorncs */ -public class JsonFeeder implements Closeable { +public class JsonStreamFeeder implements Closeable { - private final ExecutorService resultExecutor = Executors.newSingleThreadExecutor(r -> { - Thread t = new Thread(r, "json-feeder-result-executor"); - t.setDaemon(true); - return t; - }); private final FeedClient client; private final OperationParameters protoParameters; - private JsonFeeder(FeedClient client, OperationParameters protoParameters) { + private JsonStreamFeeder(FeedClient client, OperationParameters protoParameters) { this.client = client; this.protoParameters = protoParameters; } - public interface ResultCallback { - /** - * Invoked after each operation has either completed successfully or failed - * - * @param result Non-null if operation completed successfully - * @param error Non-null if operation failed - */ - void onNextResult(Result result, Throwable error); - - /** - * Invoked if an unrecoverable error occurred during feed processing, - * after which no other {@link ResultCallback} methods are invoked. - */ - void onError(Throwable error); - - /** - * Invoked when all feed operations are either completed successfully or failed. - */ - void onComplete(); - } - public static Builder builder(FeedClient client) { return new Builder(client); } /** Feeds a stream containing a JSON array of feed operations on the form @@ -99,59 +68,45 @@ public class JsonFeeder implements Closeable { * </pre> * Note that {@code "id"} is an alias for the document put operation. */ - public CompletableFuture<Void> feedMany(InputStream jsonStream, ResultCallback resultCallback) { - return feedMany(jsonStream, 1 << 26, resultCallback); + public void feed(InputStream jsonStream) throws IOException { + feed(jsonStream, 1 << 26, false); } - CompletableFuture<Void> feedMany(InputStream jsonStream, int size, ResultCallback resultCallback) { + BenchmarkResult benchmark(InputStream jsonStream) throws IOException { + return feed(jsonStream, 1 << 26, true).get(); + } + + Optional<BenchmarkResult> feed(InputStream jsonStream, int size, boolean benchmark) throws IOException { RingBufferStream buffer = new RingBufferStream(jsonStream, size); - CompletableFuture<Void> overallResult = new CompletableFuture<>(); + buffer.expect(JsonToken.START_ARRAY); + AtomicInteger okCount = new AtomicInteger(); + AtomicInteger failedCount = new AtomicInteger(); + long startTime = System.nanoTime(); CompletableFuture<Result> result; - AtomicInteger pending = new AtomicInteger(1); // The below dispatch loop itself is counted as a single pending operation - AtomicBoolean finalCallbackInvoked = new AtomicBoolean(); - try { - while ((result = buffer.next()) != null) { - pending.incrementAndGet(); - result.whenCompleteAsync((r, t) -> { - if (!finalCallbackInvoked.get()) { - resultCallback.onNextResult(r, t); - } - if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { - resultCallback.onComplete(); - overallResult.complete(null); - } - }, resultExecutor); - } - if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { - resultExecutor.execute(() -> { - resultCallback.onComplete(); - overallResult.complete(null); - }); - } - } catch (Exception e) { - if (finalCallbackInvoked.compareAndSet(false, true)) { - resultExecutor.execute(() -> { - resultCallback.onError(e); - overallResult.completeExceptionally(e); - }); - } + AtomicReference<Throwable> thrown = new AtomicReference<>(); + while ((result = buffer.next()) != null) { + result.whenComplete((r, t) -> { + if (t != null) { + failedCount.incrementAndGet(); + if (!benchmark) thrown.set(t); + } else + okCount.incrementAndGet(); + }); + if (thrown.get() != null) + sneakyThrow(thrown.get()); } - return overallResult; + if (!benchmark) return Optional.empty(); + Duration duration = Duration.ofNanos(System.nanoTime() - startTime); + double throughPut = (double)okCount.get() / duration.toMillis() * 1000D; + return Optional.of(new BenchmarkResult(okCount.get(), failedCount.get(), duration, throughPut)); } + @SuppressWarnings("unchecked") + static <T extends Throwable> void sneakyThrow(Throwable thrown) throws T { throw (T) thrown; } + private static final JsonFactory factory = new JsonFactory(); - @Override public void close() throws IOException { - client.close(); - resultExecutor.shutdown(); - try { - if (!resultExecutor.awaitTermination(30, TimeUnit.SECONDS)) { - throw new IOException("Failed to close client in time"); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } + @Override public void close() throws IOException { client.close(); } private class RingBufferStream extends InputStream { @@ -160,12 +115,12 @@ public class JsonFeeder implements Closeable { private final byte[] data; private final int size; private final Object lock = new Object(); + private final JsonParser parser; private Throwable thrown = null; private long tail = 0; private long pos = 0; private long head = 0; private boolean done = false; - private final OperationParserAndExecutor parserAndExecutor; RingBufferStream(InputStream in, int size) { this.in = in; @@ -174,7 +129,7 @@ public class JsonFeeder implements Closeable { new Thread(this::fill, "feed-reader").start(); - try { this.parserAndExecutor = new RingBufferBackedOperationParserAndExecutor(factory.createParser(this)); } + try { this.parser = factory.createParser(this); } catch (IOException e) { throw new UncheckedIOException(e); } } @@ -209,104 +164,24 @@ public class JsonFeeder implements Closeable { } } - public CompletableFuture<Result> next() throws IOException { - return parserAndExecutor.next(); - } - - private final byte[] prefix = "{\"fields\":".getBytes(UTF_8); - private byte[] copy(long start, long end) { - int length = (int) (end - start); - byte[] buffer = new byte[prefix.length + length + 1]; - System.arraycopy(prefix, 0, buffer, 0, prefix.length); - - int offset = (int) (start % size); - int toWrite = min(length, size - offset); - System.arraycopy(data, offset, buffer, prefix.length, toWrite); - if (toWrite < length) - System.arraycopy(data, 0, buffer, prefix.length + toWrite, length - toWrite); - - buffer[buffer.length - 1] = '}'; - return buffer; - } - - - @Override - public void close() throws IOException { - synchronized (lock) { - done = true; - lock.notifyAll(); - } - in.close(); - } - - private void fill() { - try { - while (true) { - int free; - synchronized (lock) { - while ((free = (int) (tail + size - head)) <= 0 && ! done) - lock.wait(); - } - if (done) break; - - int off = (int) (head % size); - int len = min(min(free, size - off), 1 << 13); - int read = in.read(data, off, len); - - synchronized (lock) { - if (read < 0) done = true; - else head += read; - lock.notify(); - } - } - } - catch (Throwable t) { - synchronized (lock) { - done = true; - thrown = t; - } - } - } - - private class RingBufferBackedOperationParserAndExecutor extends OperationParserAndExecutor { - - RingBufferBackedOperationParserAndExecutor(JsonParser parser) throws IOException { super(parser, true); } - - @Override - String getDocumentJson(long start, long end) { - String payload = new String(copy(start, end), UTF_8); - synchronized (lock) { - tail = end; - lock.notify(); - } - return payload; - } - } - } - - private abstract class OperationParserAndExecutor { - - private final JsonParser parser; - private final boolean multipleOperations; - - protected OperationParserAndExecutor(JsonParser parser, boolean multipleOperations) throws IOException { - this.parser = parser; - this.multipleOperations = multipleOperations; - if (multipleOperations) expect(START_ARRAY); + void expect(JsonToken token) throws IOException { + if (parser.nextToken() != token) + throw new IllegalArgumentException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() + + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); } - abstract String getDocumentJson(long start, long end); - - CompletableFuture<Result> next() throws IOException { - JsonToken token = parser.nextToken(); - if (token == END_ARRAY && multipleOperations) return null; - else if (token == null && !multipleOperations) return null; - else if (token == START_OBJECT); - else throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " + parser.getTokenLocation().getByteOffset()); + public CompletableFuture<Result> next() throws IOException { long start = 0, end = -1; OperationType type = null; DocumentId id = null; OperationParameters parameters = protoParameters; + switch (parser.nextToken()) { + case END_ARRAY: return null; + case START_OBJECT: break; + default: throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " + + parser.getTokenLocation().getByteOffset()); + } + loop: while (true) { switch (parser.nextToken()) { case FIELD_NAME: @@ -329,7 +204,7 @@ public class JsonFeeder implements Closeable { break; } default: throw new IllegalArgumentException("Unexpected field name '" + parser.getText() + "' at offset " + - parser.getTokenLocation().getByteOffset()); + parser.getTokenLocation().getByteOffset()); } break; @@ -338,15 +213,22 @@ public class JsonFeeder implements Closeable { default: throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " + - parser.getTokenLocation().getByteOffset()); + parser.getTokenLocation().getByteOffset()); } } + if (id == null) throw new IllegalArgumentException("No document id for document at offset " + start); if (end < start) throw new IllegalArgumentException("No 'fields' object for document at offset " + parser.getTokenLocation().getByteOffset()); - String payload = getDocumentJson(start, end); + + String payload = new String(copy(start, end), UTF_8); + synchronized (lock) { + tail = end; + lock.notify(); + } + switch (type) { case PUT: return client.put (id, payload, parameters); case UPDATE: return client.update(id, payload, parameters); @@ -355,17 +237,27 @@ public class JsonFeeder implements Closeable { } } - void expect(JsonToken token) throws IOException { - if (parser.nextToken() != token) - throw new IllegalArgumentException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() + - ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); + private final byte[] prefix = "{\"fields\":".getBytes(UTF_8); + private byte[] copy(long start, long end) { + int length = (int) (end - start); + byte[] buffer = new byte[prefix.length + length + 1]; + System.arraycopy(prefix, 0, buffer, 0, prefix.length); + + int offset = (int) (start % size); + int toWrite = min(length, size - offset); + System.arraycopy(data, offset, buffer, prefix.length, toWrite); + if (toWrite < length) + System.arraycopy(data, 0, buffer, prefix.length + toWrite, length - toWrite); + + buffer[buffer.length - 1] = '}'; + return buffer; } private String readString() throws IOException { String value = parser.nextTextValue(); if (value == null) throw new IllegalArgumentException("Expected '" + VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() + - ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); return value; } @@ -374,7 +266,7 @@ public class JsonFeeder implements Closeable { Boolean value = parser.nextBooleanValue(); if (value == null) throw new IllegalArgumentException("Expected '" + VALUE_FALSE + "' or '" + VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() + - ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); return value; @@ -384,6 +276,44 @@ public class JsonFeeder implements Closeable { return DocumentId.of(readString()); } + @Override + public void close() throws IOException { + synchronized (lock) { + done = true; + lock.notifyAll(); + } + in.close(); + } + + private void fill() { + try { + while (true) { + int free; + synchronized (lock) { + while ((free = (int) (tail + size - head)) <= 0 && ! done) + lock.wait(); + } + if (done) break; + + int off = (int) (head % size); + int len = min(min(free, size - off), 1 << 13); + int read = in.read(data, off, len); + + synchronized (lock) { + if (read < 0) done = true; + else head += read; + lock.notify(); + } + } + } + catch (Throwable t) { + synchronized (lock) { + done = true; + thrown = t; + } + } + } + } @@ -411,9 +341,24 @@ public class JsonFeeder implements Closeable { return this; } - public JsonFeeder build() { - return new JsonFeeder(client, parameters); + public JsonStreamFeeder build() { + return new JsonStreamFeeder(client, parameters); } } + + static class BenchmarkResult { + final int okCount; + final int errorCount; + final Duration duration; + final double throughput; + + BenchmarkResult(int okCount, int errorCount, Duration duration, double throughput) { + this.okCount = okCount; + this.errorCount = errorCount; + this.duration = duration; + this.throughput = throughput; + } + } + } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java index 0f14f9ab4be..28a50b88396 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java @@ -5,21 +5,16 @@ import org.junit.jupiter.api.Test; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.util.HashSet; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.IntStream; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.stream.Collectors.joining; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; -class JsonFeederTest { +class JsonStreamFeederTest { @Test void test() throws IOException { @@ -43,46 +38,32 @@ class JsonFeederTest { " }\n" + "]"; ByteArrayInputStream in = new ByteArrayInputStream(json.getBytes(UTF_8)); - Set<String> ids = new ConcurrentSkipListSet<>(); - AtomicInteger resultsReceived = new AtomicInteger(); - AtomicBoolean completedSuccessfully = new AtomicBoolean(); - AtomicReference<Throwable> exceptionThrow = new AtomicReference<>(); + Set<String> ids = new HashSet<>(); long startNanos = System.nanoTime(); - JsonFeeder.builder(new FeedClient() { + JsonStreamFeeder.builder(new FeedClient() { @Override public CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params) { ids.add(documentId.userSpecific()); - return createSuccessResult(documentId); + return new CompletableFuture<>(); } @Override public CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params) { - return createSuccessResult(documentId); + return new CompletableFuture<>(); } @Override public CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params) { - return createSuccessResult(documentId); + return new CompletableFuture<>(); } @Override public void close(boolean graceful) { } - private CompletableFuture<Result> createSuccessResult(DocumentId documentId) { - return CompletableFuture.completedFuture(new Result(Result.Type.success, documentId, "success", null)); - } - - }).build().feedMany(in, 1 << 7, new JsonFeeder.ResultCallback() { - @Override public void onNextResult(Result result, Throwable error) { resultsReceived.incrementAndGet(); } - @Override public void onError(Throwable error) { exceptionThrow.set(error); } - @Override public void onComplete() { completedSuccessfully.set(true); } - }).join(); // TODO: hangs when buffer is smaller than largest document + }).build().feed(in, 1 << 7, false); // TODO: hangs when buffer is smaller than largest document System.err.println((json.length() / 1048576.0) + " MB in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds"); assertEquals(docs + 1, ids.size()); - assertEquals(docs + 1, resultsReceived.get()); - assertTrue(completedSuccessfully.get()); - assertNull(exceptionThrow.get()); } } |