diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-05-27 00:46:23 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-05-27 09:54:43 +0200 |
commit | fce5c8dd2e6ddf3c2364f4308c1aff2ec2d3aa4a (patch) | |
tree | f3d5cd7a61075a79edca2a753b20b204ba748058 /vespa-feed-client | |
parent | 6b6e59869ab5259a8cd2e382cd2b5164a963a293 (diff) |
WIP JSON parsing (probably works)
Diffstat (limited to 'vespa-feed-client')
3 files changed, 332 insertions, 7 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java
index 7ece0d4ef73..74965270fb9 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java
@@ -1,11 +1,28 @@
 // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
 package ai.vespa.feed.client;
 
+import ai.vespa.feed.client.FeedClient.OperationType;
+import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
 
+import java.io.IOException;
 import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.UncheckedIOException;
 import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
 
+import static ai.vespa.feed.client.FeedClient.OperationType.put;
+import static ai.vespa.feed.client.FeedClient.OperationType.remove;
+import static ai.vespa.feed.client.FeedClient.OperationType.update;
+import static com.fasterxml.jackson.core.JsonToken.START_OBJECT;
+import static com.fasterxml.jackson.core.JsonToken.VALUE_FALSE;
+import static com.fasterxml.jackson.core.JsonToken.VALUE_STRING;
+import static com.fasterxml.jackson.core.JsonToken.VALUE_TRUE;
+import static java.lang.Math.min;
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Objects.requireNonNull;
 
 /**
@@ -48,29 +65,245 @@ public class JsonStreamFeeder {
      * </pre>
      * Note that {@code "id"} is an alias for the document put operation.
      */
