From c2dde0aa56a1b20961db4e9b10a802a44b61b795 Mon Sep 17 00:00:00 2001 From: Bjørn Christian Seime Date: Tue, 8 Jun 2021 17:05:21 +0200 Subject: Rename 'JsonStreamFeeder' to 'JsonFeeder' --- .../main/java/ai/vespa/feed/client/CliClient.java | 8 +- .../main/java/ai/vespa/feed/client/JsonFeeder.java | 364 +++++++++++++++++++++ .../ai/vespa/feed/client/JsonStreamFeeder.java | 364 --------------------- .../java/ai/vespa/feed/client/JsonFeederTest.java | 69 ++++ .../ai/vespa/feed/client/JsonStreamFeederTest.java | 69 ---- 5 files changed, 437 insertions(+), 437 deletions(-) create mode 100644 vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java delete mode 100644 vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java create mode 100644 vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java delete mode 100644 vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java diff --git a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java index e3f726eaf11..5c98c986990 100644 --- a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java +++ b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java @@ -55,7 +55,7 @@ public class CliClient { return 0; } try (InputStream in = createFeedInputStream(cliArgs); - JsonStreamFeeder feeder = createJsonFeeder(cliArgs)) { + JsonFeeder feeder = createJsonFeeder(cliArgs)) { if (cliArgs.benchmarkModeEnabled()) { printBenchmarkResult(feeder.benchmark(in)); } else { @@ -85,9 +85,9 @@ public class CliClient { return builder.build(); } - private static JsonStreamFeeder createJsonFeeder(CliArguments cliArgs) throws CliArguments.CliArgumentsException, IOException { + private static JsonFeeder createJsonFeeder(CliArguments cliArgs) throws CliArguments.CliArgumentsException, IOException { FeedClient feedClient = createFeedClient(cliArgs); - JsonStreamFeeder.Builder builder = JsonStreamFeeder.builder(feedClient); + JsonFeeder.Builder builder = JsonFeeder.builder(feedClient); cliArgs.timeout().ifPresent(builder::withTimeout); cliArgs.route().ifPresent(builder::withRoute); cliArgs.traceLevel().ifPresent(builder::withTracelevel); @@ -98,7 +98,7 @@ public class CliClient { return cliArgs.readFeedFromStandardInput() ? systemIn : Files.newInputStream(cliArgs.inputFile().get()); } - private void printBenchmarkResult(JsonStreamFeeder.BenchmarkResult result) throws IOException { + private void printBenchmarkResult(JsonFeeder.BenchmarkResult result) throws IOException { JsonFactory factory = new JsonFactory(); try (JsonGenerator generator = factory.createGenerator(systemOut).useDefaultPrettyPrinter()) { generator.writeStartObject(); diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java new file mode 100644 index 00000000000..a4d75a0a7c7 --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java @@ -0,0 +1,364 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import ai.vespa.feed.client.FeedClient.OperationType; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.io.UncheckedIOException; +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static ai.vespa.feed.client.FeedClient.OperationType.PUT; +import static ai.vespa.feed.client.FeedClient.OperationType.REMOVE; +import static ai.vespa.feed.client.FeedClient.OperationType.UPDATE; +import static com.fasterxml.jackson.core.JsonToken.START_OBJECT; +import static com.fasterxml.jackson.core.JsonToken.VALUE_FALSE; +import static com.fasterxml.jackson.core.JsonToken.VALUE_STRING; +import static com.fasterxml.jackson.core.JsonToken.VALUE_TRUE; +import static java.lang.Math.min; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Objects.requireNonNull; + +/** + * @author jonmv + */ +public class JsonFeeder implements Closeable { + + private final FeedClient client; + private final OperationParameters protoParameters; + + private JsonFeeder(FeedClient client, OperationParameters protoParameters) { + this.client = client; + this.protoParameters = protoParameters; + } + + public static Builder builder(FeedClient client) { return new Builder(client); } + + /** Feeds a stream containing a JSON array of feed operations on the form + *
+     *     [
+     *       {
+     *         "id": "id:ns:type::boo",
+     *         "fields": { ... document fields ... }
+     *       },
+     *       {
+     *         "put": "id:ns:type::foo",
+     *         "fields": { ... document fields ... }
+     *       },
+     *       {
+     *         "update": "id:ns:type:n=4:bar",
+     *         "create": true,
+     *         "fields": { ... partial update fields ... }
+     *       },
+     *       {
+     *         "remove": "id:ns:type:g=foo:bar",
+     *         "condition": "type.baz = \"bax\""
+     *       },
+     *       ...
+     *     ]
+     * 
+ * Note that {@code "id"} is an alias for the document put operation. + */ + public void feed(InputStream jsonStream) throws IOException { + feed(jsonStream, 1 << 26, false); + } + + BenchmarkResult benchmark(InputStream jsonStream) throws IOException { + return feed(jsonStream, 1 << 26, true).get(); + } + + Optional feed(InputStream jsonStream, int size, boolean benchmark) throws IOException { + RingBufferStream buffer = new RingBufferStream(jsonStream, size); + buffer.expect(JsonToken.START_ARRAY); + AtomicInteger okCount = new AtomicInteger(); + AtomicInteger failedCount = new AtomicInteger(); + long startTime = System.nanoTime(); + CompletableFuture result; + AtomicReference thrown = new AtomicReference<>(); + while ((result = buffer.next()) != null) { + result.whenComplete((r, t) -> { + if (t != null) { + failedCount.incrementAndGet(); + if (!benchmark) thrown.set(t); + } else + okCount.incrementAndGet(); + }); + if (thrown.get() != null) + sneakyThrow(thrown.get()); + } + if (!benchmark) return Optional.empty(); + Duration duration = Duration.ofNanos(System.nanoTime() - startTime); + double throughPut = (double)okCount.get() / duration.toMillis() * 1000D; + return Optional.of(new BenchmarkResult(okCount.get(), failedCount.get(), duration, throughPut)); + } + + @SuppressWarnings("unchecked") + static void sneakyThrow(Throwable thrown) throws T { throw (T) thrown; } + + private static final JsonFactory factory = new JsonFactory(); + + @Override public void close() throws IOException { client.close(); } + + private class RingBufferStream extends InputStream { + + private final byte[] b = new byte[1]; + private final InputStream in; + private final byte[] data; + private final int size; + private final Object lock = new Object(); + private final JsonParser parser; + private Throwable thrown = null; + private long tail = 0; + private long pos = 0; + private long head = 0; + private boolean done = false; + + RingBufferStream(InputStream in, int size) { + this.in = in; + this.data = new byte[size]; + this.size = size; + + new Thread(this::fill, "feed-reader").start(); + + try { this.parser = factory.createParser(this); } + catch (IOException e) { throw new UncheckedIOException(e); } + } + + @Override + public int read() throws IOException { + return read(b, 0, 1) == -1 ? -1 : b[0]; + } + + @Override + public int read(byte[] buffer, int off, int len) throws IOException { + try { + int ready; + synchronized (lock) { + while ((ready = (int) (head - pos)) == 0 && ! done) + lock.wait(); + } + if (thrown != null) throw new RuntimeException("Error reading input", thrown); + if (ready == 0) return -1; + + ready = min(ready, len); + int offset = (int) (pos % size); + int length = min(ready, size - offset); + System.arraycopy(data, offset, buffer, off, length); + if (length < ready) + System.arraycopy(data, 0, buffer, off + length, ready - length); + + pos += ready; + return ready; + } + catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted waiting for data: " + e.getMessage()); + } + } + + void expect(JsonToken token) throws IOException { + if (parser.nextToken() != token) + throw new IllegalArgumentException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() + + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); + } + + public CompletableFuture next() throws IOException { + long start = 0, end = -1; + OperationType type = null; + DocumentId id = null; + OperationParameters parameters = protoParameters; + switch (parser.nextToken()) { + case END_ARRAY: return null; + case START_OBJECT: break; + default: throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " + + parser.getTokenLocation().getByteOffset()); + } + + loop: while (true) { + switch (parser.nextToken()) { + case FIELD_NAME: + switch (parser.getText()) { + case "id": + case "put": type = PUT; id = readId(); break; + case "update": type = UPDATE; id = readId(); break; + case "remove": type = REMOVE; id = readId(); break; + case "condition": parameters = parameters.testAndSetCondition(readString()); break; + case "create": parameters = parameters.createIfNonExistent(readBoolean()); break; + case "fields": { + expect(START_OBJECT); + start = parser.getTokenLocation().getByteOffset(); + int depth = 1; + while (depth > 0) switch (parser.nextToken()) { + case START_OBJECT: ++depth; break; + case END_OBJECT: --depth; break; + } + end = parser.getTokenLocation().getByteOffset() + 1; + break; + } + default: throw new IllegalArgumentException("Unexpected field name '" + parser.getText() + "' at offset " + + parser.getTokenLocation().getByteOffset()); + } + break; + + case END_OBJECT: + break loop; + + default: + throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " + + parser.getTokenLocation().getByteOffset()); + } + } + + if (id == null) + throw new IllegalArgumentException("No document id for document at offset " + start); + + if (end < start) + throw new IllegalArgumentException("No 'fields' object for document at offset " + parser.getTokenLocation().getByteOffset()); + + String payload = new String(copy(start, end), UTF_8); + synchronized (lock) { + tail = end; + lock.notify(); + } + + switch (type) { + case PUT: return client.put (id, payload, parameters); + case UPDATE: return client.update(id, payload, parameters); + case REMOVE: return client.remove(id, parameters); + default: throw new IllegalStateException("Unexpected operation type '" + type + "'"); + } + } + + private final byte[] prefix = "{\"fields\":".getBytes(UTF_8); + private byte[] copy(long start, long end) { + int length = (int) (end - start); + byte[] buffer = new byte[prefix.length + length + 1]; + System.arraycopy(prefix, 0, buffer, 0, prefix.length); + + int offset = (int) (start % size); + int toWrite = min(length, size - offset); + System.arraycopy(data, offset, buffer, prefix.length, toWrite); + if (toWrite < length) + System.arraycopy(data, 0, buffer, prefix.length + toWrite, length - toWrite); + + buffer[buffer.length - 1] = '}'; + return buffer; + } + + private String readString() throws IOException { + String value = parser.nextTextValue(); + if (value == null) + throw new IllegalArgumentException("Expected '" + VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() + + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); + + return value; + } + + private boolean readBoolean() throws IOException { + Boolean value = parser.nextBooleanValue(); + if (value == null) + throw new IllegalArgumentException("Expected '" + VALUE_FALSE + "' or '" + VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() + + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); + + return value; + + } + + private DocumentId readId() throws IOException { + return DocumentId.of(readString()); + } + + @Override + public void close() throws IOException { + synchronized (lock) { + done = true; + lock.notifyAll(); + } + in.close(); + } + + private void fill() { + try { + while (true) { + int free; + synchronized (lock) { + while ((free = (int) (tail + size - head)) <= 0 && ! done) + lock.wait(); + } + if (done) break; + + int off = (int) (head % size); + int len = min(min(free, size - off), 1 << 13); + int read = in.read(data, off, len); + + synchronized (lock) { + if (read < 0) done = true; + else head += read; + lock.notify(); + } + } + } + catch (Throwable t) { + synchronized (lock) { + done = true; + thrown = t; + } + } + } + + } + + + public static class Builder { + + final FeedClient client; + OperationParameters parameters = OperationParameters.empty(); + + private Builder(FeedClient client) { + this.client = requireNonNull(client); + } + + public Builder withTimeout(Duration timeout) { + parameters = parameters.timeout(timeout); + return this; + } + + public Builder withRoute(String route) { + parameters = parameters.route(route); + return this; + } + + public Builder withTracelevel(int tracelevel) { + parameters = parameters.tracelevel(tracelevel); + return this; + } + + public JsonFeeder build() { + return new JsonFeeder(client, parameters); + } + + } + + static class BenchmarkResult { + final int okCount; + final int errorCount; + final Duration duration; + final double throughput; + + BenchmarkResult(int okCount, int errorCount, Duration duration, double throughput) { + this.okCount = okCount; + this.errorCount = errorCount; + this.duration = duration; + this.throughput = throughput; + } + } + +} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java deleted file mode 100644 index 99d05a4bae8..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java +++ /dev/null @@ -1,364 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -import ai.vespa.feed.client.FeedClient.OperationType; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; - -import java.io.Closeable; -import java.io.IOException; -import java.io.InputStream; -import java.io.InterruptedIOException; -import java.io.UncheckedIOException; -import java.time.Duration; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import static ai.vespa.feed.client.FeedClient.OperationType.PUT; -import static ai.vespa.feed.client.FeedClient.OperationType.REMOVE; -import static ai.vespa.feed.client.FeedClient.OperationType.UPDATE; -import static com.fasterxml.jackson.core.JsonToken.START_OBJECT; -import static com.fasterxml.jackson.core.JsonToken.VALUE_FALSE; -import static com.fasterxml.jackson.core.JsonToken.VALUE_STRING; -import static com.fasterxml.jackson.core.JsonToken.VALUE_TRUE; -import static java.lang.Math.min; -import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.Objects.requireNonNull; - -/** - * @author jonmv - */ -public class JsonStreamFeeder implements Closeable { - - private final FeedClient client; - private final OperationParameters protoParameters; - - private JsonStreamFeeder(FeedClient client, OperationParameters protoParameters) { - this.client = client; - this.protoParameters = protoParameters; - } - - public static Builder builder(FeedClient client) { return new Builder(client); } - - /** Feeds a stream containing a JSON array of feed operations on the form - *
-     *     [
-     *       {
-     *         "id": "id:ns:type::boo",
-     *         "fields": { ... document fields ... }
-     *       },
-     *       {
-     *         "put": "id:ns:type::foo",
-     *         "fields": { ... document fields ... }
-     *       },
-     *       {
-     *         "update": "id:ns:type:n=4:bar",
-     *         "create": true,
-     *         "fields": { ... partial update fields ... }
-     *       },
-     *       {
-     *         "remove": "id:ns:type:g=foo:bar",
-     *         "condition": "type.baz = \"bax\""
-     *       },
-     *       ...
-     *     ]
-     * 
- * Note that {@code "id"} is an alias for the document put operation. - */ - public void feed(InputStream jsonStream) throws IOException { - feed(jsonStream, 1 << 26, false); - } - - BenchmarkResult benchmark(InputStream jsonStream) throws IOException { - return feed(jsonStream, 1 << 26, true).get(); - } - - Optional feed(InputStream jsonStream, int size, boolean benchmark) throws IOException { - RingBufferStream buffer = new RingBufferStream(jsonStream, size); - buffer.expect(JsonToken.START_ARRAY); - AtomicInteger okCount = new AtomicInteger(); - AtomicInteger failedCount = new AtomicInteger(); - long startTime = System.nanoTime(); - CompletableFuture result; - AtomicReference thrown = new AtomicReference<>(); - while ((result = buffer.next()) != null) { - result.whenComplete((r, t) -> { - if (t != null) { - failedCount.incrementAndGet(); - if (!benchmark) thrown.set(t); - } else - okCount.incrementAndGet(); - }); - if (thrown.get() != null) - sneakyThrow(thrown.get()); - } - if (!benchmark) return Optional.empty(); - Duration duration = Duration.ofNanos(System.nanoTime() - startTime); - double throughPut = (double)okCount.get() / duration.toMillis() * 1000D; - return Optional.of(new BenchmarkResult(okCount.get(), failedCount.get(), duration, throughPut)); - } - - @SuppressWarnings("unchecked") - static void sneakyThrow(Throwable thrown) throws T { throw (T) thrown; } - - private static final JsonFactory factory = new JsonFactory(); - - @Override public void close() throws IOException { client.close(); } - - private class RingBufferStream extends InputStream { - - private final byte[] b = new byte[1]; - private final InputStream in; - private final byte[] data; - private final int size; - private final Object lock = new Object(); - private final JsonParser parser; - private Throwable thrown = null; - private long tail = 0; - private long pos = 0; - private long head = 0; - private boolean done = false; - - RingBufferStream(InputStream in, int size) { - this.in = in; - this.data = new byte[size]; - this.size = size; - - new Thread(this::fill, "feed-reader").start(); - - try { this.parser = factory.createParser(this); } - catch (IOException e) { throw new UncheckedIOException(e); } - } - - @Override - public int read() throws IOException { - return read(b, 0, 1) == -1 ? -1 : b[0]; - } - - @Override - public int read(byte[] buffer, int off, int len) throws IOException { - try { - int ready; - synchronized (lock) { - while ((ready = (int) (head - pos)) == 0 && ! done) - lock.wait(); - } - if (thrown != null) throw new RuntimeException("Error reading input", thrown); - if (ready == 0) return -1; - - ready = min(ready, len); - int offset = (int) (pos % size); - int length = min(ready, size - offset); - System.arraycopy(data, offset, buffer, off, length); - if (length < ready) - System.arraycopy(data, 0, buffer, off + length, ready - length); - - pos += ready; - return ready; - } - catch (InterruptedException e) { - throw new InterruptedIOException("Interrupted waiting for data: " + e.getMessage()); - } - } - - void expect(JsonToken token) throws IOException { - if (parser.nextToken() != token) - throw new IllegalArgumentException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() + - ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); - } - - public CompletableFuture next() throws IOException { - long start = 0, end = -1; - OperationType type = null; - DocumentId id = null; - OperationParameters parameters = protoParameters; - switch (parser.nextToken()) { - case END_ARRAY: return null; - case START_OBJECT: break; - default: throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " + - parser.getTokenLocation().getByteOffset()); - } - - loop: while (true) { - switch (parser.nextToken()) { - case FIELD_NAME: - switch (parser.getText()) { - case "id": - case "put": type = PUT; id = readId(); break; - case "update": type = UPDATE; id = readId(); break; - case "remove": type = REMOVE; id = readId(); break; - case "condition": parameters = parameters.testAndSetCondition(readString()); break; - case "create": parameters = parameters.createIfNonExistent(readBoolean()); break; - case "fields": { - expect(START_OBJECT); - start = parser.getTokenLocation().getByteOffset(); - int depth = 1; - while (depth > 0) switch (parser.nextToken()) { - case START_OBJECT: ++depth; break; - case END_OBJECT: --depth; break; - } - end = parser.getTokenLocation().getByteOffset() + 1; - break; - } - default: throw new IllegalArgumentException("Unexpected field name '" + parser.getText() + "' at offset " + - parser.getTokenLocation().getByteOffset()); - } - break; - - case END_OBJECT: - break loop; - - default: - throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " + - parser.getTokenLocation().getByteOffset()); - } - } - - if (id == null) - throw new IllegalArgumentException("No document id for document at offset " + start); - - if (end < start) - throw new IllegalArgumentException("No 'fields' object for document at offset " + parser.getTokenLocation().getByteOffset()); - - String payload = new String(copy(start, end), UTF_8); - synchronized (lock) { - tail = end; - lock.notify(); - } - - switch (type) { - case PUT: return client.put (id, payload, parameters); - case UPDATE: return client.update(id, payload, parameters); - case REMOVE: return client.remove(id, parameters); - default: throw new IllegalStateException("Unexpected operation type '" + type + "'"); - } - } - - private final byte[] prefix = "{\"fields\":".getBytes(UTF_8); - private byte[] copy(long start, long end) { - int length = (int) (end - start); - byte[] buffer = new byte[prefix.length + length + 1]; - System.arraycopy(prefix, 0, buffer, 0, prefix.length); - - int offset = (int) (start % size); - int toWrite = min(length, size - offset); - System.arraycopy(data, offset, buffer, prefix.length, toWrite); - if (toWrite < length) - System.arraycopy(data, 0, buffer, prefix.length + toWrite, length - toWrite); - - buffer[buffer.length - 1] = '}'; - return buffer; - } - - private String readString() throws IOException { - String value = parser.nextTextValue(); - if (value == null) - throw new IllegalArgumentException("Expected '" + VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() + - ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); - - return value; - } - - private boolean readBoolean() throws IOException { - Boolean value = parser.nextBooleanValue(); - if (value == null) - throw new IllegalArgumentException("Expected '" + VALUE_FALSE + "' or '" + VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() + - ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); - - return value; - - } - - private DocumentId readId() throws IOException { - return DocumentId.of(readString()); - } - - @Override - public void close() throws IOException { - synchronized (lock) { - done = true; - lock.notifyAll(); - } - in.close(); - } - - private void fill() { - try { - while (true) { - int free; - synchronized (lock) { - while ((free = (int) (tail + size - head)) <= 0 && ! done) - lock.wait(); - } - if (done) break; - - int off = (int) (head % size); - int len = min(min(free, size - off), 1 << 13); - int read = in.read(data, off, len); - - synchronized (lock) { - if (read < 0) done = true; - else head += read; - lock.notify(); - } - } - } - catch (Throwable t) { - synchronized (lock) { - done = true; - thrown = t; - } - } - } - - } - - - public static class Builder { - - final FeedClient client; - OperationParameters parameters = OperationParameters.empty(); - - private Builder(FeedClient client) { - this.client = requireNonNull(client); - } - - public Builder withTimeout(Duration timeout) { - parameters = parameters.timeout(timeout); - return this; - } - - public Builder withRoute(String route) { - parameters = parameters.route(route); - return this; - } - - public Builder withTracelevel(int tracelevel) { - parameters = parameters.tracelevel(tracelevel); - return this; - } - - public JsonStreamFeeder build() { - return new JsonStreamFeeder(client, parameters); - } - - } - - static class BenchmarkResult { - final int okCount; - final int errorCount; - final Duration duration; - final double throughput; - - BenchmarkResult(int okCount, int errorCount, Duration duration, double throughput) { - this.okCount = okCount; - this.errorCount = errorCount; - this.duration = duration; - this.throughput = throughput; - } - } - -} diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java new file mode 100644 index 00000000000..0f360fd94c7 --- /dev/null +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java @@ -0,0 +1,69 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.IntStream; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.stream.Collectors.joining; +import static org.junit.jupiter.api.Assertions.assertEquals; + +class JsonFeederTest { + + @Test + void test() throws IOException { + int docs = 1 << 14; + String json = "[\n" + + + IntStream.range(0, docs).mapToObj(i -> + " {\n" + + " \"id\": \"id:ns:type::abc" + i + "\",\n" + + " \"fields\": {\n" + + " \"lul\":\"lal\"\n" + + " }\n" + + " },\n" + ).collect(joining()) + + + " {\n" + + " \"id\": \"id:ns:type::abc" + docs + "\",\n" + + " \"fields\": {\n" + + " \"lul\":\"lal\"\n" + + " }\n" + + " }\n" + + "]"; + ByteArrayInputStream in = new ByteArrayInputStream(json.getBytes(UTF_8)); + Set ids = new HashSet<>(); + long startNanos = System.nanoTime(); + JsonFeeder.builder(new FeedClient() { + + @Override + public CompletableFuture put(DocumentId documentId, String documentJson, OperationParameters params) { + ids.add(documentId.userSpecific()); + return new CompletableFuture<>(); + } + + @Override + public CompletableFuture update(DocumentId documentId, String updateJson, OperationParameters params) { + return new CompletableFuture<>(); + } + + @Override + public CompletableFuture remove(DocumentId documentId, OperationParameters params) { + return new CompletableFuture<>(); + } + + @Override + public void close(boolean graceful) { } + + }).build().feed(in, 1 << 7, false); // TODO: hangs when buffer is smaller than largest document + System.err.println((json.length() / 1048576.0) + " MB in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds"); + assertEquals(docs + 1, ids.size()); + } + +} diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java deleted file mode 100644 index 28a50b88396..00000000000 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java +++ /dev/null @@ -1,69 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -import org.junit.jupiter.api.Test; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.stream.IntStream; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.stream.Collectors.joining; -import static org.junit.jupiter.api.Assertions.assertEquals; - -class JsonStreamFeederTest { - - @Test - void test() throws IOException { - int docs = 1 << 14; - String json = "[\n" + - - IntStream.range(0, docs).mapToObj(i -> - " {\n" + - " \"id\": \"id:ns:type::abc" + i + "\",\n" + - " \"fields\": {\n" + - " \"lul\":\"lal\"\n" + - " }\n" + - " },\n" - ).collect(joining()) + - - " {\n" + - " \"id\": \"id:ns:type::abc" + docs + "\",\n" + - " \"fields\": {\n" + - " \"lul\":\"lal\"\n" + - " }\n" + - " }\n" + - "]"; - ByteArrayInputStream in = new ByteArrayInputStream(json.getBytes(UTF_8)); - Set ids = new HashSet<>(); - long startNanos = System.nanoTime(); - JsonStreamFeeder.builder(new FeedClient() { - - @Override - public CompletableFuture put(DocumentId documentId, String documentJson, OperationParameters params) { - ids.add(documentId.userSpecific()); - return new CompletableFuture<>(); - } - - @Override - public CompletableFuture update(DocumentId documentId, String updateJson, OperationParameters params) { - return new CompletableFuture<>(); - } - - @Override - public CompletableFuture remove(DocumentId documentId, OperationParameters params) { - return new CompletableFuture<>(); - } - - @Override - public void close(boolean graceful) { } - - }).build().feed(in, 1 << 7, false); // TODO: hangs when buffer is smaller than largest document - System.err.println((json.length() / 1048576.0) + " MB in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds"); - assertEquals(docs + 1, ids.size()); - } - -} -- cgit v1.2.3 From fe640886026882b7e44de43ed39deaec7f8636b7 Mon Sep 17 00:00:00 2001 From: Bjørn Christian Seime Date: Tue, 8 Jun 2021 17:07:08 +0200 Subject: Rename 'feed' to 'feedMany' --- .../src/main/java/ai/vespa/feed/client/CliClient.java | 2 +- .../src/main/java/ai/vespa/feed/client/JsonFeeder.java | 8 ++++---- .../src/test/java/ai/vespa/feed/client/JsonFeederTest.java | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java index 5c98c986990..40c6ac56022 100644 --- a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java +++ b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java @@ -59,7 +59,7 @@ public class CliClient { if (cliArgs.benchmarkModeEnabled()) { printBenchmarkResult(feeder.benchmark(in)); } else { - feeder.feed(in); + feeder.feedMany(in); } } return 0; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java index a4d75a0a7c7..0990e0edf60 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java @@ -68,15 +68,15 @@ public class JsonFeeder implements Closeable { * * Note that {@code "id"} is an alias for the document put operation. */ - public void feed(InputStream jsonStream) throws IOException { - feed(jsonStream, 1 << 26, false); + public void feedMany(InputStream jsonStream) throws IOException { + feedMany(jsonStream, 1 << 26, false); } BenchmarkResult benchmark(InputStream jsonStream) throws IOException { - return feed(jsonStream, 1 << 26, true).get(); + return feedMany(jsonStream, 1 << 26, true).get(); } - Optional feed(InputStream jsonStream, int size, boolean benchmark) throws IOException { + Optional feedMany(InputStream jsonStream, int size, boolean benchmark) throws IOException { RingBufferStream buffer = new RingBufferStream(jsonStream, size); buffer.expect(JsonToken.START_ARRAY); AtomicInteger okCount = new AtomicInteger(); diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java index 0f360fd94c7..60f6b3a4e73 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java @@ -61,7 +61,7 @@ class JsonFeederTest { @Override public void close(boolean graceful) { } - }).build().feed(in, 1 << 7, false); // TODO: hangs when buffer is smaller than largest document + }).build().feedMany(in, 1 << 7, false); // TODO: hangs when buffer is smaller than largest document System.err.println((json.length() / 1048576.0) + " MB in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds"); assertEquals(docs + 1, ids.size()); } -- cgit v1.2.3 From 9d3a6c0a90d11e7cabf889c16c6c7a025ce39712 Mon Sep 17 00:00:00 2001 From: Bjørn Christian Seime Date: Tue, 8 Jun 2021 17:51:07 +0200 Subject: Split out parser and dispatch of operations to separate class --- .../main/java/ai/vespa/feed/client/JsonFeeder.java | 184 ++++++++++++--------- 1 file changed, 105 insertions(+), 79 deletions(-) diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java index 0990e0edf60..48fcc71494f 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java @@ -20,6 +20,8 @@ import java.util.concurrent.atomic.AtomicReference; import static ai.vespa.feed.client.FeedClient.OperationType.PUT; import static ai.vespa.feed.client.FeedClient.OperationType.REMOVE; import static ai.vespa.feed.client.FeedClient.OperationType.UPDATE; +import static com.fasterxml.jackson.core.JsonToken.END_ARRAY; +import static com.fasterxml.jackson.core.JsonToken.START_ARRAY; import static com.fasterxml.jackson.core.JsonToken.START_OBJECT; import static com.fasterxml.jackson.core.JsonToken.VALUE_FALSE; import static com.fasterxml.jackson.core.JsonToken.VALUE_STRING; @@ -78,7 +80,6 @@ public class JsonFeeder implements Closeable { Optional feedMany(InputStream jsonStream, int size, boolean benchmark) throws IOException { RingBufferStream buffer = new RingBufferStream(jsonStream, size); - buffer.expect(JsonToken.START_ARRAY); AtomicInteger okCount = new AtomicInteger(); AtomicInteger failedCount = new AtomicInteger(); long startTime = System.nanoTime(); @@ -115,12 +116,12 @@ public class JsonFeeder implements Closeable { private final byte[] data; private final int size; private final Object lock = new Object(); - private final JsonParser parser; private Throwable thrown = null; private long tail = 0; private long pos = 0; private long head = 0; private boolean done = false; + private final OperationParserAndExecutor parserAndExecutor; RingBufferStream(InputStream in, int size) { this.in = in; @@ -129,7 +130,7 @@ public class JsonFeeder implements Closeable { new Thread(this::fill, "feed-reader").start(); - try { this.parser = factory.createParser(this); } + try { this.parserAndExecutor = new RingBufferBackedOperationParserAndExecutor(factory.createParser(this)); } catch (IOException e) { throw new UncheckedIOException(e); } } @@ -164,24 +165,104 @@ public class JsonFeeder implements Closeable { } } - void expect(JsonToken token) throws IOException { - if (parser.nextToken() != token) - throw new IllegalArgumentException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() + - ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); + public CompletableFuture next() throws IOException { + return parserAndExecutor.next(); } - public CompletableFuture next() throws IOException { + private final byte[] prefix = "{\"fields\":".getBytes(UTF_8); + private byte[] copy(long start, long end) { + int length = (int) (end - start); + byte[] buffer = new byte[prefix.length + length + 1]; + System.arraycopy(prefix, 0, buffer, 0, prefix.length); + + int offset = (int) (start % size); + int toWrite = min(length, size - offset); + System.arraycopy(data, offset, buffer, prefix.length, toWrite); + if (toWrite < length) + System.arraycopy(data, 0, buffer, prefix.length + toWrite, length - toWrite); + + buffer[buffer.length - 1] = '}'; + return buffer; + } + + + @Override + public void close() throws IOException { + synchronized (lock) { + done = true; + lock.notifyAll(); + } + in.close(); + } + + private void fill() { + try { + while (true) { + int free; + synchronized (lock) { + while ((free = (int) (tail + size - head)) <= 0 && ! done) + lock.wait(); + } + if (done) break; + + int off = (int) (head % size); + int len = min(min(free, size - off), 1 << 13); + int read = in.read(data, off, len); + + synchronized (lock) { + if (read < 0) done = true; + else head += read; + lock.notify(); + } + } + } + catch (Throwable t) { + synchronized (lock) { + done = true; + thrown = t; + } + } + } + + private class RingBufferBackedOperationParserAndExecutor extends OperationParserAndExecutor { + + RingBufferBackedOperationParserAndExecutor(JsonParser parser) throws IOException { super(parser, true); } + + @Override + String getDocumentJson(long start, long end) { + String payload = new String(copy(start, end), UTF_8); + synchronized (lock) { + tail = end; + lock.notify(); + } + return payload; + } + } + } + + private abstract class OperationParserAndExecutor { + + private final JsonParser parser; + private final boolean multipleOperations; + + protected OperationParserAndExecutor(JsonParser parser, boolean multipleOperations) throws IOException { + this.parser = parser; + this.multipleOperations = multipleOperations; + if (multipleOperations) expect(START_ARRAY); + } + + abstract String getDocumentJson(long start, long end); + + CompletableFuture next() throws IOException { + JsonToken token = parser.nextToken(); + if (token == END_ARRAY && multipleOperations) return null; + else if (token == null && !multipleOperations) return null; + else if (token == START_OBJECT); + else throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " + parser.getTokenLocation().getByteOffset()); long start = 0, end = -1; OperationType type = null; DocumentId id = null; OperationParameters parameters = protoParameters; - switch (parser.nextToken()) { - case END_ARRAY: return null; - case START_OBJECT: break; - default: throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " + - parser.getTokenLocation().getByteOffset()); - } - loop: while (true) { switch (parser.nextToken()) { case FIELD_NAME: @@ -204,7 +285,7 @@ public class JsonFeeder implements Closeable { break; } default: throw new IllegalArgumentException("Unexpected field name '" + parser.getText() + "' at offset " + - parser.getTokenLocation().getByteOffset()); + parser.getTokenLocation().getByteOffset()); } break; @@ -213,22 +294,15 @@ public class JsonFeeder implements Closeable { default: throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " + - parser.getTokenLocation().getByteOffset()); + parser.getTokenLocation().getByteOffset()); } } - if (id == null) throw new IllegalArgumentException("No document id for document at offset " + start); if (end < start) throw new IllegalArgumentException("No 'fields' object for document at offset " + parser.getTokenLocation().getByteOffset()); - - String payload = new String(copy(start, end), UTF_8); - synchronized (lock) { - tail = end; - lock.notify(); - } - + String payload = getDocumentJson(start, end); switch (type) { case PUT: return client.put (id, payload, parameters); case UPDATE: return client.update(id, payload, parameters); @@ -237,27 +311,17 @@ public class JsonFeeder implements Closeable { } } - private final byte[] prefix = "{\"fields\":".getBytes(UTF_8); - private byte[] copy(long start, long end) { - int length = (int) (end - start); - byte[] buffer = new byte[prefix.length + length + 1]; - System.arraycopy(prefix, 0, buffer, 0, prefix.length); - - int offset = (int) (start % size); - int toWrite = min(length, size - offset); - System.arraycopy(data, offset, buffer, prefix.length, toWrite); - if (toWrite < length) - System.arraycopy(data, 0, buffer, prefix.length + toWrite, length - toWrite); - - buffer[buffer.length - 1] = '}'; - return buffer; + void expect(JsonToken token) throws IOException { + if (parser.nextToken() != token) + throw new IllegalArgumentException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() + + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); } private String readString() throws IOException { String value = parser.nextTextValue(); if (value == null) throw new IllegalArgumentException("Expected '" + VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() + - ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); return value; } @@ -266,7 +330,7 @@ public class JsonFeeder implements Closeable { Boolean value = parser.nextBooleanValue(); if (value == null) throw new IllegalArgumentException("Expected '" + VALUE_FALSE + "' or '" + VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() + - ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); return value; @@ -276,44 +340,6 @@ public class JsonFeeder implements Closeable { return DocumentId.of(readString()); } - @Override - public void close() throws IOException { - synchronized (lock) { - done = true; - lock.notifyAll(); - } - in.close(); - } - - private void fill() { - try { - while (true) { - int free; - synchronized (lock) { - while ((free = (int) (tail + size - head)) <= 0 && ! done) - lock.wait(); - } - if (done) break; - - int off = (int) (head % size); - int len = min(min(free, size - off), 1 << 13); - int read = in.read(data, off, len); - - synchronized (lock) { - if (read < 0) done = true; - else head += read; - lock.notify(); - } - } - } - catch (Throwable t) { - synchronized (lock) { - done = true; - thrown = t; - } - } - } - } -- cgit v1.2.3 From d29f603291096e0df578fca57a23c603124ddd03 Mon Sep 17 00:00:00 2001 From: Bjørn Christian Seime Date: Wed, 9 Jun 2021 11:24:02 +0200 Subject: Make client mock mark async operations as completed --- .../src/test/java/ai/vespa/feed/client/JsonFeederTest.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java index 60f6b3a4e73..f50bd75bd4b 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java @@ -45,22 +45,26 @@ class JsonFeederTest { @Override public CompletableFuture put(DocumentId documentId, String documentJson, OperationParameters params) { ids.add(documentId.userSpecific()); - return new CompletableFuture<>(); + return createSuccessResult(documentId); } @Override public CompletableFuture update(DocumentId documentId, String updateJson, OperationParameters params) { - return new CompletableFuture<>(); + return createSuccessResult(documentId); } @Override public CompletableFuture remove(DocumentId documentId, OperationParameters params) { - return new CompletableFuture<>(); + return createSuccessResult(documentId); } @Override public void close(boolean graceful) { } + private CompletableFuture createSuccessResult(DocumentId documentId) { + return CompletableFuture.completedFuture(new Result(Result.Type.success, documentId, "success", null)); + } + }).build().feedMany(in, 1 << 7, false); // TODO: hangs when buffer is smaller than largest document System.err.println((json.length() / 1048576.0) + " MB in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds"); assertEquals(docs + 1, ids.size()); -- cgit v1.2.3 From 50b5723757156e0b3eaaba2bafe130355b265f99 Mon Sep 17 00:00:00 2001 From: Bjørn Christian Seime Date: Wed, 9 Jun 2021 11:34:09 +0200 Subject: Make feedMany async + introduce callback for each operation result --- .../main/java/ai/vespa/feed/client/JsonFeeder.java | 123 +++++++++++++-------- .../java/ai/vespa/feed/client/JsonFeederTest.java | 21 +++- 2 files changed, 95 insertions(+), 49 deletions(-) diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java index 48fcc71494f..6dff1b06fc0 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java @@ -12,10 +12,12 @@ import java.io.InputStream; import java.io.InterruptedIOException; import java.io.UncheckedIOException; import java.time.Duration; -import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import static ai.vespa.feed.client.FeedClient.OperationType.PUT; import static ai.vespa.feed.client.FeedClient.OperationType.REMOVE; @@ -32,9 +34,15 @@ import static java.util.Objects.requireNonNull; /** * @author jonmv + * @author bjorncs */ public class JsonFeeder implements Closeable { + private final ExecutorService resultExecutor = Executors.newSingleThreadExecutor(r -> { + Thread t = new Thread(r, "json-feeder-result-executor"); + t.setDaemon(true); + return t; + }); private final FeedClient client; private final OperationParameters protoParameters; @@ -43,6 +51,27 @@ public class JsonFeeder implements Closeable { this.protoParameters = protoParameters; } + public interface ResultCallback { + /** + * Invoked after each operation has either completed successfully or failed + * + * @param result Non-null if operation completed successfully + * @param error Non-null if operation failed + */ + void onNextResult(Result result, Throwable error); + + /** + * Invoked if an unrecoverable error occurred during feed processing, + * after which no other {@link ResultCallback} methods are invoked. + */ + void onError(Throwable error); + + /** + * Invoked when all feed operations are either completed successfully or failed. + */ + void onComplete(); + } + public static Builder builder(FeedClient client) { return new Builder(client); } /** Feeds a stream containing a JSON array of feed operations on the form @@ -70,44 +99,61 @@ public class JsonFeeder implements Closeable { * * Note that {@code "id"} is an alias for the document put operation. */ - public void feedMany(InputStream jsonStream) throws IOException { - feedMany(jsonStream, 1 << 26, false); + public CompletableFuture feedMany(InputStream jsonStream, ResultCallback resultCallback) { + return feedMany(jsonStream, 1 << 26, resultCallback); } - BenchmarkResult benchmark(InputStream jsonStream) throws IOException { - return feedMany(jsonStream, 1 << 26, true).get(); - } - - Optional feedMany(InputStream jsonStream, int size, boolean benchmark) throws IOException { + CompletableFuture feedMany(InputStream jsonStream, int size, ResultCallback resultCallback) { RingBufferStream buffer = new RingBufferStream(jsonStream, size); - AtomicInteger okCount = new AtomicInteger(); - AtomicInteger failedCount = new AtomicInteger(); - long startTime = System.nanoTime(); + CompletableFuture overallResult = new CompletableFuture<>(); CompletableFuture result; - AtomicReference thrown = new AtomicReference<>(); - while ((result = buffer.next()) != null) { - result.whenComplete((r, t) -> { - if (t != null) { - failedCount.incrementAndGet(); - if (!benchmark) thrown.set(t); - } else - okCount.incrementAndGet(); - }); - if (thrown.get() != null) - sneakyThrow(thrown.get()); + AtomicInteger pending = new AtomicInteger(1); // The below dispatch loop itself is counted as a single pending operation + AtomicBoolean finalCallbackInvoked = new AtomicBoolean(); + try { + while ((result = buffer.next()) != null) { + pending.incrementAndGet(); + result.whenComplete((r, t) -> { + if (!finalCallbackInvoked.get()) { + resultExecutor.execute(() -> resultCallback.onNextResult(r, t)); + } + if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { + resultExecutor.execute(() -> { + resultCallback.onComplete(); + overallResult.complete(null); + }); + } + }); + } + if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { + resultExecutor.execute(() -> { + resultCallback.onComplete(); + overallResult.complete(null); + }); + } + } catch (Exception e) { + if (finalCallbackInvoked.compareAndSet(false, true)) { + resultExecutor.execute(() -> { + resultCallback.onError(e); + overallResult.completeExceptionally(e); + }); + } } - if (!benchmark) return Optional.empty(); - Duration duration = Duration.ofNanos(System.nanoTime() - startTime); - double throughPut = (double)okCount.get() / duration.toMillis() * 1000D; - return Optional.of(new BenchmarkResult(okCount.get(), failedCount.get(), duration, throughPut)); + return overallResult; } - @SuppressWarnings("unchecked") - static void sneakyThrow(Throwable thrown) throws T { throw (T) thrown; } - private static final JsonFactory factory = new JsonFactory(); - @Override public void close() throws IOException { client.close(); } + @Override public void close() throws IOException { + client.close(); + resultExecutor.shutdown(); + try { + if (!resultExecutor.awaitTermination(30, TimeUnit.SECONDS)) { + throw new IOException("Failed to close client in time"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } private class RingBufferStream extends InputStream { @@ -372,19 +418,4 @@ public class JsonFeeder implements Closeable { } } - - static class BenchmarkResult { - final int okCount; - final int errorCount; - final Duration duration; - final double throughput; - - BenchmarkResult(int okCount, int errorCount, Duration duration, double throughput) { - this.okCount = okCount; - this.errorCount = errorCount; - this.duration = duration; - this.throughput = throughput; - } - } - } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java index f50bd75bd4b..0f14f9ab4be 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java @@ -5,14 +5,19 @@ import org.junit.jupiter.api.Test; import java.io.ByteArrayInputStream; import java.io.IOException; -import java.util.HashSet; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.IntStream; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.stream.Collectors.joining; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; class JsonFeederTest { @@ -38,7 +43,10 @@ class JsonFeederTest { " }\n" + "]"; ByteArrayInputStream in = new ByteArrayInputStream(json.getBytes(UTF_8)); - Set ids = new HashSet<>(); + Set ids = new ConcurrentSkipListSet<>(); + AtomicInteger resultsReceived = new AtomicInteger(); + AtomicBoolean completedSuccessfully = new AtomicBoolean(); + AtomicReference exceptionThrow = new AtomicReference<>(); long startNanos = System.nanoTime(); JsonFeeder.builder(new FeedClient() { @@ -65,9 +73,16 @@ class JsonFeederTest { return CompletableFuture.completedFuture(new Result(Result.Type.success, documentId, "success", null)); } - }).build().feedMany(in, 1 << 7, false); // TODO: hangs when buffer is smaller than largest document + }).build().feedMany(in, 1 << 7, new JsonFeeder.ResultCallback() { + @Override public void onNextResult(Result result, Throwable error) { resultsReceived.incrementAndGet(); } + @Override public void onError(Throwable error) { exceptionThrow.set(error); } + @Override public void onComplete() { completedSuccessfully.set(true); } + }).join(); // TODO: hangs when buffer is smaller than largest document System.err.println((json.length() / 1048576.0) + " MB in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds"); assertEquals(docs + 1, ids.size()); + assertEquals(docs + 1, resultsReceived.get()); + assertTrue(completedSuccessfully.get()); + assertNull(exceptionThrow.get()); } } -- cgit v1.2.3 From d86c5c5989fd63e8e98afaab2b7d4e32d2fb5c56 Mon Sep 17 00:00:00 2001 From: Bjørn Christian Seime Date: Wed, 9 Jun 2021 13:29:07 +0200 Subject: Dispatch all per-request processing to result executor --- .../src/main/java/ai/vespa/feed/client/JsonFeeder.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java index 6dff1b06fc0..2a6d2e15747 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java @@ -112,17 +112,15 @@ public class JsonFeeder implements Closeable { try { while ((result = buffer.next()) != null) { pending.incrementAndGet(); - result.whenComplete((r, t) -> { + result.whenCompleteAsync((r, t) -> { if (!finalCallbackInvoked.get()) { - resultExecutor.execute(() -> resultCallback.onNextResult(r, t)); + resultCallback.onNextResult(r, t); } if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { - resultExecutor.execute(() -> { - resultCallback.onComplete(); - overallResult.complete(null); - }); + resultCallback.onComplete(); + overallResult.complete(null); } - }); + }, resultExecutor); } if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { resultExecutor.execute(() -> { -- cgit v1.2.3