summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java')
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java514
1 files changed, 0 insertions, 514 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java
deleted file mode 100644
index 2d7caea9f26..00000000000
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java
+++ /dev/null
@@ -1,514 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
-
-import ai.vespa.feed.client.FeedClient.OperationType;
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonLocation;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InterruptedIOException;
-import java.time.Duration;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static ai.vespa.feed.client.FeedClient.OperationType.PUT;
-import static ai.vespa.feed.client.FeedClient.OperationType.REMOVE;
-import static ai.vespa.feed.client.FeedClient.OperationType.UPDATE;
-import static com.fasterxml.jackson.core.JsonToken.END_ARRAY;
-import static com.fasterxml.jackson.core.JsonToken.START_ARRAY;
-import static com.fasterxml.jackson.core.JsonToken.START_OBJECT;
-import static com.fasterxml.jackson.core.JsonToken.VALUE_FALSE;
-import static com.fasterxml.jackson.core.JsonToken.VALUE_STRING;
-import static com.fasterxml.jackson.core.JsonToken.VALUE_TRUE;
-import static java.lang.Math.min;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.Objects.requireNonNull;
-
-/**
- * @author jonmv
- * @author bjorncs
- */
-public class JsonFeeder implements Closeable {
-
- private final ExecutorService resultExecutor = Executors.newSingleThreadExecutor(r -> {
- Thread t = new Thread(r, "json-feeder-result-executor");
- t.setDaemon(true);
- return t;
- });
- private final FeedClient client;
- private final OperationParameters protoParameters;
-
- private JsonFeeder(FeedClient client, OperationParameters protoParameters) {
- this.client = client;
- this.protoParameters = protoParameters;
- }
-
- public interface ResultCallback {
- /**
- * Invoked after each operation has either completed successfully or failed
- *
- * @param result Non-null if operation completed successfully
- * @param error Non-null if operation failed
- */
- default void onNextResult(Result result, FeedException error) { }
-
- /**
- * Invoked if an unrecoverable error occurred during feed processing,
- * after which no other {@link ResultCallback} methods are invoked.
- */
- default void onError(FeedException error) { }
-
- /**
- * Invoked when all feed operations are either completed successfully or failed.
- */
- default void onComplete() { }
- }
-
- public static Builder builder(FeedClient client) { return new Builder(client); }
-
- /** Feeds single JSON feed operations on the form
- * <pre>
- * {
- * "id": "id:ns:type::boo",
- * "fields": { ... document fields ... }
- * }
- * </pre>
- * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes.
- */
- public CompletableFuture<Result> feedSingle(String json) {
- CompletableFuture<Result> result = new CompletableFuture<>();
- try {
- SingleOperationParserAndExecutor parser = new SingleOperationParserAndExecutor(json.getBytes(UTF_8));
- parser.next().whenCompleteAsync((operationResult, error) -> {
- if (error != null) {
- result.completeExceptionally(error);
- } else {
- result.complete(operationResult);
- }
- }, resultExecutor);
- } catch (Exception e) {
- resultExecutor.execute(() -> result.completeExceptionally(wrapException(e)));
- }
- return result;
- }
-
- /** Feeds a stream containing a JSON array of feed operations on the form
- * <pre>
- * [
- * {
- * "id": "id:ns:type::boo",
- * "fields": { ... document fields ... }
- * },
- * {
- * "put": "id:ns:type::foo",
- * "fields": { ... document fields ... }
- * },
- * {
- * "update": "id:ns:type:n=4:bar",
- * "create": true,
- * "fields": { ... partial update fields ... }
- * },
- * {
- * "remove": "id:ns:type:g=foo:bar",
- * "condition": "type.baz = \"bax\""
- * },
- * ...
- * ]
- * </pre>
- * Note that {@code "id"} is an alias for the document put operation.
- * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes.
- */
- public CompletableFuture<Void> feedMany(InputStream jsonStream, ResultCallback resultCallback) {
- return feedMany(jsonStream, 1 << 26, resultCallback);
- }
-
- /**
- * Same as {@link #feedMany(InputStream, ResultCallback)}, but without a provided {@link ResultCallback} instance.
- * @see JsonFeeder#feedMany(InputStream, ResultCallback) for details.
- */
- public CompletableFuture<Void> feedMany(InputStream jsonStream) {
- return feedMany(jsonStream, new ResultCallback() { });
- }
-
- CompletableFuture<Void> feedMany(InputStream jsonStream, int size, ResultCallback resultCallback) {
- CompletableFuture<Void> overallResult = new CompletableFuture<>();
- CompletableFuture<Result> result;
- AtomicInteger pending = new AtomicInteger(1); // The below dispatch loop itself is counted as a single pending operation
- AtomicBoolean finalCallbackInvoked = new AtomicBoolean();
- try {
- RingBufferStream buffer = new RingBufferStream(jsonStream, size);
- while ((result = buffer.next()) != null) {
- pending.incrementAndGet();
- result.whenCompleteAsync((r, t) -> {
- if (!finalCallbackInvoked.get()) {
- resultCallback.onNextResult(r, (FeedException) t);
- }
- if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) {
- resultCallback.onComplete();
- overallResult.complete(null);
- }
- }, resultExecutor);
- }
- if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) {
- resultExecutor.execute(() -> {
- resultCallback.onComplete();
- overallResult.complete(null);
- });
- }
- } catch (Exception e) {
- if (finalCallbackInvoked.compareAndSet(false, true)) {
- resultExecutor.execute(() -> {
- FeedException wrapped = wrapException(e);
- resultCallback.onError(wrapped);
- overallResult.completeExceptionally(wrapped);
- });
- }
- }
- return overallResult;
- }
-
- private static final JsonFactory factory = new JsonFactory();
-
- @Override public void close() throws IOException {
- client.close();
- resultExecutor.shutdown();
- try {
- if (!resultExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
- throw new IOException("Failed to close client in time");
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- private FeedException wrapException(Exception e) {
- if (e instanceof FeedException) return (FeedException) e;
- if (e instanceof IOException) {
- return new OperationParseException("Failed to parse document JSON: " + e.getMessage(), e);
- }
- return new FeedException(e);
- }
-
- private class RingBufferStream extends InputStream {
-
- private final byte[] b = new byte[1];
- private final InputStream in;
- private final Object lock = new Object();
- private byte[] data;
- private int size;
- private IOException thrown = null;
- private long tail = 0;
- private long pos = 0;
- private long head = 0;
- private boolean done = false;
- private final OperationParserAndExecutor parserAndExecutor;
-
- RingBufferStream(InputStream in, int size) throws IOException {
- this.in = in;
- this.data = new byte[size];
- this.size = size;
-
- new Thread(this::fill, "feed-reader").start();
-
- this.parserAndExecutor = new RingBufferBackedOperationParserAndExecutor(factory.createParser(this));
- }
-
- @Override
- public int read() throws IOException {
- return read(b, 0, 1) == -1 ? -1 : b[0];
- }
-
- @Override
- public int read(byte[] buffer, int off, int len) throws IOException {
- try {
- int ready;
- synchronized (lock) {
- if (pos - tail == size) // Buffer exhausted, nothing left to read, nowhere left to write.
- expand();
-
- while ((ready = (int) (head - pos)) == 0 && ! done)
- lock.wait();
- }
- if (thrown != null) throw thrown;
- if (ready == 0) return -1;
-
- ready = min(ready, len);
- int offset = (int) (pos % size);
- int length = min(ready, size - offset);
- System.arraycopy(data, offset, buffer, off, length);
- if (length < ready)
- System.arraycopy(data, 0, buffer, off + length, ready - length);
-
- pos += ready;
- return ready;
- }
- catch (InterruptedException e) {
- throw new InterruptedIOException("Interrupted waiting for data: " + e.getMessage());
- }
- }
-
- public CompletableFuture<Result> next() throws IOException {
- return parserAndExecutor.next();
- }
-
- private void expand() {
- int newSize = size * 2;
- if (newSize <= size)
- throw new IllegalStateException("Maximum buffer size exceeded; want to double " + size + ", but that's too much");
-
- byte[] newData = new byte[newSize];
- int offset = (int) (tail % size);
- int newOffset = (int) (tail % newSize);
- int toWrite = size - offset;
- System.arraycopy(data, offset, newData, newOffset, toWrite);
- if (toWrite < size)
- System.arraycopy(data, 0, newData, newOffset + toWrite, size - toWrite);
- size = newSize;
- data = newData;
- lock.notify();
- }
-
- private final byte[] prefix = "{\"fields\":".getBytes(UTF_8);
- private byte[] copy(long start, long end) {
- int length = (int) (end - start);
- byte[] buffer = new byte[prefix.length + length + 1];
- System.arraycopy(prefix, 0, buffer, 0, prefix.length);
-
- int offset = (int) (start % size);
- int toWrite = min(length, size - offset);
- System.arraycopy(data, offset, buffer, prefix.length, toWrite);
- if (toWrite < length)
- System.arraycopy(data, 0, buffer, prefix.length + toWrite, length - toWrite);
-
- buffer[buffer.length - 1] = '}';
- return buffer;
- }
-
-
- @Override
- public void close() throws IOException {
- synchronized (lock) {
- done = true;
- lock.notifyAll();
- }
- in.close();
- }
-
- private void fill() {
- try {
- while (true) {
- int free;
- synchronized (lock) {
- while ((free = (int) (tail + size - head)) <= 0 && ! done)
- lock.wait();
- }
- if (done) break;
-
- int off = (int) (head % size);
- int len = min(min(free, size - off), 1 << 13);
- int read = in.read(data, off, len);
-
- synchronized (lock) {
- if (read < 0) done = true;
- else head += read;
- lock.notify();
- }
- }
- } catch (InterruptedException e) {
- synchronized (lock) {
- done = true;
- thrown = new InterruptedIOException("Interrupted reading data: " + e.getMessage());
- }
- } catch (IOException e) {
- synchronized (lock) {
- done = true;
- thrown = e;
- }
- }
- }
-
- private class RingBufferBackedOperationParserAndExecutor extends OperationParserAndExecutor {
-
- RingBufferBackedOperationParserAndExecutor(JsonParser parser) { super(parser, true); }
-
- @Override
- String getDocumentJson(long start, long end) {
- String payload = new String(copy(start, end), UTF_8);
- synchronized (lock) {
- tail = end;
- lock.notify();
- }
- return payload;
- }
- }
- }
-
- private class SingleOperationParserAndExecutor extends OperationParserAndExecutor {
-
- private final byte[] json;
-
- SingleOperationParserAndExecutor(byte[] json) throws IOException {
- super(factory.createParser(json), false);
- this.json = json;
- }
-
- @Override
- String getDocumentJson(long start, long end) {
- return "{\"fields\":" + new String(json, (int) start, (int) (end - start), UTF_8) + "}";
- }
- }
-
- private abstract class OperationParserAndExecutor {
-
- private final JsonParser parser;
- private final boolean multipleOperations;
- private boolean arrayPrefixParsed;
-
- protected OperationParserAndExecutor(JsonParser parser, boolean multipleOperations) {
- this.parser = parser;
- this.multipleOperations = multipleOperations;
- }
-
- abstract String getDocumentJson(long start, long end);
-
- OperationParseException parseException(String error) {
- JsonLocation location = parser.getTokenLocation();
- return new OperationParseException(error + " at offset " + location.getByteOffset() +
- " (line " + location.getLineNr() + ", column " + location.getColumnNr() + ")");
- }
-
- CompletableFuture<Result> next() throws IOException {
- JsonToken token = parser.nextToken();
- if (multipleOperations && ! arrayPrefixParsed && token == START_ARRAY) {
- arrayPrefixParsed = true;
- token = parser.nextToken();
- }
- if (token == END_ARRAY && multipleOperations) return null;
- else if (token == null && ! arrayPrefixParsed) return null;
- else if (token != START_OBJECT) throw parseException("Unexpected token '" + parser.currentToken() + "'");
- long start = 0, end = -1;
- OperationType type = null;
- DocumentId id = null;
- OperationParameters parameters = protoParameters;
- loop: while (true) {
- switch (parser.nextToken()) {
- case FIELD_NAME:
- switch (parser.getText()) {
- case "id":
- case "put": type = PUT; id = readId(); break;
- case "update": type = UPDATE; id = readId(); break;
- case "remove": type = REMOVE; id = readId(); break;
- case "condition": parameters = parameters.testAndSetCondition(readString()); break;
- case "create": parameters = parameters.createIfNonExistent(readBoolean()); break;
- case "fields": {
- expect(START_OBJECT);
- start = parser.getTokenLocation().getByteOffset();
- int depth = 1;
- while (depth > 0) switch (parser.nextToken()) {
- case START_OBJECT: ++depth; break;
- case END_OBJECT: --depth; break;
- }
- end = parser.getTokenLocation().getByteOffset() + 1;
- break;
- }
- default: throw parseException("Unexpected field name '" + parser.getText() + "'");
- }
- break;
-
- case END_OBJECT:
- break loop;
-
- default:
- throw parseException("Unexpected token '" + parser.currentToken() + "'");
- }
- }
- if (id == null)
- throw parseException("No document id for document");
- if (type == REMOVE) {
- if (end >= start)
- throw parseException("Illegal 'fields' object for remove operation");
- else
- start = end = parser.getTokenLocation().getByteOffset(); // getDocumentJson advances buffer overwrite head.
- }
- else if (end < start)
- throw parseException("No 'fields' object for document");
-
- String payload = getDocumentJson(start, end);
- switch (type) {
- case PUT: return client.put (id, payload, parameters);
- case UPDATE: return client.update(id, payload, parameters);
- case REMOVE: return client.remove(id, parameters);
- default: throw new OperationParseException("Unexpected operation type '" + type + "'");
- }
- }
-
- private void expect(JsonToken token) throws IOException {
- if (parser.nextToken() != token)
- throw new OperationParseException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() +
- ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
- }
-
- private String readString() throws IOException {
- String value = parser.nextTextValue();
- if (value == null)
- throw new OperationParseException("Expected '" + VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() +
- ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
-
- return value;
- }
-
- private boolean readBoolean() throws IOException {
- Boolean value = parser.nextBooleanValue();
- if (value == null)
- throw new OperationParseException("Expected '" + VALUE_FALSE + "' or '" + VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() +
- ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
-
- return value;
-
- }
-
- private DocumentId readId() throws IOException {
- return DocumentId.of(readString());
- }
-
- }
-
- public static class Builder {
-
- final FeedClient client;
- OperationParameters parameters = OperationParameters.empty();
-
- private Builder(FeedClient client) {
- this.client = requireNonNull(client);
- }
-
- public Builder withTimeout(Duration timeout) {
- parameters = parameters.timeout(timeout);
- return this;
- }
-
- public Builder withRoute(String route) {
- parameters = parameters.route(route);
- return this;
- }
-
- public Builder withTracelevel(int tracelevel) {
- parameters = parameters.tracelevel(tracelevel);
- return this;
- }
-
- public JsonFeeder build() {
- return new JsonFeeder(client, parameters);
- }
-
- }
-
-}