From e68933c235f1a43c5f8542c6e41838dbf2e9cd27 Mon Sep 17 00:00:00 2001 From: Bjørn Christian Seime Date: Wed, 9 Jun 2021 13:50:23 +0200 Subject: Revert "Bjorncs/json feeder" --- .../main/java/ai/vespa/feed/client/JsonFeeder.java | 419 --------------------- .../ai/vespa/feed/client/JsonStreamFeeder.java | 364 ++++++++++++++++++ .../java/ai/vespa/feed/client/JsonFeederTest.java | 88 ----- .../ai/vespa/feed/client/JsonStreamFeederTest.java | 69 ++++ 4 files changed, 433 insertions(+), 507 deletions(-) delete mode 100644 vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java create mode 100644 vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java delete mode 100644 vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java create mode 100644 vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java (limited to 'vespa-feed-client') diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java deleted file mode 100644 index 2a6d2e15747..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java +++ /dev/null @@ -1,419 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -import ai.vespa.feed.client.FeedClient.OperationType; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; - -import java.io.Closeable; -import java.io.IOException; -import java.io.InputStream; -import java.io.InterruptedIOException; -import java.io.UncheckedIOException; -import java.time.Duration; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import static ai.vespa.feed.client.FeedClient.OperationType.PUT; -import static ai.vespa.feed.client.FeedClient.OperationType.REMOVE; -import static ai.vespa.feed.client.FeedClient.OperationType.UPDATE; -import static com.fasterxml.jackson.core.JsonToken.END_ARRAY; -import static com.fasterxml.jackson.core.JsonToken.START_ARRAY; -import static com.fasterxml.jackson.core.JsonToken.START_OBJECT; -import static com.fasterxml.jackson.core.JsonToken.VALUE_FALSE; -import static com.fasterxml.jackson.core.JsonToken.VALUE_STRING; -import static com.fasterxml.jackson.core.JsonToken.VALUE_TRUE; -import static java.lang.Math.min; -import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.Objects.requireNonNull; - -/** - * @author jonmv - * @author bjorncs - */ -public class JsonFeeder implements Closeable { - - private final ExecutorService resultExecutor = Executors.newSingleThreadExecutor(r -> { - Thread t = new Thread(r, "json-feeder-result-executor"); - t.setDaemon(true); - return t; - }); - private final FeedClient client; - private final OperationParameters protoParameters; - - private JsonFeeder(FeedClient client, OperationParameters protoParameters) { - this.client = client; - this.protoParameters = protoParameters; - } - - public interface ResultCallback { - /** - * Invoked after each operation has either completed successfully or failed - * - * @param result Non-null if operation completed successfully - * @param error Non-null if operation failed - */ - void onNextResult(Result result, Throwable error); - - /** - * Invoked if an unrecoverable error occurred during feed processing, - * after which no other {@link ResultCallback} methods are invoked. - */ - void onError(Throwable error); - - /** - * Invoked when all feed operations are either completed successfully or failed. - */ - void onComplete(); - } - - public static Builder builder(FeedClient client) { return new Builder(client); } - - /** Feeds a stream containing a JSON array of feed operations on the form - *
-     *     [
-     *       {
-     *         "id": "id:ns:type::boo",
-     *         "fields": { ... document fields ... }
-     *       },
-     *       {
-     *         "put": "id:ns:type::foo",
-     *         "fields": { ... document fields ... }
-     *       },
-     *       {
-     *         "update": "id:ns:type:n=4:bar",
-     *         "create": true,
-     *         "fields": { ... partial update fields ... }
-     *       },
-     *       {
-     *         "remove": "id:ns:type:g=foo:bar",
-     *         "condition": "type.baz = \"bax\""
-     *       },
-     *       ...
-     *     ]
-     * 
- * Note that {@code "id"} is an alias for the document put operation. - */ - public CompletableFuture feedMany(InputStream jsonStream, ResultCallback resultCallback) { - return feedMany(jsonStream, 1 << 26, resultCallback); - } - - CompletableFuture feedMany(InputStream jsonStream, int size, ResultCallback resultCallback) { - RingBufferStream buffer = new RingBufferStream(jsonStream, size); - CompletableFuture overallResult = new CompletableFuture<>(); - CompletableFuture result; - AtomicInteger pending = new AtomicInteger(1); // The below dispatch loop itself is counted as a single pending operation - AtomicBoolean finalCallbackInvoked = new AtomicBoolean(); - try { - while ((result = buffer.next()) != null) { - pending.incrementAndGet(); - result.whenCompleteAsync((r, t) -> { - if (!finalCallbackInvoked.get()) { - resultCallback.onNextResult(r, t); - } - if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { - resultCallback.onComplete(); - overallResult.complete(null); - } - }, resultExecutor); - } - if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { - resultExecutor.execute(() -> { - resultCallback.onComplete(); - overallResult.complete(null); - }); - } - } catch (Exception e) { - if (finalCallbackInvoked.compareAndSet(false, true)) { - resultExecutor.execute(() -> { - resultCallback.onError(e); - overallResult.completeExceptionally(e); - }); - } - } - return overallResult; - } - - private static final JsonFactory factory = new JsonFactory(); - - @Override public void close() throws IOException { - client.close(); - resultExecutor.shutdown(); - try { - if (!resultExecutor.awaitTermination(30, TimeUnit.SECONDS)) { - throw new IOException("Failed to close client in time"); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - private class RingBufferStream extends InputStream { - - private final byte[] b = new byte[1]; - private final InputStream in; - private final byte[] data; - private final int size; - private final Object lock = new Object(); - private Throwable thrown = null; - private long tail = 0; - private long pos = 0; - private long head = 0; - private boolean done = false; - private final OperationParserAndExecutor parserAndExecutor; - - RingBufferStream(InputStream in, int size) { - this.in = in; - this.data = new byte[size]; - this.size = size; - - new Thread(this::fill, "feed-reader").start(); - - try { this.parserAndExecutor = new RingBufferBackedOperationParserAndExecutor(factory.createParser(this)); } - catch (IOException e) { throw new UncheckedIOException(e); } - } - - @Override - public int read() throws IOException { - return read(b, 0, 1) == -1 ? -1 : b[0]; - } - - @Override - public int read(byte[] buffer, int off, int len) throws IOException { - try { - int ready; - synchronized (lock) { - while ((ready = (int) (head - pos)) == 0 && ! done) - lock.wait(); - } - if (thrown != null) throw new RuntimeException("Error reading input", thrown); - if (ready == 0) return -1; - - ready = min(ready, len); - int offset = (int) (pos % size); - int length = min(ready, size - offset); - System.arraycopy(data, offset, buffer, off, length); - if (length < ready) - System.arraycopy(data, 0, buffer, off + length, ready - length); - - pos += ready; - return ready; - } - catch (InterruptedException e) { - throw new InterruptedIOException("Interrupted waiting for data: " + e.getMessage()); - } - } - - public CompletableFuture next() throws IOException { - return parserAndExecutor.next(); - } - - private final byte[] prefix = "{\"fields\":".getBytes(UTF_8); - private byte[] copy(long start, long end) { - int length = (int) (end - start); - byte[] buffer = new byte[prefix.length + length + 1]; - System.arraycopy(prefix, 0, buffer, 0, prefix.length); - - int offset = (int) (start % size); - int toWrite = min(length, size - offset); - System.arraycopy(data, offset, buffer, prefix.length, toWrite); - if (toWrite < length) - System.arraycopy(data, 0, buffer, prefix.length + toWrite, length - toWrite); - - buffer[buffer.length - 1] = '}'; - return buffer; - } - - - @Override - public void close() throws IOException { - synchronized (lock) { - done = true; - lock.notifyAll(); - } - in.close(); - } - - private void fill() { - try { - while (true) { - int free; - synchronized (lock) { - while ((free = (int) (tail + size - head)) <= 0 && ! done) - lock.wait(); - } - if (done) break; - - int off = (int) (head % size); - int len = min(min(free, size - off), 1 << 13); - int read = in.read(data, off, len); - - synchronized (lock) { - if (read < 0) done = true; - else head += read; - lock.notify(); - } - } - } - catch (Throwable t) { - synchronized (lock) { - done = true; - thrown = t; - } - } - } - - private class RingBufferBackedOperationParserAndExecutor extends OperationParserAndExecutor { - - RingBufferBackedOperationParserAndExecutor(JsonParser parser) throws IOException { super(parser, true); } - - @Override - String getDocumentJson(long start, long end) { - String payload = new String(copy(start, end), UTF_8); - synchronized (lock) { - tail = end; - lock.notify(); - } - return payload; - } - } - } - - private abstract class OperationParserAndExecutor { - - private final JsonParser parser; - private final boolean multipleOperations; - - protected OperationParserAndExecutor(JsonParser parser, boolean multipleOperations) throws IOException { - this.parser = parser; - this.multipleOperations = multipleOperations; - if (multipleOperations) expect(START_ARRAY); - } - - abstract String getDocumentJson(long start, long end); - - CompletableFuture next() throws IOException { - JsonToken token = parser.nextToken(); - if (token == END_ARRAY && multipleOperations) return null; - else if (token == null && !multipleOperations) return null; - else if (token == START_OBJECT); - else throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " + parser.getTokenLocation().getByteOffset()); - long start = 0, end = -1; - OperationType type = null; - DocumentId id = null; - OperationParameters parameters = protoParameters; - loop: while (true) { - switch (parser.nextToken()) { - case FIELD_NAME: - switch (parser.getText()) { - case "id": - case "put": type = PUT; id = readId(); break; - case "update": type = UPDATE; id = readId(); break; - case "remove": type = REMOVE; id = readId(); break; - case "condition": parameters = parameters.testAndSetCondition(readString()); break; - case "create": parameters = parameters.createIfNonExistent(readBoolean()); break; - case "fields": { - expect(START_OBJECT); - start = parser.getTokenLocation().getByteOffset(); - int depth = 1; - while (depth > 0) switch (parser.nextToken()) { - case START_OBJECT: ++depth; break; - case END_OBJECT: --depth; break; - } - end = parser.getTokenLocation().getByteOffset() + 1; - break; - } - default: throw new IllegalArgumentException("Unexpected field name '" + parser.getText() + "' at offset " + - parser.getTokenLocation().getByteOffset()); - } - break; - - case END_OBJECT: - break loop; - - default: - throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " + - parser.getTokenLocation().getByteOffset()); - } - } - if (id == null) - throw new IllegalArgumentException("No document id for document at offset " + start); - - if (end < start) - throw new IllegalArgumentException("No 'fields' object for document at offset " + parser.getTokenLocation().getByteOffset()); - String payload = getDocumentJson(start, end); - switch (type) { - case PUT: return client.put (id, payload, parameters); - case UPDATE: return client.update(id, payload, parameters); - case REMOVE: return client.remove(id, parameters); - default: throw new IllegalStateException("Unexpected operation type '" + type + "'"); - } - } - - void expect(JsonToken token) throws IOException { - if (parser.nextToken() != token) - throw new IllegalArgumentException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() + - ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); - } - - private String readString() throws IOException { - String value = parser.nextTextValue(); - if (value == null) - throw new IllegalArgumentException("Expected '" + VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() + - ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); - - return value; - } - - private boolean readBoolean() throws IOException { - Boolean value = parser.nextBooleanValue(); - if (value == null) - throw new IllegalArgumentException("Expected '" + VALUE_FALSE + "' or '" + VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() + - ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); - - return value; - - } - - private DocumentId readId() throws IOException { - return DocumentId.of(readString()); - } - - } - - - public static class Builder { - - final FeedClient client; - OperationParameters parameters = OperationParameters.empty(); - - private Builder(FeedClient client) { - this.client = requireNonNull(client); - } - - public Builder withTimeout(Duration timeout) { - parameters = parameters.timeout(timeout); - return this; - } - - public Builder withRoute(String route) { - parameters = parameters.route(route); - return this; - } - - public Builder withTracelevel(int tracelevel) { - parameters = parameters.tracelevel(tracelevel); - return this; - } - - public JsonFeeder build() { - return new JsonFeeder(client, parameters); - } - - } -} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java new file mode 100644 index 00000000000..99d05a4bae8 --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java @@ -0,0 +1,364 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import ai.vespa.feed.client.FeedClient.OperationType; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.io.UncheckedIOException; +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static ai.vespa.feed.client.FeedClient.OperationType.PUT; +import static ai.vespa.feed.client.FeedClient.OperationType.REMOVE; +import static ai.vespa.feed.client.FeedClient.OperationType.UPDATE; +import static com.fasterxml.jackson.core.JsonToken.START_OBJECT; +import static com.fasterxml.jackson.core.JsonToken.VALUE_FALSE; +import static com.fasterxml.jackson.core.JsonToken.VALUE_STRING; +import static com.fasterxml.jackson.core.JsonToken.VALUE_TRUE; +import static java.lang.Math.min; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Objects.requireNonNull; + +/** + * @author jonmv + */ +public class JsonStreamFeeder implements Closeable { + + private final FeedClient client; + private final OperationParameters protoParameters; + + private JsonStreamFeeder(FeedClient client, OperationParameters protoParameters) { + this.client = client; + this.protoParameters = protoParameters; + } + + public static Builder builder(FeedClient client) { return new Builder(client); } + + /** Feeds a stream containing a JSON array of feed operations on the form + *
+     *     [
+     *       {
+     *         "id": "id:ns:type::boo",
+     *         "fields": { ... document fields ... }
+     *       },
+     *       {
+     *         "put": "id:ns:type::foo",
+     *         "fields": { ... document fields ... }
+     *       },
+     *       {
+     *         "update": "id:ns:type:n=4:bar",
+     *         "create": true,
+     *         "fields": { ... partial update fields ... }
+     *       },
+     *       {
+     *         "remove": "id:ns:type:g=foo:bar",
+     *         "condition": "type.baz = \"bax\""
+     *       },
+     *       ...
+     *     ]
+     * 
+ * Note that {@code "id"} is an alias for the document put operation. + */ + public void feed(InputStream jsonStream) throws IOException { + feed(jsonStream, 1 << 26, false); + } + + BenchmarkResult benchmark(InputStream jsonStream) throws IOException { + return feed(jsonStream, 1 << 26, true).get(); + } + + Optional feed(InputStream jsonStream, int size, boolean benchmark) throws IOException { + RingBufferStream buffer = new RingBufferStream(jsonStream, size); + buffer.expect(JsonToken.START_ARRAY); + AtomicInteger okCount = new AtomicInteger(); + AtomicInteger failedCount = new AtomicInteger(); + long startTime = System.nanoTime(); + CompletableFuture result; + AtomicReference thrown = new AtomicReference<>(); + while ((result = buffer.next()) != null) { + result.whenComplete((r, t) -> { + if (t != null) { + failedCount.incrementAndGet(); + if (!benchmark) thrown.set(t); + } else + okCount.incrementAndGet(); + }); + if (thrown.get() != null) + sneakyThrow(thrown.get()); + } + if (!benchmark) return Optional.empty(); + Duration duration = Duration.ofNanos(System.nanoTime() - startTime); + double throughPut = (double)okCount.get() / duration.toMillis() * 1000D; + return Optional.of(new BenchmarkResult(okCount.get(), failedCount.get(), duration, throughPut)); + } + + @SuppressWarnings("unchecked") + static void sneakyThrow(Throwable thrown) throws T { throw (T) thrown; } + + private static final JsonFactory factory = new JsonFactory(); + + @Override public void close() throws IOException { client.close(); } + + private class RingBufferStream extends InputStream { + + private final byte[] b = new byte[1]; + private final InputStream in; + private final byte[] data; + private final int size; + private final Object lock = new Object(); + private final JsonParser parser; + private Throwable thrown = null; + private long tail = 0; + private long pos = 0; + private long head = 0; + private boolean done = false; + + RingBufferStream(InputStream in, int size) { + this.in = in; + this.data = new byte[size]; + this.size = size; + + new Thread(this::fill, "feed-reader").start(); + + try { this.parser = factory.createParser(this); } + catch (IOException e) { throw new UncheckedIOException(e); } + } + + @Override + public int read() throws IOException { + return read(b, 0, 1) == -1 ? -1 : b[0]; + } + + @Override + public int read(byte[] buffer, int off, int len) throws IOException { + try { + int ready; + synchronized (lock) { + while ((ready = (int) (head - pos)) == 0 && ! done) + lock.wait(); + } + if (thrown != null) throw new RuntimeException("Error reading input", thrown); + if (ready == 0) return -1; + + ready = min(ready, len); + int offset = (int) (pos % size); + int length = min(ready, size - offset); + System.arraycopy(data, offset, buffer, off, length); + if (length < ready) + System.arraycopy(data, 0, buffer, off + length, ready - length); + + pos += ready; + return ready; + } + catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted waiting for data: " + e.getMessage()); + } + } + + void expect(JsonToken token) throws IOException { + if (parser.nextToken() != token) + throw new IllegalArgumentException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() + + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); + } + + public CompletableFuture next() throws IOException { + long start = 0, end = -1; + OperationType type = null; + DocumentId id = null; + OperationParameters parameters = protoParameters; + switch (parser.nextToken()) { + case END_ARRAY: return null; + case START_OBJECT: break; + default: throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " + + parser.getTokenLocation().getByteOffset()); + } + + loop: while (true) { + switch (parser.nextToken()) { + case FIELD_NAME: + switch (parser.getText()) { + case "id": + case "put": type = PUT; id = readId(); break; + case "update": type = UPDATE; id = readId(); break; + case "remove": type = REMOVE; id = readId(); break; + case "condition": parameters = parameters.testAndSetCondition(readString()); break; + case "create": parameters = parameters.createIfNonExistent(readBoolean()); break; + case "fields": { + expect(START_OBJECT); + start = parser.getTokenLocation().getByteOffset(); + int depth = 1; + while (depth > 0) switch (parser.nextToken()) { + case START_OBJECT: ++depth; break; + case END_OBJECT: --depth; break; + } + end = parser.getTokenLocation().getByteOffset() + 1; + break; + } + default: throw new IllegalArgumentException("Unexpected field name '" + parser.getText() + "' at offset " + + parser.getTokenLocation().getByteOffset()); + } + break; + + case END_OBJECT: + break loop; + + default: + throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " + + parser.getTokenLocation().getByteOffset()); + } + } + + if (id == null) + throw new IllegalArgumentException("No document id for document at offset " + start); + + if (end < start) + throw new IllegalArgumentException("No 'fields' object for document at offset " + parser.getTokenLocation().getByteOffset()); + + String payload = new String(copy(start, end), UTF_8); + synchronized (lock) { + tail = end; + lock.notify(); + } + + switch (type) { + case PUT: return client.put (id, payload, parameters); + case UPDATE: return client.update(id, payload, parameters); + case REMOVE: return client.remove(id, parameters); + default: throw new IllegalStateException("Unexpected operation type '" + type + "'"); + } + } + + private final byte[] prefix = "{\"fields\":".getBytes(UTF_8); + private byte[] copy(long start, long end) { + int length = (int) (end - start); + byte[] buffer = new byte[prefix.length + length + 1]; + System.arraycopy(prefix, 0, buffer, 0, prefix.length); + + int offset = (int) (start % size); + int toWrite = min(length, size - offset); + System.arraycopy(data, offset, buffer, prefix.length, toWrite); + if (toWrite < length) + System.arraycopy(data, 0, buffer, prefix.length + toWrite, length - toWrite); + + buffer[buffer.length - 1] = '}'; + return buffer; + } + + private String readString() throws IOException { + String value = parser.nextTextValue(); + if (value == null) + throw new IllegalArgumentException("Expected '" + VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() + + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); + + return value; + } + + private boolean readBoolean() throws IOException { + Boolean value = parser.nextBooleanValue(); + if (value == null) + throw new IllegalArgumentException("Expected '" + VALUE_FALSE + "' or '" + VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() + + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); + + return value; + + } + + private DocumentId readId() throws IOException { + return DocumentId.of(readString()); + } + + @Override + public void close() throws IOException { + synchronized (lock) { + done = true; + lock.notifyAll(); + } + in.close(); + } + + private void fill() { + try { + while (true) { + int free; + synchronized (lock) { + while ((free = (int) (tail + size - head)) <= 0 && ! done) + lock.wait(); + } + if (done) break; + + int off = (int) (head % size); + int len = min(min(free, size - off), 1 << 13); + int read = in.read(data, off, len); + + synchronized (lock) { + if (read < 0) done = true; + else head += read; + lock.notify(); + } + } + } + catch (Throwable t) { + synchronized (lock) { + done = true; + thrown = t; + } + } + } + + } + + + public static class Builder { + + final FeedClient client; + OperationParameters parameters = OperationParameters.empty(); + + private Builder(FeedClient client) { + this.client = requireNonNull(client); + } + + public Builder withTimeout(Duration timeout) { + parameters = parameters.timeout(timeout); + return this; + } + + public Builder withRoute(String route) { + parameters = parameters.route(route); + return this; + } + + public Builder withTracelevel(int tracelevel) { + parameters = parameters.tracelevel(tracelevel); + return this; + } + + public JsonStreamFeeder build() { + return new JsonStreamFeeder(client, parameters); + } + + } + + static class BenchmarkResult { + final int okCount; + final int errorCount; + final Duration duration; + final double throughput; + + BenchmarkResult(int okCount, int errorCount, Duration duration, double throughput) { + this.okCount = okCount; + this.errorCount = errorCount; + this.duration = duration; + this.throughput = throughput; + } + } + +} diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java deleted file mode 100644 index 0f14f9ab4be..00000000000 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -import org.junit.jupiter.api.Test; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.IntStream; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.stream.Collectors.joining; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - -class JsonFeederTest { - - @Test - void test() throws IOException { - int docs = 1 << 14; - String json = "[\n" + - - IntStream.range(0, docs).mapToObj(i -> - " {\n" + - " \"id\": \"id:ns:type::abc" + i + "\",\n" + - " \"fields\": {\n" + - " \"lul\":\"lal\"\n" + - " }\n" + - " },\n" - ).collect(joining()) + - - " {\n" + - " \"id\": \"id:ns:type::abc" + docs + "\",\n" + - " \"fields\": {\n" + - " \"lul\":\"lal\"\n" + - " }\n" + - " }\n" + - "]"; - ByteArrayInputStream in = new ByteArrayInputStream(json.getBytes(UTF_8)); - Set ids = new ConcurrentSkipListSet<>(); - AtomicInteger resultsReceived = new AtomicInteger(); - AtomicBoolean completedSuccessfully = new AtomicBoolean(); - AtomicReference exceptionThrow = new AtomicReference<>(); - long startNanos = System.nanoTime(); - JsonFeeder.builder(new FeedClient() { - - @Override - public CompletableFuture put(DocumentId documentId, String documentJson, OperationParameters params) { - ids.add(documentId.userSpecific()); - return createSuccessResult(documentId); - } - - @Override - public CompletableFuture update(DocumentId documentId, String updateJson, OperationParameters params) { - return createSuccessResult(documentId); - } - - @Override - public CompletableFuture remove(DocumentId documentId, OperationParameters params) { - return createSuccessResult(documentId); - } - - @Override - public void close(boolean graceful) { } - - private CompletableFuture createSuccessResult(DocumentId documentId) { - return CompletableFuture.completedFuture(new Result(Result.Type.success, documentId, "success", null)); - } - - }).build().feedMany(in, 1 << 7, new JsonFeeder.ResultCallback() { - @Override public void onNextResult(Result result, Throwable error) { resultsReceived.incrementAndGet(); } - @Override public void onError(Throwable error) { exceptionThrow.set(error); } - @Override public void onComplete() { completedSuccessfully.set(true); } - }).join(); // TODO: hangs when buffer is smaller than largest document - System.err.println((json.length() / 1048576.0) + " MB in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds"); - assertEquals(docs + 1, ids.size()); - assertEquals(docs + 1, resultsReceived.get()); - assertTrue(completedSuccessfully.get()); - assertNull(exceptionThrow.get()); - } - -} diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java new file mode 100644 index 00000000000..28a50b88396 --- /dev/null +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java @@ -0,0 +1,69 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.IntStream; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.stream.Collectors.joining; +import static org.junit.jupiter.api.Assertions.assertEquals; + +class JsonStreamFeederTest { + + @Test + void test() throws IOException { + int docs = 1 << 14; + String json = "[\n" + + + IntStream.range(0, docs).mapToObj(i -> + " {\n" + + " \"id\": \"id:ns:type::abc" + i + "\",\n" + + " \"fields\": {\n" + + " \"lul\":\"lal\"\n" + + " }\n" + + " },\n" + ).collect(joining()) + + + " {\n" + + " \"id\": \"id:ns:type::abc" + docs + "\",\n" + + " \"fields\": {\n" + + " \"lul\":\"lal\"\n" + + " }\n" + + " }\n" + + "]"; + ByteArrayInputStream in = new ByteArrayInputStream(json.getBytes(UTF_8)); + Set ids = new HashSet<>(); + long startNanos = System.nanoTime(); + JsonStreamFeeder.builder(new FeedClient() { + + @Override + public CompletableFuture put(DocumentId documentId, String documentJson, OperationParameters params) { + ids.add(documentId.userSpecific()); + return new CompletableFuture<>(); + } + + @Override + public CompletableFuture update(DocumentId documentId, String updateJson, OperationParameters params) { + return new CompletableFuture<>(); + } + + @Override + public CompletableFuture remove(DocumentId documentId, OperationParameters params) { + return new CompletableFuture<>(); + } + + @Override + public void close(boolean graceful) { } + + }).build().feed(in, 1 << 7, false); // TODO: hangs when buffer is smaller than largest document + System.err.println((json.length() / 1048576.0) + " MB in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds"); + assertEquals(docs + 1, ids.size()); + } + +} -- cgit v1.2.3