diff options
Diffstat (limited to 'vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java')
-rw-r--r-- | vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java | 514 |
1 files changed, 514 insertions, 0 deletions
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java new file mode 100644 index 00000000000..41b432449df --- /dev/null +++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java @@ -0,0 +1,514 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import ai.vespa.feed.client.FeedClient.OperationType; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonLocation; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static ai.vespa.feed.client.FeedClient.OperationType.PUT; +import static ai.vespa.feed.client.FeedClient.OperationType.REMOVE; +import static ai.vespa.feed.client.FeedClient.OperationType.UPDATE; +import static com.fasterxml.jackson.core.JsonToken.END_ARRAY; +import static com.fasterxml.jackson.core.JsonToken.START_ARRAY; +import static com.fasterxml.jackson.core.JsonToken.START_OBJECT; +import static com.fasterxml.jackson.core.JsonToken.VALUE_FALSE; +import static com.fasterxml.jackson.core.JsonToken.VALUE_STRING; +import static com.fasterxml.jackson.core.JsonToken.VALUE_TRUE; +import static java.lang.Math.min; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Objects.requireNonNull; + +/** + * @author jonmv + * @author bjorncs + */ +public class JsonFeeder implements Closeable { + + private final ExecutorService resultExecutor = Executors.newSingleThreadExecutor(r -> { + Thread t = new Thread(r, "json-feeder-result-executor"); + t.setDaemon(true); + return t; + }); + private final FeedClient client; + private final OperationParameters protoParameters; + + private JsonFeeder(FeedClient client, OperationParameters protoParameters) { + this.client = client; + this.protoParameters = protoParameters; + } + + public interface ResultCallback { + /** + * Invoked after each operation has either completed successfully or failed + * + * @param result Non-null if operation completed successfully + * @param error Non-null if operation failed + */ + default void onNextResult(Result result, FeedException error) { } + + /** + * Invoked if an unrecoverable error occurred during feed processing, + * after which no other {@link ResultCallback} methods are invoked. + */ + default void onError(FeedException error) { } + + /** + * Invoked when all feed operations are either completed successfully or failed. + */ + default void onComplete() { } + } + + public static Builder builder(FeedClient client) { return new Builder(client); } + + /** Feeds single JSON feed operations on the form + * <pre> + * { + * "id": "id:ns:type::boo", + * "fields": { ... document fields ... } + * } + * </pre> + * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes. + */ + public CompletableFuture<Result> feedSingle(String json) { + CompletableFuture<Result> result = new CompletableFuture<>(); + try { + SingleOperationParserAndExecutor parser = new SingleOperationParserAndExecutor(json.getBytes(UTF_8)); + parser.next().whenCompleteAsync((operationResult, error) -> { + if (error != null) { + result.completeExceptionally(error); + } else { + result.complete(operationResult); + } + }, resultExecutor); + } catch (Exception e) { + resultExecutor.execute(() -> result.completeExceptionally(wrapException(e))); + } + return result; + } + + /** Feeds a stream containing a JSON array of feed operations on the form + * <pre> + * [ + * { + * "id": "id:ns:type::boo", + * "fields": { ... document fields ... } + * }, + * { + * "put": "id:ns:type::foo", + * "fields": { ... document fields ... } + * }, + * { + * "update": "id:ns:type:n=4:bar", + * "create": true, + * "fields": { ... partial update fields ... } + * }, + * { + * "remove": "id:ns:type:g=foo:bar", + * "condition": "type.baz = \"bax\"" + * }, + * ... + * ] + * </pre> + * Note that {@code "id"} is an alias for the document put operation. + * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes. + */ + public CompletableFuture<Void> feedMany(InputStream jsonStream, ResultCallback resultCallback) { + return feedMany(jsonStream, 1 << 26, resultCallback); + } + + /** + * Same as {@link #feedMany(InputStream, ResultCallback)}, but without a provided {@link ResultCallback} instance. + * @see JsonFeeder#feedMany(InputStream, ResultCallback) for details. + */ + public CompletableFuture<Void> feedMany(InputStream jsonStream) { + return feedMany(jsonStream, new ResultCallback() { }); + } + + CompletableFuture<Void> feedMany(InputStream jsonStream, int size, ResultCallback resultCallback) { + CompletableFuture<Void> overallResult = new CompletableFuture<>(); + CompletableFuture<Result> result; + AtomicInteger pending = new AtomicInteger(1); // The below dispatch loop itself is counted as a single pending operation + AtomicBoolean finalCallbackInvoked = new AtomicBoolean(); + try { + RingBufferStream buffer = new RingBufferStream(jsonStream, size); + while ((result = buffer.next()) != null) { + pending.incrementAndGet(); + result.whenCompleteAsync((r, t) -> { + if (!finalCallbackInvoked.get()) { + resultCallback.onNextResult(r, (FeedException) t); + } + if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { + resultCallback.onComplete(); + overallResult.complete(null); + } + }, resultExecutor); + } + if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { + resultExecutor.execute(() -> { + resultCallback.onComplete(); + overallResult.complete(null); + }); + } + } catch (Exception e) { + if (finalCallbackInvoked.compareAndSet(false, true)) { + resultExecutor.execute(() -> { + FeedException wrapped = wrapException(e); + resultCallback.onError(wrapped); + overallResult.completeExceptionally(wrapped); + }); + } + } + return overallResult; + } + + private static final JsonFactory factory = new JsonFactory(); + + @Override public void close() throws IOException { + client.close(); + resultExecutor.shutdown(); + try { + if (!resultExecutor.awaitTermination(30, TimeUnit.SECONDS)) { + throw new IOException("Failed to close client in time"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + private FeedException wrapException(Exception e) { + if (e instanceof FeedException) return (FeedException) e; + if (e instanceof IOException) { + return new OperationParseException("Failed to parse document JSON: " + e.getMessage(), e); + } + return new FeedException(e); + } + + private class RingBufferStream extends InputStream { + + private final byte[] b = new byte[1]; + private final InputStream in; + private final Object lock = new Object(); + private byte[] data; + private int size; + private IOException thrown = null; + private long tail = 0; + private long pos = 0; + private long head = 0; + private boolean done = false; + private final OperationParserAndExecutor parserAndExecutor; + + RingBufferStream(InputStream in, int size) throws IOException { + this.in = in; + this.data = new byte[size]; + this.size = size; + + new Thread(this::fill, "feed-reader").start(); + + this.parserAndExecutor = new RingBufferBackedOperationParserAndExecutor(factory.createParser(this)); + } + + @Override + public int read() throws IOException { + return read(b, 0, 1) == -1 ? -1 : b[0]; + } + + @Override + public int read(byte[] buffer, int off, int len) throws IOException { + try { + int ready; + synchronized (lock) { + if (pos - tail == size) // Buffer exhausted, nothing left to read, nowhere left to write. + expand(); + + while ((ready = (int) (head - pos)) == 0 && ! done) + lock.wait(); + } + if (thrown != null) throw thrown; + if (ready == 0) return -1; + + ready = min(ready, len); + int offset = (int) (pos % size); + int length = min(ready, size - offset); + System.arraycopy(data, offset, buffer, off, length); + if (length < ready) + System.arraycopy(data, 0, buffer, off + length, ready - length); + + pos += ready; + return ready; + } + catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted waiting for data: " + e.getMessage()); + } + } + + public CompletableFuture<Result> next() throws IOException { + return parserAndExecutor.next(); + } + + private void expand() { + int newSize = size * 2; + if (newSize <= size) + throw new IllegalStateException("Maximum buffer size exceeded; want to double " + size + ", but that's too much"); + + byte[] newData = new byte[newSize]; + int offset = (int) (tail % size); + int newOffset = (int) (tail % newSize); + int toWrite = size - offset; + System.arraycopy(data, offset, newData, newOffset, toWrite); + if (toWrite < size) + System.arraycopy(data, 0, newData, newOffset + toWrite, size - toWrite); + size = newSize; + data = newData; + lock.notify(); + } + + private final byte[] prefix = "{\"fields\":".getBytes(UTF_8); + private byte[] copy(long start, long end) { + int length = (int) (end - start); + byte[] buffer = new byte[prefix.length + length + 1]; + System.arraycopy(prefix, 0, buffer, 0, prefix.length); + + int offset = (int) (start % size); + int toWrite = min(length, size - offset); + System.arraycopy(data, offset, buffer, prefix.length, toWrite); + if (toWrite < length) + System.arraycopy(data, 0, buffer, prefix.length + toWrite, length - toWrite); + + buffer[buffer.length - 1] = '}'; + return buffer; + } + + + @Override + public void close() throws IOException { + synchronized (lock) { + done = true; + lock.notifyAll(); + } + in.close(); + } + + private void fill() { + try { + while (true) { + int free; + synchronized (lock) { + while ((free = (int) (tail + size - head)) <= 0 && ! done) + lock.wait(); + } + if (done) break; + + int off = (int) (head % size); + int len = min(min(free, size - off), 1 << 13); + int read = in.read(data, off, len); + + synchronized (lock) { + if (read < 0) done = true; + else head += read; + lock.notify(); + } + } + } catch (InterruptedException e) { + synchronized (lock) { + done = true; + thrown = new InterruptedIOException("Interrupted reading data: " + e.getMessage()); + } + } catch (IOException e) { + synchronized (lock) { + done = true; + thrown = e; + } + } + } + + private class RingBufferBackedOperationParserAndExecutor extends OperationParserAndExecutor { + + RingBufferBackedOperationParserAndExecutor(JsonParser parser) { super(parser, true); } + + @Override + String getDocumentJson(long start, long end) { + String payload = new String(copy(start, end), UTF_8); + synchronized (lock) { + tail = end; + lock.notify(); + } + return payload; + } + } + } + + private class SingleOperationParserAndExecutor extends OperationParserAndExecutor { + + private final byte[] json; + + SingleOperationParserAndExecutor(byte[] json) throws IOException { + super(factory.createParser(json), false); + this.json = json; + } + + @Override + String getDocumentJson(long start, long end) { + return "{\"fields\":" + new String(json, (int) start, (int) (end - start), UTF_8) + "}"; + } + } + + private abstract class OperationParserAndExecutor { + + private final JsonParser parser; + private final boolean multipleOperations; + private boolean arrayPrefixParsed; + + protected OperationParserAndExecutor(JsonParser parser, boolean multipleOperations) { + this.parser = parser; + this.multipleOperations = multipleOperations; + } + + abstract String getDocumentJson(long start, long end); + + OperationParseException parseException(String error) { + JsonLocation location = parser.getTokenLocation(); + return new OperationParseException(error + " at offset " + location.getByteOffset() + + " (line " + location.getLineNr() + ", column " + location.getColumnNr() + ")"); + } + + CompletableFuture<Result> next() throws IOException { + JsonToken token = parser.nextToken(); + if (multipleOperations && ! arrayPrefixParsed && token == JsonToken.START_ARRAY) { + arrayPrefixParsed = true; + token = parser.nextToken(); + } + if (token == JsonToken.END_ARRAY && multipleOperations) return null; + else if (token == null && ! arrayPrefixParsed) return null; + else if (token != JsonToken.START_OBJECT) throw parseException("Unexpected token '" + parser.currentToken() + "'"); + long start = 0, end = -1; + OperationType type = null; + DocumentId id = null; + OperationParameters parameters = protoParameters; + loop: while (true) { + switch (parser.nextToken()) { + case FIELD_NAME: + switch (parser.getText()) { + case "id": + case "put": type = PUT; id = readId(); break; + case "update": type = UPDATE; id = readId(); break; + case "remove": type = REMOVE; id = readId(); break; + case "condition": parameters = parameters.testAndSetCondition(readString()); break; + case "create": parameters = parameters.createIfNonExistent(readBoolean()); break; + case "fields": { + expect(START_OBJECT); + start = parser.getTokenLocation().getByteOffset(); + int depth = 1; + while (depth > 0) switch (parser.nextToken()) { + case START_OBJECT: ++depth; break; + case END_OBJECT: --depth; break; + } + end = parser.getTokenLocation().getByteOffset() + 1; + break; + } + default: throw parseException("Unexpected field name '" + parser.getText() + "'"); + } + break; + + case END_OBJECT: + break loop; + + default: + throw parseException("Unexpected token '" + parser.currentToken() + "'"); + } + } + if (id == null) + throw parseException("No document id for document"); + if (type == REMOVE) { + if (end >= start) + throw parseException("Illegal 'fields' object for remove operation"); + else + start = end = parser.getTokenLocation().getByteOffset(); // getDocumentJson advances buffer overwrite head. + } + else if (end < start) + throw parseException("No 'fields' object for document"); + + String payload = getDocumentJson(start, end); + switch (type) { + case PUT: return client.put (id, payload, parameters); + case UPDATE: return client.update(id, payload, parameters); + case REMOVE: return client.remove(id, parameters); + default: throw new OperationParseException("Unexpected operation type '" + type + "'"); + } + } + + private void expect(JsonToken token) throws IOException { + if (parser.nextToken() != token) + throw new OperationParseException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() + + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); + } + + private String readString() throws IOException { + String value = parser.nextTextValue(); + if (value == null) + throw new OperationParseException("Expected '" + JsonToken.VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() + + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); + + return value; + } + + private boolean readBoolean() throws IOException { + Boolean value = parser.nextBooleanValue(); + if (value == null) + throw new OperationParseException("Expected '" + JsonToken.VALUE_FALSE + "' or '" + JsonToken.VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() + + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); + + return value; + + } + + private DocumentId readId() throws IOException { + return DocumentId.of(readString()); + } + + } + + public static class Builder { + + final FeedClient client; + OperationParameters parameters = OperationParameters.empty(); + + private Builder(FeedClient client) { + this.client = requireNonNull(client); + } + + public Builder withTimeout(Duration timeout) { + parameters = parameters.timeout(timeout); + return this; + } + + public Builder withRoute(String route) { + parameters = parameters.route(route); + return this; + } + + public Builder withTracelevel(int tracelevel) { + parameters = parameters.tracelevel(tracelevel); + return this; + } + + public JsonFeeder build() { + return new JsonFeeder(client, parameters); + } + + } + +} |