aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
author Jon Marius Venstad <venstad@gmail.com> 2021-05-27 00:46:23 +0200
committer Jon Marius Venstad <venstad@gmail.com> 2021-05-27 09:54:43 +0200
commit fce5c8dd2e6ddf3c2364f4308c1aff2ec2d3aa4a (patch)
tree f3d5cd7a61075a79edca2a753b20b204ba748058 /vespa-feed-client
parent 6b6e59869ab5259a8cd2e382cd2b5164a963a293 (diff)
WIP JSON parsing (probably works)
Diffstat (limited to 'vespa-feed-client')
-rw-r--r-- vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java 247
-rw-r--r-- vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParameters.java 25
-rw-r--r-- vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java 67
3 files changed, 332 insertions, 7 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java
index 7ece0d4ef73..74965270fb9 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java
@@ -1,11 +1,28 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.feed.client;
+import ai.vespa.feed.client.FeedClient.OperationType;
+import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import java.io.IOException;
import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.io.UncheckedIOException;
import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import static ai.vespa.feed.client.FeedClient.OperationType.put;
+import static ai.vespa.feed.client.FeedClient.OperationType.remove;
+import static ai.vespa.feed.client.FeedClient.OperationType.update;
+import static com.fasterxml.jackson.core.JsonToken.START_OBJECT;
+import static com.fasterxml.jackson.core.JsonToken.VALUE_FALSE;
+import static com.fasterxml.jackson.core.JsonToken.VALUE_STRING;
+import static com.fasterxml.jackson.core.JsonToken.VALUE_TRUE;
+import static java.lang.Math.min;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
/**
@@ -48,29 +65,245 @@ public class JsonStreamFeeder {
* </pre>
* Note that {@code "id"} is an alias for the document put operation.
*/
- public void feed(InputStream jsonStream) {
+    public void feed(InputStream jsonStream) throws IOException {
+        // Delegate with the default ring buffer capacity of 1 << 26 bytes.
+        int defaultBufferSize = 1 << 26;
+        feed(jsonStream, defaultBufferSize);
+    }
+    /**
+     * Parses the JSON array in {@code jsonStream} through a ring buffer of {@code size} bytes,
+     * dispatching each parsed operation to the client.
+     * If a previously dispatched operation has completed exceptionally by the time the current
+     * one has been dispatched, that failure is rethrown and feeding stops.
+     */
+    void feed(InputStream jsonStream, int size) throws IOException {
+        AtomicReference<Throwable> thrown = new AtomicReference<>();
+        RingBufferStream buffer = new RingBufferStream(jsonStream, size);
+        buffer.expect(JsonToken.START_ARRAY);
+        for (CompletableFuture<Result> result = buffer.next(); result != null; result = buffer.next()) {
+            result.whenComplete((r, t) -> {
+                // Successful results are not aggregated yet; only failures are recorded.
+                if (t != null)
+                    thrown.set(t);
+            });
+            Throwable failure = thrown.get();
+            if (failure != null)
+                sneakyThrow(failure);
+        }
     }
+    /**
+     * Rethrows {@code thrown} as-is, without wrapping it. The unchecked cast to the
+     * inferred type {@code T} lets checked exceptions pass the compiler's checks.
+     */
+    @SuppressWarnings("unchecked")
+    static <T extends Throwable> void sneakyThrow(Throwable thrown) throws T {
+        T unchecked = (T) thrown;
+        throw unchecked;
+    }
+
+ private static final JsonFactory factory = new JsonFactory();
- static class Tokenizer {
+ private class RingBufferStream extends InputStream {
+ private final byte[] b = new byte[1];
private final InputStream in;
- private final JsonParser json;
+ private final byte[] data;
+ private final int size;
+ private final Object lock = new Object();
+ private final JsonParser parser;
+ private Throwable thrown = null;
+ private long tail = 0;
+ private long pos = 0;
+ private long head = 0;
+ private boolean done = false;
- public Tokenizer(InputStream in, JsonParser json) {
+ RingBufferStream(InputStream in, int size) {
this.in = in;
- this.json = json;
+ this.data = new byte[size];
+ this.size = size;
+
+ new Thread(this::fill, "feed-reader").start();
+
+ try { this.parser = factory.createParser(this); }
+ catch (IOException e) { throw new UncheckedIOException(e); }
}
- }
+        /**
+         * Reads a single byte by delegating to {@link #read(byte[], int, int)}.
+         *
+         * @return the next byte as a value in the range 0 to 255, or -1 at end of input
+         */
+        @Override
+        public int read() throws IOException {
+            // Mask the signed byte: InputStream.read() must return 0..255, and an unmasked
+            // 0xFF byte would otherwise be returned as -1 and mistaken for end of input.
+            return read(b, 0, 1) == -1 ? -1 : b[0] & 0xFF;
+        }
+
+        /**
+         * Copies up to {@code len} bytes from the ring buffer into {@code buffer}, blocking until
+         * the filler thread has made data available or has signalled that it is done.
+         *
+         * @return the number of bytes copied, 0 if {@code len} is 0, or -1 at end of input
+         * @throws InterruptedIOException if interrupted while waiting; the interrupt flag is restored
+         * @throws RuntimeException if the filler thread recorded a failure
+         */
+        @Override
+        public int read(byte[] buffer, int off, int len) throws IOException {
+            // Per the InputStream contract, a zero-length read returns immediately.
+            if (len == 0) return 0;
+
+            int ready;
+            Throwable failure;
+            try {
+                synchronized (lock) {
+                    while ((ready = (int) (head - pos)) == 0 && ! done)
+                        lock.wait();
+                    // Read under the lock so a failure stored by the filler thread is visible here.
+                    failure = thrown;
+                }
+            }
+            catch (InterruptedException e) {
+                // Preserve the interrupt for callers further up the stack, and keep the cause.
+                Thread.currentThread().interrupt();
+                InterruptedIOException interrupted = new InterruptedIOException("Interrupted waiting for data: " + e.getMessage());
+                interrupted.initCause(e);
+                throw interrupted;
+            }
+            if (failure != null) throw new RuntimeException("Error reading input", failure);
+            if (ready == 0) return -1;
+
+            ready = min(ready, len);
+            int offset = (int) (pos % size);
+            // The readable region may wrap around the end of the backing array.
+            int length = min(ready, size - offset);
+            System.arraycopy(data, offset, buffer, off, length);
+            if (length < ready)
+                System.arraycopy(data, 0, buffer, off + length, ready - length);
+
+            pos += ready;
+            return ready;
+        }
+
+        /** Advances the parser one token, and fails unless that token is {@code token}. */
+        void expect(JsonToken token) throws IOException {
+            JsonToken actual = parser.nextToken();
+            if (actual == token)
+                return;
+
+            throw new IllegalArgumentException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() +
+                                               ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
+        }
+
+        /**
+         * Parses the next operation object from the feed and dispatches it to the client.
+         * <p>
+         * The operation is given by one of the keys {@code "put"} (or its alias {@code "id"}),
+         * {@code "update"} or {@code "remove"}, whose value is the document id. {@code "condition"}
+         * and {@code "create"} adjust the operation parameters. The raw bytes of the {@code "fields"}
+         * object are copied from the ring buffer and passed on as the payload. A {@code "fields"}
+         * object is required for put and update, and optional (and ignored) for remove.
+         *
+         * @return the future of the dispatched operation, or {@code null} when the end of the array is reached
+         * @throws IllegalArgumentException on malformed or truncated input
+         */
+        public CompletableFuture<Result> next() throws IOException {
+            long start = 0, end = -1;
+            OperationType type = null;
+            DocumentId id = null;
+            OperationParameters parameters = protoParameters;
+
+            // nextToken() returns null at end of input; switching on null would throw a NullPointerException.
+            JsonToken first = parser.nextToken();
+            if (first == null)
+                throw unexpectedEndOfInput();
+            switch (first) {
+                case END_ARRAY: return null;
+                case START_OBJECT: break;
+                default: throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " +
+                                                            parser.getTokenLocation().getByteOffset());
+            }
+
+            loop: while (true) {
+                JsonToken token = parser.nextToken();
+                if (token == null)
+                    throw unexpectedEndOfInput();
+                switch (token) {
+                    case FIELD_NAME:
+                        switch (parser.getText()) {
+                            case "id":
+                            case "put": type = put; id = readId(); break;
+                            case "update": type = update; id = readId(); break;
+                            case "remove": type = remove; id = readId(); break;
+                            case "condition": parameters = parameters.testAndSetCondition(readString()); break;
+                            case "create": parameters = parameters.createIfNonExistent(readBoolean()); break;
+                            case "fields": {
+                                expect(START_OBJECT);
+                                start = parser.getTokenLocation().getByteOffset();
+                                // Skip to the matching END_OBJECT, tracking only object nesting.
+                                int depth = 1;
+                                while (depth > 0) {
+                                    JsonToken inner = parser.nextToken();
+                                    if (inner == null)
+                                        throw unexpectedEndOfInput();
+                                    switch (inner) {
+                                        case START_OBJECT: ++depth; break;
+                                        case END_OBJECT: --depth; break;
+                                        default: break;
+                                    }
+                                }
+                                end = parser.getTokenLocation().getByteOffset() + 1;
+                                break;
+                            }
+                            default: throw new IllegalArgumentException("Unexpected field name '" + parser.getText() + "' at offset " +
+                                                                        parser.getTokenLocation().getByteOffset());
+                        }
+                        break;
+
+                    case END_OBJECT:
+                        break loop;
+
+                    default:
+                        throw new IllegalArgumentException("Unexpected token '" + parser.currentToken() + "' at offset " +
+                                                           parser.getTokenLocation().getByteOffset());
+                }
+            }
+
+            // The current token is the END_OBJECT of the operation; everything up to and including it is consumed.
+            long consumed = parser.getTokenLocation().getByteOffset() + 1;
+
+            if (id == null)
+                throw new IllegalArgumentException("No document id for document at offset " + start);
+
+            // A remove carries no document content, so only put and update require a 'fields' object.
+            boolean hasFields = end >= start;
+            if ( ! hasFields && type != remove)
+                throw new IllegalArgumentException("No 'fields' object for document at offset " + parser.getTokenLocation().getByteOffset());
+
+            // Copy the payload out before releasing the region it occupies to the filler thread.
+            String payload = hasFields ? new String(copy(start, end), UTF_8) : null;
+            synchronized (lock) {
+                // Release the whole operation, so operations without 'fields' also free buffer space.
+                tail = consumed;
+                lock.notify();
+            }
+
+            switch (type) {
+                case put: return client.put (id, payload, parameters);
+                case update: return client.update(id, payload, parameters);
+                case remove: return client.remove(id, parameters);
+                default: throw new IllegalStateException("Unexpected operation type '" + type + "'");
+            }
+        }
+
+        /** Creates the exception thrown when the input ends in the middle of the feed. */
+        private IllegalArgumentException unexpectedEndOfInput() {
+            return new IllegalArgumentException("Unexpected end of input at offset " +
+                                                parser.getCurrentLocation().getByteOffset());
+        }
+
+        private final byte[] prefix = "{\"fields\":".getBytes(UTF_8);
+
+        /**
+         * Copies the bytes in the stream offset range {@code [start, end)} out of the ring buffer,
+         * placed between {@link #prefix} and a closing {@code '}'}.
+         */
+        private byte[] copy(long start, long end) {
+            int length = (int) (end - start);
+            int offset = (int) (start % size);
+            // The range may wrap around the end of the backing array: copy it in up to two parts.
+            int firstPart = min(length, size - offset);
+            int secondPart = length - firstPart;
+
+            byte[] wrapped = new byte[prefix.length + length + 1];
+            System.arraycopy(prefix, 0, wrapped, 0, prefix.length);
+            System.arraycopy(data, offset, wrapped, prefix.length, firstPart);
+            if (secondPart > 0)
+                System.arraycopy(data, 0, wrapped, prefix.length + firstPart, secondPart);
+
+            wrapped[wrapped.length - 1] = '}';
+            return wrapped;
+        }
+
+        /** Advances the parser and returns the string value found there, failing if it is not a string. */
+        private String readString() throws IOException {
+            String value = parser.nextTextValue();
+            if (value != null)
+                return value;
+
+            throw new IllegalArgumentException("Expected '" + VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() +
+                                               ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
+        }
+
+        /** Advances the parser and returns the boolean value found there, failing if it is not a boolean. */
+        private boolean readBoolean() throws IOException {
+            Boolean value = parser.nextBooleanValue();
+            if (value != null)
+                return value;
+
+            throw new IllegalArgumentException("Expected '" + VALUE_FALSE + "' or '" + VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() +
+                                               ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
+        }
+
+        /** Reads the next string value and converts it to a document id. */
+        private DocumentId readId() throws IOException {
+            String serialized = readString();
+            return DocumentId.of(serialized);
+        }
+
+        /**
+         * Marks this stream as done, wakes all threads waiting on the lock,
+         * and then closes the wrapped input stream.
+         */
+        @Override
+        public void close() throws IOException {
+            synchronized (lock) {
+                // Waiters in read() and fill() re-check 'done' when woken.
+                done = true;
+                lock.notifyAll();
+            }
+
+            in.close();
+        }
+
+        /**
+         * Body of the reader thread: reads from the wrapped stream into the ring buffer, in chunks
+         * of at most {@code 1 << 13} bytes, until end of input, until the stream is marked done,
+         * or until reading fails. A failure is stored in {@code thrown}, and waiters are always
+         * notified, so a blocked {@code read} cannot wait forever on a dead filler.
+         */
+        private void fill() {
+            try {
+                while (true) {
+                    int free;
+                    synchronized (lock) {
+                        // Wait until the consumer has released space behind 'tail', or we are told to stop.
+                        while ((free = (int) (tail + size - head)) <= 0 && ! done)
+                            lock.wait();
+                        // Checked under the lock, so a concurrent close() is reliably observed.
+                        if (done) break;
+                    }
+
+                    int off = (int) (head % size);
+                    // Never write past the end of the array, nor past the free region.
+                    int len = min(min(free, size - off), 1 << 13);
+                    int read = in.read(data, off, len);
+
+                    synchronized (lock) {
+                        if (read < 0) done = true;
+                        else head += read;
+                        lock.notifyAll();
+                    }
+                }
+            }
+            catch (Throwable t) {
+                synchronized (lock) {
+                    done = true;
+                    thrown = t;
+                    // Wake the reader so that it sees the failure instead of waiting indefinitely.
+                    lock.notifyAll();
+                }
+            }
+        }
+
+ }
public static class Builder {
final FeedClient client;
- OperationParameters parameters;
+ OperationParameters parameters = OperationParameters.empty();
private Builder(FeedClient client) {
this.client = requireNonNull(client);
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParameters.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParameters.java
index 78450caf204..22546f89ccb 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParameters.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParameters.java
@@ -2,6 +2,7 @@
package ai.vespa.feed.client;
import java.time.Duration;
+import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
@@ -67,4 +68,28 @@ public class OperationParameters {
public Optional<String> route() { return Optional.ofNullable(route); }
public OptionalInt tracelevel() { return tracelevel == 0 ? OptionalInt.empty() : OptionalInt.of(tracelevel); }
+    /** Two instances are equal when they are of the same class and all five parameters are equal. */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) return true;
+        if (o == null) return false;
+        if (o.getClass() != getClass()) return false;
+
+        OperationParameters other = (OperationParameters) o;
+        if (create != other.create) return false;
+        if (tracelevel != other.tracelevel) return false;
+        return Objects.equals(condition, other.condition)
+            && Objects.equals(timeout, other.timeout)
+            && Objects.equals(route, other.route);
+    }
+
+    /** Hashes the same five parameters that {@link #equals(Object)} compares. */
+    @Override
+    public int hashCode() {
+        int hash = Objects.hash(create,
+                                condition,
+                                timeout,
+                                route,
+                                tracelevel);
+        return hash;
+    }
+
+    /** Renders all five parameters, with the condition and route in single quotes. */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("OperationParameters{");
+        text.append("create=").append(create);
+        text.append(", condition='").append(condition).append('\'');
+        text.append(", timeout=").append(timeout);
+        text.append(", route='").append(route).append('\'');
+        text.append(", tracelevel=").append(tracelevel);
+        text.append('}');
+        return text.toString();
+    }
+
}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java
new file mode 100644
index 00000000000..25f64e3c98a
--- /dev/null
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java
@@ -0,0 +1,67 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class JsonStreamFeederTest {
+
+    /**
+     * Feeds {@code docs + 1} put operations through a {@code 1 << 7} byte ring buffer,
+     * and verifies that every distinct document id reached the client.
+     */
+    @Test
+    void test() throws IOException {
+        int docs = 1 << 10;
+
+        // Build a JSON array of operations with ids abc0 .. abc<docs>, comma-separated.
+        StringBuilder json = new StringBuilder("[\n");
+        for (int i = 0; i < docs; i++)
+            json.append(operation(i)).append(",\n");
+        json.append(operation(docs)).append("\n");
+        json.append("]");
+
+        ByteArrayInputStream in = new ByteArrayInputStream(json.toString().getBytes(UTF_8));
+        Set<String> ids = new ConcurrentSkipListSet<>();
+
+        // Client stub which records the ids of put operations, and never completes any operation.
+        FeedClient client = new FeedClient() {
+            @Override
+            public CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params) {
+                ids.add(documentId.userSpecific());
+                return new CompletableFuture<>();
+            }
+
+            @Override
+            public CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params) {
+                return new CompletableFuture<>();
+            }
+
+            @Override
+            public CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params) {
+                return new CompletableFuture<>();
+            }
+
+            @Override
+            public void close() throws IOException {
+
+            }
+        };
+
+        JsonStreamFeeder.builder(client).build().feed(in, 1 << 7); // TODO: hangs on 1 << 6.
+        assertEquals(docs + 1, ids.size());
+    }
+
+    /** Renders one operation object, without any trailing separator or line break. */
+    private static String operation(int i) {
+        return " {\n" +
+               " \"id\": \"id:ns:type::abc" + i + "\",\n" +
+               " \"fields\": {\n" +
+               " \"lul\":\"lal\"\n" +
+               " }\n" +
+               " }";
+    }
+
+}