diff options
Diffstat (limited to 'vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java')
-rw-r--r-- | vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java | 514 |
1 files changed, 0 insertions, 514 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java deleted file mode 100644 index 2d7caea9f26..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java +++ /dev/null @@ -1,514 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -import ai.vespa.feed.client.FeedClient.OperationType; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonLocation; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; - -import java.io.Closeable; -import java.io.IOException; -import java.io.InputStream; -import java.io.InterruptedIOException; -import java.time.Duration; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import static ai.vespa.feed.client.FeedClient.OperationType.PUT; -import static ai.vespa.feed.client.FeedClient.OperationType.REMOVE; -import static ai.vespa.feed.client.FeedClient.OperationType.UPDATE; -import static com.fasterxml.jackson.core.JsonToken.END_ARRAY; -import static com.fasterxml.jackson.core.JsonToken.START_ARRAY; -import static com.fasterxml.jackson.core.JsonToken.START_OBJECT; -import static com.fasterxml.jackson.core.JsonToken.VALUE_FALSE; -import static com.fasterxml.jackson.core.JsonToken.VALUE_STRING; -import static com.fasterxml.jackson.core.JsonToken.VALUE_TRUE; -import static java.lang.Math.min; -import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.Objects.requireNonNull; - -/** - * @author jonmv - * @author bjorncs - */ -public class JsonFeeder implements Closeable { - - private final ExecutorService resultExecutor = Executors.newSingleThreadExecutor(r -> { - Thread t = new Thread(r, "json-feeder-result-executor"); - t.setDaemon(true); - return t; - }); - private final FeedClient client; - private final OperationParameters protoParameters; - - private JsonFeeder(FeedClient client, OperationParameters protoParameters) { - this.client = client; - this.protoParameters = protoParameters; - } - - public interface ResultCallback { - /** - * Invoked after each operation has either completed successfully or failed - * - * @param result Non-null if operation completed successfully - * @param error Non-null if operation failed - */ - default void onNextResult(Result result, FeedException error) { } - - /** - * Invoked if an unrecoverable error occurred during feed processing, - * after which no other {@link ResultCallback} methods are invoked. - */ - default void onError(FeedException error) { } - - /** - * Invoked when all feed operations are either completed successfully or failed. - */ - default void onComplete() { } - } - - public static Builder builder(FeedClient client) { return new Builder(client); } - - /** Feeds single JSON feed operations on the form - * <pre> - * { - * "id": "id:ns:type::boo", - * "fields": { ... document fields ... } - * } - * </pre> - * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes. - */ - public CompletableFuture<Result> feedSingle(String json) { - CompletableFuture<Result> result = new CompletableFuture<>(); - try { - SingleOperationParserAndExecutor parser = new SingleOperationParserAndExecutor(json.getBytes(UTF_8)); - parser.next().whenCompleteAsync((operationResult, error) -> { - if (error != null) { - result.completeExceptionally(error); - } else { - result.complete(operationResult); - } - }, resultExecutor); - } catch (Exception e) { - resultExecutor.execute(() -> result.completeExceptionally(wrapException(e))); - } - return result; - } - - /** Feeds a stream containing a JSON array of feed operations on the form - * <pre> - * [ - * { - * "id": "id:ns:type::boo", - * "fields": { ... document fields ... } - * }, - * { - * "put": "id:ns:type::foo", - * "fields": { ... document fields ... } - * }, - * { - * "update": "id:ns:type:n=4:bar", - * "create": true, - * "fields": { ... partial update fields ... } - * }, - * { - * "remove": "id:ns:type:g=foo:bar", - * "condition": "type.baz = \"bax\"" - * }, - * ... - * ] - * </pre> - * Note that {@code "id"} is an alias for the document put operation. - * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes. - */ - public CompletableFuture<Void> feedMany(InputStream jsonStream, ResultCallback resultCallback) { - return feedMany(jsonStream, 1 << 26, resultCallback); - } - - /** - * Same as {@link #feedMany(InputStream, ResultCallback)}, but without a provided {@link ResultCallback} instance. - * @see JsonFeeder#feedMany(InputStream, ResultCallback) for details. - */ - public CompletableFuture<Void> feedMany(InputStream jsonStream) { - return feedMany(jsonStream, new ResultCallback() { }); - } - - CompletableFuture<Void> feedMany(InputStream jsonStream, int size, ResultCallback resultCallback) { - CompletableFuture<Void> overallResult = new CompletableFuture<>(); - CompletableFuture<Result> result; - AtomicInteger pending = new AtomicInteger(1); // The below dispatch loop itself is counted as a single pending operation - AtomicBoolean finalCallbackInvoked = new AtomicBoolean(); - try { - RingBufferStream buffer = new RingBufferStream(jsonStream, size); - while ((result = buffer.next()) != null) { - pending.incrementAndGet(); - result.whenCompleteAsync((r, t) -> { - if (!finalCallbackInvoked.get()) { - resultCallback.onNextResult(r, (FeedException) t); - } - if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { - resultCallback.onComplete(); - overallResult.complete(null); - } - }, resultExecutor); - } - if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { - resultExecutor.execute(() -> { - resultCallback.onComplete(); - overallResult.complete(null); - }); - } - } catch (Exception e) { - if (finalCallbackInvoked.compareAndSet(false, true)) { - resultExecutor.execute(() -> { - FeedException wrapped = wrapException(e); - resultCallback.onError(wrapped); - overallResult.completeExceptionally(wrapped); - }); - } - } - return overallResult; - } - - private static final JsonFactory factory = new JsonFactory(); - - @Override public void close() throws IOException { - client.close(); - resultExecutor.shutdown(); - try { - if (!resultExecutor.awaitTermination(30, TimeUnit.SECONDS)) { - throw new IOException("Failed to close client in time"); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - private FeedException wrapException(Exception e) { - if (e instanceof FeedException) return (FeedException) e; - if (e instanceof IOException) { - return new OperationParseException("Failed to parse document JSON: " + e.getMessage(), e); - } - return new FeedException(e); - } - - private class RingBufferStream extends InputStream { - - private final byte[] b = new byte[1]; - private final InputStream in; - private final Object lock = new Object(); - private byte[] data; - private int size; - private IOException thrown = null; - private long tail = 0; - private long pos = 0; - private long head = 0; - private boolean done = false; - private final OperationParserAndExecutor parserAndExecutor; - - RingBufferStream(InputStream in, int size) throws IOException { - this.in = in; - this.data = new byte[size]; - this.size = size; - - new Thread(this::fill, "feed-reader").start(); - - this.parserAndExecutor = new RingBufferBackedOperationParserAndExecutor(factory.createParser(this)); - } - - @Override - public int read() throws IOException { - return read(b, 0, 1) == -1 ? -1 : b[0]; - } - - @Override - public int read(byte[] buffer, int off, int len) throws IOException { - try { - int ready; - synchronized (lock) { - if (pos - tail == size) // Buffer exhausted, nothing left to read, nowhere left to write. - expand(); - - while ((ready = (int) (head - pos)) == 0 && ! done) - lock.wait(); - } - if (thrown != null) throw thrown; - if (ready == 0) return -1; - - ready = min(ready, len); - int offset = (int) (pos % size); - int length = min(ready, size - offset); - System.arraycopy(data, offset, buffer, off, length); - if (length < ready) - System.arraycopy(data, 0, buffer, off + length, ready - length); - - pos += ready; - return ready; - } - catch (InterruptedException e) { - throw new InterruptedIOException("Interrupted waiting for data: " + e.getMessage()); - } - } - - public CompletableFuture<Result> next() throws IOException { - return parserAndExecutor.next(); - } - - private void expand() { - int newSize = size * 2; - if (newSize <= size) - throw new IllegalStateException("Maximum buffer size exceeded; want to double " + size + ", but that's too much"); - - byte[] newData = new byte[newSize]; - int offset = (int) (tail % size); - int newOffset = (int) (tail % newSize); - int toWrite = size - offset; - System.arraycopy(data, offset, newData, newOffset, toWrite); - if (toWrite < size) - System.arraycopy(data, 0, newData, newOffset + toWrite, size - toWrite); - size = newSize; - data = newData; - lock.notify(); - } - - private final byte[] prefix = "{\"fields\":".getBytes(UTF_8); - private byte[] copy(long start, long end) { - int length = (int) (end - start); - byte[] buffer = new byte[prefix.length + length + 1]; - System.arraycopy(prefix, 0, buffer, 0, prefix.length); - - int offset = (int) (start % size); - int toWrite = min(length, size - offset); - System.arraycopy(data, offset, buffer, prefix.length, toWrite); - if (toWrite < length) - System.arraycopy(data, 0, buffer, prefix.length + toWrite, length - toWrite); - - buffer[buffer.length - 1] = '}'; - return buffer; - } - - - @Override - public void close() throws IOException { - synchronized (lock) { - done = true; - lock.notifyAll(); - } - in.close(); - } - - private void fill() { - try { - while (true) { - int free; - synchronized (lock) { - while ((free = (int) (tail + size - head)) <= 0 && ! done) - lock.wait(); - } - if (done) break; - - int off = (int) (head % size); - int len = min(min(free, size - off), 1 << 13); - int read = in.read(data, off, len); - - synchronized (lock) { - if (read < 0) done = true; - else head += read; - lock.notify(); - } - } - } catch (InterruptedException e) { - synchronized (lock) { - done = true; - thrown = new InterruptedIOException("Interrupted reading data: " + e.getMessage()); - } - } catch (IOException e) { - synchronized (lock) { - done = true; - thrown = e; - } - } - } - - private class RingBufferBackedOperationParserAndExecutor extends OperationParserAndExecutor { - - RingBufferBackedOperationParserAndExecutor(JsonParser parser) { super(parser, true); } - - @Override - String getDocumentJson(long start, long end) { - String payload = new String(copy(start, end), UTF_8); - synchronized (lock) { - tail = end; - lock.notify(); - } - return payload; - } - } - } - - private class SingleOperationParserAndExecutor extends OperationParserAndExecutor { - - private final byte[] json; - - SingleOperationParserAndExecutor(byte[] json) throws IOException { - super(factory.createParser(json), false); - this.json = json; - } - - @Override - String getDocumentJson(long start, long end) { - return "{\"fields\":" + new String(json, (int) start, (int) (end - start), UTF_8) + "}"; - } - } - - private abstract class OperationParserAndExecutor { - - private final JsonParser parser; - private final boolean multipleOperations; - private boolean arrayPrefixParsed; - - protected OperationParserAndExecutor(JsonParser parser, boolean multipleOperations) { - this.parser = parser; - this.multipleOperations = multipleOperations; - } - - abstract String getDocumentJson(long start, long end); - - OperationParseException parseException(String error) { - JsonLocation location = parser.getTokenLocation(); - return new OperationParseException(error + " at offset " + location.getByteOffset() + - " (line " + location.getLineNr() + ", column " + location.getColumnNr() + ")"); - } - - CompletableFuture<Result> next() throws IOException { - JsonToken token = parser.nextToken(); - if (multipleOperations && ! arrayPrefixParsed && token == START_ARRAY) { - arrayPrefixParsed = true; - token = parser.nextToken(); - } - if (token == END_ARRAY && multipleOperations) return null; - else if (token == null && ! arrayPrefixParsed) return null; - else if (token != START_OBJECT) throw parseException("Unexpected token '" + parser.currentToken() + "'"); - long start = 0, end = -1; - OperationType type = null; - DocumentId id = null; - OperationParameters parameters = protoParameters; - loop: while (true) { - switch (parser.nextToken()) { - case FIELD_NAME: - switch (parser.getText()) { - case "id": - case "put": type = PUT; id = readId(); break; - case "update": type = UPDATE; id = readId(); break; - case "remove": type = REMOVE; id = readId(); break; - case "condition": parameters = parameters.testAndSetCondition(readString()); break; - case "create": parameters = parameters.createIfNonExistent(readBoolean()); break; - case "fields": { - expect(START_OBJECT); - start = parser.getTokenLocation().getByteOffset(); - int depth = 1; - while (depth > 0) switch (parser.nextToken()) { - case START_OBJECT: ++depth; break; - case END_OBJECT: --depth; break; - } - end = parser.getTokenLocation().getByteOffset() + 1; - break; - } - default: throw parseException("Unexpected field name '" + parser.getText() + "'"); - } - break; - - case END_OBJECT: - break loop; - - default: - throw parseException("Unexpected token '" + parser.currentToken() + "'"); - } - } - if (id == null) - throw parseException("No document id for document"); - if (type == REMOVE) { - if (end >= start) - throw parseException("Illegal 'fields' object for remove operation"); - else - start = end = parser.getTokenLocation().getByteOffset(); // getDocumentJson advances buffer overwrite head. - } - else if (end < start) - throw parseException("No 'fields' object for document"); - - String payload = getDocumentJson(start, end); - switch (type) { - case PUT: return client.put (id, payload, parameters); - case UPDATE: return client.update(id, payload, parameters); - case REMOVE: return client.remove(id, parameters); - default: throw new OperationParseException("Unexpected operation type '" + type + "'"); - } - } - - private void expect(JsonToken token) throws IOException { - if (parser.nextToken() != token) - throw new OperationParseException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() + - ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); - } - - private String readString() throws IOException { - String value = parser.nextTextValue(); - if (value == null) - throw new OperationParseException("Expected '" + VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() + - ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); - - return value; - } - - private boolean readBoolean() throws IOException { - Boolean value = parser.nextBooleanValue(); - if (value == null) - throw new OperationParseException("Expected '" + VALUE_FALSE + "' or '" + VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() + - ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); - - return value; - - } - - private DocumentId readId() throws IOException { - return DocumentId.of(readString()); - } - - } - - public static class Builder { - - final FeedClient client; - OperationParameters parameters = OperationParameters.empty(); - - private Builder(FeedClient client) { - this.client = requireNonNull(client); - } - - public Builder withTimeout(Duration timeout) { - parameters = parameters.timeout(timeout); - return this; - } - - public Builder withRoute(String route) { - parameters = parameters.route(route); - return this; - } - - public Builder withTracelevel(int tracelevel) { - parameters = parameters.tracelevel(tracelevel); - return this; - } - - public JsonFeeder build() { - return new JsonFeeder(client, parameters); - } - - } - -} |