-    public void feed(InputStream jsonStream) {
+    public void feed(InputStream jsonStream) throws IOException {
+        feed(jsonStream, 1 << 26);
+    }
 
+    void feed(InputStream jsonStream, int size) throws IOException {
+        RingBufferStream buffer = new RingBufferStream(jsonStream, size);
+        buffer.expect(JsonToken.START_ARRAY);
+        CompletableFuture<Result> result;
+        AtomicReference<Throwable> thrown = new AtomicReference<>();
+        while ((result = buffer.next()) != null) {
+            result.whenComplete((r, t) -> {
+                if (t != null)
+                    thrown.set(t);
+                else
+                    ; // Aggregate stats.
+            });
+            if (thrown.get() != null)
+                sneakyThrow(thrown.get());
+        }
     }
 
+    @SuppressWarnings("unchecked")
+    static <T extends Throwable> void sneakyThrow(Throwable thrown) throws T { throw (T) thrown; }
+
+    private static final JsonFactory factory = new JsonFactory();
-    static class Tokenizer {
+    private class RingBufferStream extends InputStream {
 
+        private final byte[] b = new byte[1];
         private final InputStream in;
-        private final JsonParser json;
+        private final byte[] data;
+        private final int size;
+        private final Object lock = new Object();
+        private final JsonParser parser;
+        private Throwable thrown = null;
+        private long tail = 0;
+        private long pos = 0;
+        private long head = 0;
+        private boolean done = false;
 
-        public Tokenizer(InputStream in, JsonParser json) {
+        RingBufferStream(InputStream in, int size) {
             this.in = in;
-            this.json = json;
+            this.data = new byte[size];
+            this.size = size;
+
+            new Thread(this::fill, "feed-reader").start();
+
+            try { this.parser = factory.createParser(this); }
+            catch (IOException e) { throw new UncheckedIOException(e); }
         }
 
-    }
+        @Override
+        public int read() throws IOException {
+            return read(b, 0, 1) == -1 ? -1 : b[0];
+        }
+
+        @Override
+        public int read(byte[] buffer, int off, int len) throws IOException {
+            try {
+                int ready;
+                synchronized (lock) {
+                    while ((ready = (int) (head - pos)) == 0 && ! done)
+                        lock.wait();
+                }
+                if (thrown != null) throw new RuntimeException("Error reading input", thrown);
+                if (ready == 0) return -1;
+
+                ready = min(ready, len);
+                int offset = (int) (pos % size);
+                int length = min(ready, size - offset);
+                System.arraycopy(data, offset, buffer, off, length);
+                if (length < ready)
+                    System.arraycopy(data, 0, buffer, off + length, ready - length);
+
+                pos += ready;
+                return ready;
+            }
+            catch (InterruptedException e) {
+                throw new InterruptedIOException("Interrupted waiting for data: " + e.getMessage());
+            }
+        }
+
+        void expect(JsonToken token) throws IOException {
+            if (parser.nextToken() != token)
+                throw new IllegalArgumentException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() +
+                                                   ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
+        }
+
+        public CompletableFuture<Result> next() throws IOException {
+            long start = 0, end = -1;
+            OperationType type = null;
+            DocumentId id = null;
+            OperationParameters parameters = protoParameters;
+            switch (parser.nextToken()) {
+                case END_ARRAY: return null;
+                case START_OBJECT: break;
+                default: throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " +
+                                                            parser.getTokenLocation().getByteOffset());
+            }
+
+            loop: while (true) {
+                switch (parser.nextToken()) {
+                    case FIELD_NAME:
+                        switch (parser.getText()) {
+                            case "id":
+                            case "put": type = put; id = readId(); break;
+                            case "update": type = update; id = readId(); break;
+                            case "remove": type = remove; id = readId(); break;
+                            case "condition": parameters = parameters.testAndSetCondition(readString()); break;
+                            case "create": parameters = parameters.createIfNonExistent(readBoolean()); break;
+                            case "fields": {
+                                expect(START_OBJECT);
+                                start = parser.getTokenLocation().getByteOffset();
+                                int depth = 1;
+                                while (depth > 0) switch (parser.nextToken()) {
+                                    case START_OBJECT: ++depth; break;
+                                    case END_OBJECT: --depth; break;
+                                }
+                                end = parser.getTokenLocation().getByteOffset() + 1;
+                                break;
+                            }
+                            default: throw new IllegalArgumentException("Unexpected field name '" + parser.getText() + "' at offset " +
+                                                                        parser.getTokenLocation().getByteOffset());
+                        }
+                        break;
+
+                    case END_OBJECT:
+                        break loop;
+
+                    default:
+                        throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " +
+                                                           parser.getTokenLocation().getByteOffset());
+                }
+            }
+
+            if (id == null)
+                throw new IllegalArgumentException("No document id for document at offset " + start);
+
+            if (end < start)
+                throw new IllegalArgumentException("No 'fields' object for document at offset " + parser.getTokenLocation().getByteOffset());
+
+            String payload = new String(copy(start, end), UTF_8);
+            synchronized (lock) {
+                tail = end;
+                lock.notify();
+            }
+
+            switch (type) {
+                case put: return client.put (id, payload, parameters);
+                case update: return client.update(id, payload, parameters);
+                case remove: return client.remove(id, parameters);
+                default: throw new IllegalStateException("Unexpected operation type '" + type + "'");
+            }
+        }
+
+        private final byte[] prefix = "{\"fields\":".getBytes(UTF_8);
+        private byte[] copy(long start, long end) {
+            int length = (int) (end - start);
+            byte[] buffer = new byte[prefix.length + length + 1];
+            System.arraycopy(prefix, 0, buffer, 0, prefix.length);
+
+            int offset = (int) (start % size);
+            int toWrite = min(length, size - offset);
+            System.arraycopy(data, offset, buffer, prefix.length, toWrite);
+            if (toWrite < length)
+                System.arraycopy(data, 0, buffer, prefix.length + toWrite, length - toWrite);
+
+            buffer[buffer.length - 1] = '}';
+            return buffer;
+        }
+
+        private String readString() throws IOException {
+            String value = parser.nextTextValue();
+            if (value == null)
+                throw new IllegalArgumentException("Expected '" + VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() +
+                                                   ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
+            return value;
+        }
+
+        private boolean readBoolean() throws IOException {
+            Boolean value = parser.nextBooleanValue();
+            if (value == null)
+                throw new IllegalArgumentException("Expected '" + VALUE_FALSE + "' or '" + VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() +
+                                                   ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
+
+            return value;
+
+        }
+
+        private DocumentId readId() throws IOException {
+            return DocumentId.of(readString());
+        }
+
+        @Override
+        public void close() throws IOException {
+            synchronized (lock) {
+                done = true;
+                lock.notifyAll();
+            }
+            in.close();
+        }
+
+        private void fill() {
+            try {
+                while (true) {
+                    int free;
+                    synchronized (lock) {
+                        while ((free = (int) (tail + size - head)) <= 0 && ! done)
+                            lock.wait();
+                    }
+                    if (done) break;
+
+                    int off = (int) (head % size);
+                    int len = min(min(free, size - off), 1 << 13);
+                    int read = in.read(data, off, len);
+
+                    synchronized (lock) {
+                        if (read < 0) done = true;
+                        else head += read;
+                        lock.notify();
+                    }
+                }
+            }
+            catch (Throwable t) {
+                synchronized (lock) {
+                    done = true;
+                    thrown = t;
+                }
+            }
+        }
+
+    }
 
     public static class Builder {
 
         final FeedClient client;
-        OperationParameters parameters;
+        OperationParameters parameters = OperationParameters.empty();
 
         private Builder(FeedClient client) {
             this.client = requireNonNull(client);
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParameters.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParameters.java
index 78450caf204..22546f89ccb 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParameters.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParameters.java
@@ -2,6 +2,7 @@
 package ai.vespa.feed.client;
 
 import java.time.Duration;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalInt;
 
@@ -67,4 +68,28 @@ public class OperationParameters {
     public Optional<String> route() { return Optional.ofNullable(route); }
     public OptionalInt tracelevel() { return tracelevel == 0 ? OptionalInt.empty() : OptionalInt.of(tracelevel); }
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        OperationParameters that = (OperationParameters) o;
+        return create == that.create && tracelevel == that.tracelevel && Objects.equals(condition, that.condition) && Objects.equals(timeout, that.timeout) && Objects.equals(route, that.route);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(create, condition, timeout, route, tracelevel);
+    }
+
+    @Override
+    public String toString() {
+        return "OperationParameters{" +
+               "create=" + create +
+               ", condition='" + condition + '\'' +
+               ", timeout=" + timeout +
+               ", route='" + route + '\'' +
+               ", tracelevel=" + tracelevel +
+               '}';
+    }
+
 }
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java
new file mode 100644
index 00000000000..25f64e3c98a
--- /dev/null
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java
@@ -0,0 +1,67 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class JsonStreamFeederTest {
+
+    @Test
+    void test() throws IOException {
+        int docs = 1 << 10;
+        String json = "[\n" +
+
+                      IntStream.range(0, docs).mapToObj(i ->
+                                                                " {\n" +
+                                                                " \"id\": \"id:ns:type::abc" + i + "\",\n" +
+                                                                " \"fields\": {\n" +
+                                                                " \"lul\":\"lal\"\n" +
+                                                                " }\n" +
+                                                                " },\n"
+                      ).collect(Collectors.joining()) +
+
+                      " {\n" +
+                      " \"id\": \"id:ns:type::abc" + docs + "\",\n" +
+                      " \"fields\": {\n" +
+                      " \"lul\":\"lal\"\n" +
+                      " }\n" +
+                      " }\n" +
+                      "]";
+        ByteArrayInputStream in = new ByteArrayInputStream(json.getBytes(UTF_8));
+        Set<String> ids = new ConcurrentSkipListSet<>();
+        JsonStreamFeeder.builder(new FeedClient() {
+            @Override
+            public CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params) {
+                ids.add(documentId.userSpecific());
+                return new CompletableFuture<>();
+            }
+
+            @Override
+            public CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params) {
+                return new CompletableFuture<>();
+            }
+
+            @Override
+            public CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params) {
+                return new CompletableFuture<>();
+            }
+
+            @Override
+            public void close() throws IOException {
+
+            }
+        }).build().feed(in, 1 << 7); // TODO: hangs on 1 << 6.
+        assertEquals(docs + 1, ids.size());
+    }
+
+}
|