summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java10
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java (renamed from vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java)313
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java (renamed from vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java)35
3 files changed, 216 insertions, 142 deletions
diff --git a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java
index e3f726eaf11..40c6ac56022 100644
--- a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java
+++ b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java
@@ -55,11 +55,11 @@ public class CliClient {
return 0;
}
try (InputStream in = createFeedInputStream(cliArgs);
- JsonStreamFeeder feeder = createJsonFeeder(cliArgs)) {
+ JsonFeeder feeder = createJsonFeeder(cliArgs)) {
if (cliArgs.benchmarkModeEnabled()) {
printBenchmarkResult(feeder.benchmark(in));
} else {
- feeder.feed(in);
+ feeder.feedMany(in);
}
}
return 0;
@@ -85,9 +85,9 @@ public class CliClient {
return builder.build();
}
- private static JsonStreamFeeder createJsonFeeder(CliArguments cliArgs) throws CliArguments.CliArgumentsException, IOException {
+ private static JsonFeeder createJsonFeeder(CliArguments cliArgs) throws CliArguments.CliArgumentsException, IOException {
FeedClient feedClient = createFeedClient(cliArgs);
- JsonStreamFeeder.Builder builder = JsonStreamFeeder.builder(feedClient);
+ JsonFeeder.Builder builder = JsonFeeder.builder(feedClient);
cliArgs.timeout().ifPresent(builder::withTimeout);
cliArgs.route().ifPresent(builder::withRoute);
cliArgs.traceLevel().ifPresent(builder::withTracelevel);
@@ -98,7 +98,7 @@ public class CliClient {
return cliArgs.readFeedFromStandardInput() ? systemIn : Files.newInputStream(cliArgs.inputFile().get());
}
- private void printBenchmarkResult(JsonStreamFeeder.BenchmarkResult result) throws IOException {
+ private void printBenchmarkResult(JsonFeeder.BenchmarkResult result) throws IOException {
JsonFactory factory = new JsonFactory();
try (JsonGenerator generator = factory.createGenerator(systemOut).useDefaultPrettyPrinter()) {
generator.writeStartObject();
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java
index 99d05a4bae8..2a6d2e15747 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java
@@ -12,14 +12,18 @@ import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.time.Duration;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
import static ai.vespa.feed.client.FeedClient.OperationType.PUT;
import static ai.vespa.feed.client.FeedClient.OperationType.REMOVE;
import static ai.vespa.feed.client.FeedClient.OperationType.UPDATE;
+import static com.fasterxml.jackson.core.JsonToken.END_ARRAY;
+import static com.fasterxml.jackson.core.JsonToken.START_ARRAY;
import static com.fasterxml.jackson.core.JsonToken.START_OBJECT;
import static com.fasterxml.jackson.core.JsonToken.VALUE_FALSE;
import static com.fasterxml.jackson.core.JsonToken.VALUE_STRING;
@@ -30,17 +34,44 @@ import static java.util.Objects.requireNonNull;
/**
* @author jonmv
+ * @author bjorncs
*/
-public class JsonStreamFeeder implements Closeable {
+public class JsonFeeder implements Closeable {
+ private final ExecutorService resultExecutor = Executors.newSingleThreadExecutor(r -> {
+ Thread t = new Thread(r, "json-feeder-result-executor");
+ t.setDaemon(true);
+ return t;
+ });
private final FeedClient client;
private final OperationParameters protoParameters;
- private JsonStreamFeeder(FeedClient client, OperationParameters protoParameters) {
+ private JsonFeeder(FeedClient client, OperationParameters protoParameters) {
this.client = client;
this.protoParameters = protoParameters;
}
+ public interface ResultCallback {
+ /**
+ * Invoked after each operation has either completed successfully or failed
+ *
+ * @param result Non-null if operation completed successfully
+ * @param error Non-null if operation failed
+ */
+ void onNextResult(Result result, Throwable error);
+
+ /**
+ * Invoked if an unrecoverable error occurred during feed processing,
+ * after which no other {@link ResultCallback} methods are invoked.
+ */
+ void onError(Throwable error);
+
+ /**
+ * Invoked when all feed operations are either completed successfully or failed.
+ */
+ void onComplete();
+ }
+
public static Builder builder(FeedClient client) { return new Builder(client); }
/** Feeds a stream containing a JSON array of feed operations on the form
@@ -68,45 +99,59 @@ public class JsonStreamFeeder implements Closeable {
* </pre>
* Note that {@code "id"} is an alias for the document put operation.
*/
- public void feed(InputStream jsonStream) throws IOException {
- feed(jsonStream, 1 << 26, false);
+ public CompletableFuture<Void> feedMany(InputStream jsonStream, ResultCallback resultCallback) {
+ return feedMany(jsonStream, 1 << 26, resultCallback);
}
- BenchmarkResult benchmark(InputStream jsonStream) throws IOException {
- return feed(jsonStream, 1 << 26, true).get();
- }
-
- Optional<BenchmarkResult> feed(InputStream jsonStream, int size, boolean benchmark) throws IOException {
+ CompletableFuture<Void> feedMany(InputStream jsonStream, int size, ResultCallback resultCallback) {
RingBufferStream buffer = new RingBufferStream(jsonStream, size);
- buffer.expect(JsonToken.START_ARRAY);
- AtomicInteger okCount = new AtomicInteger();
- AtomicInteger failedCount = new AtomicInteger();
- long startTime = System.nanoTime();
+ CompletableFuture<Void> overallResult = new CompletableFuture<>();
CompletableFuture<Result> result;
- AtomicReference<Throwable> thrown = new AtomicReference<>();
- while ((result = buffer.next()) != null) {
- result.whenComplete((r, t) -> {
- if (t != null) {
- failedCount.incrementAndGet();
- if (!benchmark) thrown.set(t);
- } else
- okCount.incrementAndGet();
- });
- if (thrown.get() != null)
- sneakyThrow(thrown.get());
+ AtomicInteger pending = new AtomicInteger(1); // The below dispatch loop itself is counted as a single pending operation
+ AtomicBoolean finalCallbackInvoked = new AtomicBoolean();
+ try {
+ while ((result = buffer.next()) != null) {
+ pending.incrementAndGet();
+ result.whenCompleteAsync((r, t) -> {
+ if (!finalCallbackInvoked.get()) {
+ resultCallback.onNextResult(r, t);
+ }
+ if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) {
+ resultCallback.onComplete();
+ overallResult.complete(null);
+ }
+ }, resultExecutor);
+ }
+ if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) {
+ resultExecutor.execute(() -> {
+ resultCallback.onComplete();
+ overallResult.complete(null);
+ });
+ }
+ } catch (Exception e) {
+ if (finalCallbackInvoked.compareAndSet(false, true)) {
+ resultExecutor.execute(() -> {
+ resultCallback.onError(e);
+ overallResult.completeExceptionally(e);
+ });
+ }
}
- if (!benchmark) return Optional.empty();
- Duration duration = Duration.ofNanos(System.nanoTime() - startTime);
- double throughPut = (double)okCount.get() / duration.toMillis() * 1000D;
- return Optional.of(new BenchmarkResult(okCount.get(), failedCount.get(), duration, throughPut));
+ return overallResult;
}
- @SuppressWarnings("unchecked")
- static <T extends Throwable> void sneakyThrow(Throwable thrown) throws T { throw (T) thrown; }
-
private static final JsonFactory factory = new JsonFactory();
- @Override public void close() throws IOException { client.close(); }
+ @Override public void close() throws IOException {
+ client.close();
+ resultExecutor.shutdown();
+ try {
+ if (!resultExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
+ throw new IOException("Failed to close client in time");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
private class RingBufferStream extends InputStream {
@@ -115,12 +160,12 @@ public class JsonStreamFeeder implements Closeable {
private final byte[] data;
private final int size;
private final Object lock = new Object();
- private final JsonParser parser;
private Throwable thrown = null;
private long tail = 0;
private long pos = 0;
private long head = 0;
private boolean done = false;
+ private final OperationParserAndExecutor parserAndExecutor;
RingBufferStream(InputStream in, int size) {
this.in = in;
@@ -129,7 +174,7 @@ public class JsonStreamFeeder implements Closeable {
new Thread(this::fill, "feed-reader").start();
- try { this.parser = factory.createParser(this); }
+ try { this.parserAndExecutor = new RingBufferBackedOperationParserAndExecutor(factory.createParser(this)); }
catch (IOException e) { throw new UncheckedIOException(e); }
}
@@ -164,24 +209,104 @@ public class JsonStreamFeeder implements Closeable {
}
}
- void expect(JsonToken token) throws IOException {
- if (parser.nextToken() != token)
- throw new IllegalArgumentException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() +
- ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
+ public CompletableFuture<Result> next() throws IOException {
+ return parserAndExecutor.next();
}
- public CompletableFuture<Result> next() throws IOException {
+ private final byte[] prefix = "{\"fields\":".getBytes(UTF_8);
+ private byte[] copy(long start, long end) {
+ int length = (int) (end - start);
+ byte[] buffer = new byte[prefix.length + length + 1];
+ System.arraycopy(prefix, 0, buffer, 0, prefix.length);
+
+ int offset = (int) (start % size);
+ int toWrite = min(length, size - offset);
+ System.arraycopy(data, offset, buffer, prefix.length, toWrite);
+ if (toWrite < length)
+ System.arraycopy(data, 0, buffer, prefix.length + toWrite, length - toWrite);
+
+ buffer[buffer.length - 1] = '}';
+ return buffer;
+ }
+
+
+ @Override
+ public void close() throws IOException {
+ synchronized (lock) {
+ done = true;
+ lock.notifyAll();
+ }
+ in.close();
+ }
+
+ private void fill() {
+ try {
+ while (true) {
+ int free;
+ synchronized (lock) {
+ while ((free = (int) (tail + size - head)) <= 0 && ! done)
+ lock.wait();
+ }
+ if (done) break;
+
+ int off = (int) (head % size);
+ int len = min(min(free, size - off), 1 << 13);
+ int read = in.read(data, off, len);
+
+ synchronized (lock) {
+ if (read < 0) done = true;
+ else head += read;
+ lock.notify();
+ }
+ }
+ }
+ catch (Throwable t) {
+ synchronized (lock) {
+ done = true;
+ thrown = t;
+ }
+ }
+ }
+
+ private class RingBufferBackedOperationParserAndExecutor extends OperationParserAndExecutor {
+
+ RingBufferBackedOperationParserAndExecutor(JsonParser parser) throws IOException { super(parser, true); }
+
+ @Override
+ String getDocumentJson(long start, long end) {
+ String payload = new String(copy(start, end), UTF_8);
+ synchronized (lock) {
+ tail = end;
+ lock.notify();
+ }
+ return payload;
+ }
+ }
+ }
+
+ private abstract class OperationParserAndExecutor {
+
+ private final JsonParser parser;
+ private final boolean multipleOperations;
+
+ protected OperationParserAndExecutor(JsonParser parser, boolean multipleOperations) throws IOException {
+ this.parser = parser;
+ this.multipleOperations = multipleOperations;
+ if (multipleOperations) expect(START_ARRAY);
+ }
+
+ abstract String getDocumentJson(long start, long end);
+
+ CompletableFuture<Result> next() throws IOException {
+ JsonToken token = parser.nextToken();
+ if (token == END_ARRAY && multipleOperations) return null;
+ else if (token == null && !multipleOperations) return null;
+ else if (token == START_OBJECT);
+ else throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " + parser.getTokenLocation().getByteOffset());
long start = 0, end = -1;
OperationType type = null;
DocumentId id = null;
OperationParameters parameters = protoParameters;
- switch (parser.nextToken()) {
- case END_ARRAY: return null;
- case START_OBJECT: break;
- default: throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " +
- parser.getTokenLocation().getByteOffset());
- }
-
loop: while (true) {
switch (parser.nextToken()) {
case FIELD_NAME:
@@ -204,7 +329,7 @@ public class JsonStreamFeeder implements Closeable {
break;
}
default: throw new IllegalArgumentException("Unexpected field name '" + parser.getText() + "' at offset " +
- parser.getTokenLocation().getByteOffset());
+ parser.getTokenLocation().getByteOffset());
}
break;
@@ -213,22 +338,15 @@ public class JsonStreamFeeder implements Closeable {
default:
throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " +
- parser.getTokenLocation().getByteOffset());
+ parser.getTokenLocation().getByteOffset());
}
}
-
if (id == null)
throw new IllegalArgumentException("No document id for document at offset " + start);
if (end < start)
throw new IllegalArgumentException("No 'fields' object for document at offset " + parser.getTokenLocation().getByteOffset());
-
- String payload = new String(copy(start, end), UTF_8);
- synchronized (lock) {
- tail = end;
- lock.notify();
- }
-
+ String payload = getDocumentJson(start, end);
switch (type) {
case PUT: return client.put (id, payload, parameters);
case UPDATE: return client.update(id, payload, parameters);
@@ -237,27 +355,17 @@ public class JsonStreamFeeder implements Closeable {
}
}
- private final byte[] prefix = "{\"fields\":".getBytes(UTF_8);
- private byte[] copy(long start, long end) {
- int length = (int) (end - start);
- byte[] buffer = new byte[prefix.length + length + 1];
- System.arraycopy(prefix, 0, buffer, 0, prefix.length);
-
- int offset = (int) (start % size);
- int toWrite = min(length, size - offset);
- System.arraycopy(data, offset, buffer, prefix.length, toWrite);
- if (toWrite < length)
- System.arraycopy(data, 0, buffer, prefix.length + toWrite, length - toWrite);
-
- buffer[buffer.length - 1] = '}';
- return buffer;
+ void expect(JsonToken token) throws IOException {
+ if (parser.nextToken() != token)
+ throw new IllegalArgumentException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() +
+ ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
}
private String readString() throws IOException {
String value = parser.nextTextValue();
if (value == null)
throw new IllegalArgumentException("Expected '" + VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() +
- ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
+ ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
return value;
}
@@ -266,7 +374,7 @@ public class JsonStreamFeeder implements Closeable {
Boolean value = parser.nextBooleanValue();
if (value == null)
throw new IllegalArgumentException("Expected '" + VALUE_FALSE + "' or '" + VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() +
- ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
+ ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
return value;
@@ -276,44 +384,6 @@ public class JsonStreamFeeder implements Closeable {
return DocumentId.of(readString());
}
- @Override
- public void close() throws IOException {
- synchronized (lock) {
- done = true;
- lock.notifyAll();
- }
- in.close();
- }
-
- private void fill() {
- try {
- while (true) {
- int free;
- synchronized (lock) {
- while ((free = (int) (tail + size - head)) <= 0 && ! done)
- lock.wait();
- }
- if (done) break;
-
- int off = (int) (head % size);
- int len = min(min(free, size - off), 1 << 13);
- int read = in.read(data, off, len);
-
- synchronized (lock) {
- if (read < 0) done = true;
- else head += read;
- lock.notify();
- }
- }
- }
- catch (Throwable t) {
- synchronized (lock) {
- done = true;
- thrown = t;
- }
- }
- }
-
}
@@ -341,24 +411,9 @@ public class JsonStreamFeeder implements Closeable {
return this;
}
- public JsonStreamFeeder build() {
- return new JsonStreamFeeder(client, parameters);
+ public JsonFeeder build() {
+ return new JsonFeeder(client, parameters);
}
}
-
- static class BenchmarkResult {
- final int okCount;
- final int errorCount;
- final Duration duration;
- final double throughput;
-
- BenchmarkResult(int okCount, int errorCount, Duration duration, double throughput) {
- this.okCount = okCount;
- this.errorCount = errorCount;
- this.duration = duration;
- this.throughput = throughput;
- }
- }
-
}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java
index 28a50b88396..0f14f9ab4be 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java
@@ -5,16 +5,21 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.IOException;
-import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.stream.Collectors.joining;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
-class JsonStreamFeederTest {
+class JsonFeederTest {
@Test
void test() throws IOException {
@@ -38,32 +43,46 @@ class JsonStreamFeederTest {
" }\n" +
"]";
ByteArrayInputStream in = new ByteArrayInputStream(json.getBytes(UTF_8));
- Set<String> ids = new HashSet<>();
+ Set<String> ids = new ConcurrentSkipListSet<>();
+ AtomicInteger resultsReceived = new AtomicInteger();
+ AtomicBoolean completedSuccessfully = new AtomicBoolean();
+ AtomicReference<Throwable> exceptionThrow = new AtomicReference<>();
long startNanos = System.nanoTime();
- JsonStreamFeeder.builder(new FeedClient() {
+ JsonFeeder.builder(new FeedClient() {
@Override
public CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params) {
ids.add(documentId.userSpecific());
- return new CompletableFuture<>();
+ return createSuccessResult(documentId);
}
@Override
public CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params) {
- return new CompletableFuture<>();
+ return createSuccessResult(documentId);
}
@Override
public CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params) {
- return new CompletableFuture<>();
+ return createSuccessResult(documentId);
}
@Override
public void close(boolean graceful) { }
- }).build().feed(in, 1 << 7, false); // TODO: hangs when buffer is smaller than largest document
+ private CompletableFuture<Result> createSuccessResult(DocumentId documentId) {
+ return CompletableFuture.completedFuture(new Result(Result.Type.success, documentId, "success", null));
+ }
+
+ }).build().feedMany(in, 1 << 7, new JsonFeeder.ResultCallback() {
+ @Override public void onNextResult(Result result, Throwable error) { resultsReceived.incrementAndGet(); }
+ @Override public void onError(Throwable error) { exceptionThrow.set(error); }
+ @Override public void onComplete() { completedSuccessfully.set(true); }
+ }).join(); // TODO: hangs when buffer is smaller than largest document
System.err.println((json.length() / 1048576.0) + " MB in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds");
assertEquals(docs + 1, ids.size());
+ assertEquals(docs + 1, resultsReceived.get());
+ assertTrue(completedSuccessfully.get());
+ assertNull(exceptionThrow.get());
}
}