aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-feed-client-api
diff options
context:
space:
mode:
authorMorten Tokle <mortent@verizonmedia.com>2021-12-07 12:52:42 +0100
committerMorten Tokle <mortent@verizonmedia.com>2021-12-07 12:52:42 +0100
commit5e956429169d3a733114e5f76f051167f291c786 (patch)
treefa2b9cc664c8c639482397e9a4566149dac3ae29 /vespa-feed-client-api
parentae09069f544a086af4ae02a092ec66788a3cae9e (diff)
Extract vespa-feed-client-api module from vespa-feed-client
Diffstat (limited to 'vespa-feed-client-api')
-rw-r--r--vespa-feed-client-api/pom.xml57
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/DocumentId.java112
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java110
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java128
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedException.java47
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/HttpResponse.java16
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java514
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/OperationParameters.java97
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/OperationParseException.java17
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/OperationStats.java139
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/Result.java23
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/ResultException.java29
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/ResultParseException.java17
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/package-info.java9
-rw-r--r--vespa-feed-client-api/src/test/java/ai/vespa/feed/client/JsonFeederTest.java240
-rw-r--r--vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java90
-rw-r--r--vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java115
-rw-r--r--vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java34
18 files changed, 1794 insertions, 0 deletions
diff --git a/vespa-feed-client-api/pom.xml b/vespa-feed-client-api/pom.xml
new file mode 100644
index 00000000000..df5fd531f06
--- /dev/null
+++ b/vespa-feed-client-api/pom.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0"?>
+<!-- Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>parent</artifactId>
+ <version>7-SNAPSHOT</version>
+ <relativePath>../parent/pom.xml</relativePath>
+ </parent>
+ <artifactId>vespa-feed-client-api</artifactId>
+ <packaging>jar</packaging>
+ <version>7-SNAPSHOT</version>
+
+ <dependencies>
+ <!-- compile scope -->
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>annotations</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <scope>compile</scope>
+ </dependency>
+
+ <!-- test scope -->
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <release>${vespaClients.jdk.releaseVersion}</release>
+ <showDeprecation>true</showDeprecation>
+ <compilerArgs>
+ <arg>-Xlint:all</arg>
+ <arg>-Xlint:-serial</arg>
+ <arg>-Werror</arg>
+ </compilerArgs>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>abi-check-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/DocumentId.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/DocumentId.java
new file mode 100644
index 00000000000..5474bcfda01
--- /dev/null
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/DocumentId.java
@@ -0,0 +1,112 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Represents a Vespa document id
+ *
+ * @author jonmv
+ */
+public class DocumentId {
+
+ private final String documentType;
+ private final String namespace;
+ private final OptionalLong number;
+ private final Optional<String> group;
+ private final String userSpecific;
+
+ private DocumentId(String documentType, String namespace, OptionalLong number, Optional<String> group, String userSpecific) {
+ this.documentType = requireNonNull(documentType);
+ this.namespace = requireNonNull(namespace);
+ this.number = requireNonNull(number);
+ this.group = requireNonNull(group);
+ this.userSpecific = requireNonNull(userSpecific);
+ }
+
+ public static DocumentId of(String namespace, String documentType, String userSpecific) {
+ return new DocumentId(documentType, namespace, OptionalLong.empty(), Optional.empty(), userSpecific);
+ }
+
+ public static DocumentId of(String namespace, String documentType, long number, String userSpecific) {
+ return new DocumentId(documentType, namespace, OptionalLong.of(number), Optional.empty(), userSpecific);
+ }
+
+ public static DocumentId of(String namespace, String documentType, String group, String userSpecific) {
+ return new DocumentId(documentType, namespace, OptionalLong.empty(), Optional.of(group), userSpecific);
+ }
+
+ public static DocumentId of(String serialized) {
+ DocumentId parsed = parse(serialized);
+ if (parsed != null) return parsed;
+ throw new IllegalArgumentException("Document ID must be on the form " +
+ "'id:<namespace>:<document-type>:[n=<number>|g=<group>]:<user-specific>', " +
+ "but was '" + serialized + "'");
+ }
+
+ private static DocumentId parse(String serialized) {
+ int i, j = -1;
+ if ((j = serialized.indexOf(':', i = j + 1)) < i) return null;
+ if ( ! "id".equals(serialized.substring(i, j))) return null;
+ if ((j = serialized.indexOf(':', i = j + 1)) <= i) return null;
+ String namespace = serialized.substring(i, j);
+ if ((j = serialized.indexOf(':', i = j + 1)) <= i) return null;
+ String documentType = serialized.substring(i, j);
+ if ((j = serialized.indexOf(':', i = j + 1)) < i) return null;
+ String group = serialized.substring(i, j);
+ if (serialized.length() <= (i = j + 1)) return null;
+ String userSpecific = serialized.substring(i);
+ if (group.startsWith("n=") && group.length() > 2)
+ return DocumentId.of(namespace, documentType, Long.parseLong(group.substring(2)), userSpecific);
+ if (group.startsWith("g=") && group.length() > 2)
+ return DocumentId.of(namespace, documentType, group.substring(2), userSpecific);
+ if (group.isEmpty())
+ return DocumentId.of(namespace, documentType, userSpecific);
+ return null;
+ }
+
+ public String documentType() {
+ return documentType;
+ }
+
+ public String namespace() {
+ return namespace;
+ }
+
+ public OptionalLong number() {
+ return number;
+ }
+
+ public Optional<String> group() {
+ return group;
+ }
+
+ public String userSpecific() {
+ return userSpecific;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ DocumentId that = (DocumentId) o;
+ return documentType.equals(that.documentType) && namespace.equals(that.namespace) && number.equals(that.number) && group.equals(that.group) && userSpecific.equals(that.userSpecific);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(documentType, namespace, number, group, userSpecific);
+ }
+
+ @Override
+ public String toString() {
+ return "id:" + namespace + ":" + documentType + ":" +
+ (number.isPresent() ? "n=" + number.getAsLong() : group.map("g="::concat).orElse("")) +
+ ":" + userSpecific;
+ }
+
+}
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java
new file mode 100644
index 00000000000..d463c611d6a
--- /dev/null
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java
@@ -0,0 +1,110 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Asynchronous feed client accepting document operations as JSON. The payload should be
+ * the same as the HTTP payload required by the /document/v1 HTTP API, i.e., <pre>
+ * {
+ * "fields": {
+ * ...
+ * }
+ * }
+ * </pre>
+ *
+ * @author bjorncs
+ * @author jonmv
+ */
+public interface FeedClient extends Closeable {
+
+ /**
+ * Send a document put with the given parameters, returning a future with the result of the operation.
+ * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes.
+ */
+ CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params);
+
+ /**
+ * Send a document update with the given parameters, returning a future with the result of the operation.
+ * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes.
+ */
+ CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params);
+
+ /**
+ * Send a document remove with the given parameters, returning a future with the result of the operation.
+ * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes.
+ */
+ CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params);
+
+ /** Returns a snapshot of the stats for this feed client, such as requests made, and responses by status. */
+ OperationStats stats();
+
+ /** Current state of the circuit breaker. */
+ CircuitBreaker.State circuitBreakerState();
+
+ /** Shut down, and reject new operations. Operations in flight are allowed to complete normally if graceful. */
+ void close(boolean graceful);
+
+ /** Initiates graceful shutdown. See {@link #close(boolean)}. */
+ default void close() { close(true); }
+
+ /** Controls what to retry, and how many times. */
+ interface RetryStrategy {
+
+ /** Whether to retry operations of the given type. */
+ default boolean retry(OperationType type) { return true; }
+
+ /** Number of retries per operation for assumed transient, non-backpressure problems. */
+ default int retries() { return 10; }
+
+ }
+
+ /** Allows slowing down or halting completely operations against the configured endpoint on high failure rates. */
+ interface CircuitBreaker {
+
+ /** A circuit breaker which is always closed. */
+ CircuitBreaker FUSED = () -> State.CLOSED;
+
+ /** Called by the client whenever a successful response is obtained. */
+ default void success() { }
+
+ /** Called by the client whenever an error HTTP response is received. */
+ default void failure(HttpResponse response) { }
+
+ /** Called by the client whenever an exception occurs trying to obtain a HTTP response. */
+ default void failure(Throwable cause) { }
+
+ /** The current state of the circuit breaker. */
+ State state();
+
+ enum State {
+
+ /** Circuit is closed: business as usual. */
+ CLOSED,
+
+ /** Circuit is half-open: something is wrong, perhaps it recovers? */
+ HALF_OPEN,
+
+ /** Circuit is open: we have given up. */
+ OPEN;
+
+ }
+
+ }
+
+ enum OperationType {
+
+ /** A document put operation. This is idempotent. */
+ PUT,
+
+ /** A document update operation. This is idempotent if all its contained updates are. */
+ UPDATE,
+
+ /** A document remove operation. This is idempotent. */
+ REMOVE;
+
+ }
+
+
+}
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java
new file mode 100644
index 00000000000..daf3f62dac1
--- /dev/null
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java
@@ -0,0 +1,128 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URI;
+import java.nio.file.Path;
+import java.security.PrivateKey;
+import java.security.cert.X509Certificate;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ServiceLoader;
+import java.util.function.Supplier;
+
+/**
+ * Builder for creating a {@link FeedClient} instance.
+ *
+ * @author bjorncs
+ * @author jonmv
+ */
+public interface FeedClientBuilder {
+
+ /** Creates a builder for a single container endpoint **/
+ static FeedClientBuilder create(URI endpoint) { return create(Collections.singletonList(endpoint)); }
+
+ /** Creates a builder for multiple container endpoints **/
+ static FeedClientBuilder create(List<URI> endpoints) {
+ Iterator<FeedClientBuilder> iterator = ServiceLoader.load(FeedClientBuilder.class).iterator();
+ if (iterator.hasNext()) {
+ return iterator.next().setEndpointUris(endpoints);
+ } else {
+ try {
+ Class<?> aClass = Class.forName("ai.vespa.feed.client.impl.FeedClientBuilderImpl");
+ for (Constructor<?> constructor : aClass.getConstructors()) {
+ if (constructor.getParameterTypes().length==0) {
+ return ((FeedClientBuilder)constructor.newInstance()).setEndpointUris(endpoints);
+ }
+ }
+ throw new RuntimeException("Could not find Feed client builder implementation");
+ } catch (ClassNotFoundException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * Sets the number of connections this client will use per endpoint.
+ *
+ * A reasonable value here is a value that lets all feed clients (if more than one)
+ * collectively have a number of connections which is a small multiple of the numbers
+ * of containers in the cluster to feed, so load can be balanced across these containers.
+ * In general, this value should be kept as low as possible, but poor connectivity
+ * between feeder and cluster may also warrant a higher number of connections.
+ */
+ FeedClientBuilder setConnectionsPerEndpoint(int max);
+
+ /**
+ * Sets the maximum number of streams per HTTP/2 connection for this client.
+ *
+ * This determines the maximum number of concurrent, inflight requests for this client,
+ * which is {@code maxConnections * maxStreamsPerConnection}. Prefer more streams over
+ * more connections, when possible.
+ * The feed client automatically throttles load to achieve the best throughput, and the
+ * actual number of streams per connection is usually lower than the maximum.
+ */
+ FeedClientBuilder setMaxStreamPerConnection(int max);
+
+ /** Sets {@link SSLContext} instance. */
+ FeedClientBuilder setSslContext(SSLContext context);
+
+ /** Sets {@link HostnameVerifier} instance (e.g for disabling default SSL hostname verification). */
+ FeedClientBuilder setHostnameVerifier(HostnameVerifier verifier);
+
+ /** Turns off benchmarking. Attempting to get {@link FeedClient#stats()} will result in an exception. */
+ FeedClientBuilder noBenchmarking();
+
+ /** Adds HTTP request header to all client requests. */
+ FeedClientBuilder addRequestHeader(String name, String value);
+
+ /**
+ * Adds HTTP request header to all client requests. Value {@link Supplier} is invoked for each HTTP request,
+ * i.e. value can be dynamically updated during a feed.
+ */
+ FeedClientBuilder addRequestHeader(String name, Supplier<String> valueSupplier);
+
+ /**
+ * Overrides default retry strategy.
+ * @see FeedClient.RetryStrategy
+ */
+ FeedClientBuilder setRetryStrategy(FeedClient.RetryStrategy strategy);
+
+ /**
+ * Overrides default circuit breaker.
+ * @see FeedClient.CircuitBreaker
+ */
+ FeedClientBuilder setCircuitBreaker(FeedClient.CircuitBreaker breaker);
+
+ /** Sets path to client SSL certificate/key PEM files */
+ FeedClientBuilder setCertificate(Path certificatePemFile, Path privateKeyPemFile);
+
+ /** Sets client SSL certificates/key */
+ FeedClientBuilder setCertificate(Collection<X509Certificate> certificate, PrivateKey privateKey);
+
+ /** Sets client SSL certificate/key */
+ FeedClientBuilder setCertificate(X509Certificate certificate, PrivateKey privateKey);
+
+ FeedClientBuilder setDryrun(boolean enabled);
+
+ /**
+ * Overrides JVM default SSL truststore
+ * @param caCertificatesFile Path to PEM encoded file containing trusted certificates
+ */
+ FeedClientBuilder setCaCertificatesFile(Path caCertificatesFile);
+
+ /** Overrides JVM default SSL truststore */
+ FeedClientBuilder setCaCertificates(Collection<X509Certificate> caCertificates);
+
+ /** Overrides endpoint URIs for this client */
+ FeedClientBuilder setEndpointUris(List<URI> endpoints);
+
+ /** Constructs instance of {@link FeedClient} from builder configuration */
+ FeedClient build();
+
+}
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedException.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedException.java
new file mode 100644
index 00000000000..1936eb09418
--- /dev/null
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedException.java
@@ -0,0 +1,47 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import java.util.Optional;
+
+/**
+ * Signals that an error occurred during feeding
+ *
+ * @author bjorncs
+ */
+public class FeedException extends RuntimeException {
+
+ private final DocumentId documentId;
+
+ public FeedException(String message) {
+ super(message);
+ this.documentId = null;
+ }
+
+ public FeedException(DocumentId documentId, String message) {
+ super(message);
+ this.documentId = documentId;
+ }
+
+ public FeedException(String message, Throwable cause) {
+ super(message, cause);
+ this.documentId = null;
+ }
+
+ public FeedException(Throwable cause) {
+ super(cause);
+ this.documentId = null;
+ }
+
+ public FeedException(DocumentId documentId, Throwable cause) {
+ super(cause);
+ this.documentId = documentId;
+ }
+
+ public FeedException(DocumentId documentId, String message, Throwable cause) {
+ super(message, cause);
+ this.documentId = documentId;
+ }
+
+ public Optional<DocumentId> documentId() { return Optional.ofNullable(documentId); }
+
+}
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/HttpResponse.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/HttpResponse.java
new file mode 100644
index 00000000000..62850fef32d
--- /dev/null
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/HttpResponse.java
@@ -0,0 +1,16 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+public interface HttpResponse {
+
+ int code();
+ byte[] body();
+
+ static HttpResponse of(int code, byte[] body) {
+ return new HttpResponse() {
+ @Override public int code() { return code; }
+ @Override public byte[] body() { return body; }
+ };
+ }
+
+}
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java
new file mode 100644
index 00000000000..41b432449df
--- /dev/null
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java
@@ -0,0 +1,514 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import ai.vespa.feed.client.FeedClient.OperationType;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonLocation;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static ai.vespa.feed.client.FeedClient.OperationType.PUT;
+import static ai.vespa.feed.client.FeedClient.OperationType.REMOVE;
+import static ai.vespa.feed.client.FeedClient.OperationType.UPDATE;
+import static com.fasterxml.jackson.core.JsonToken.END_ARRAY;
+import static com.fasterxml.jackson.core.JsonToken.START_ARRAY;
+import static com.fasterxml.jackson.core.JsonToken.START_OBJECT;
+import static com.fasterxml.jackson.core.JsonToken.VALUE_FALSE;
+import static com.fasterxml.jackson.core.JsonToken.VALUE_STRING;
+import static com.fasterxml.jackson.core.JsonToken.VALUE_TRUE;
+import static java.lang.Math.min;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * @author jonmv
+ * @author bjorncs
+ */
+public class JsonFeeder implements Closeable {
+
+ private final ExecutorService resultExecutor = Executors.newSingleThreadExecutor(r -> {
+ Thread t = new Thread(r, "json-feeder-result-executor");
+ t.setDaemon(true);
+ return t;
+ });
+ private final FeedClient client;
+ private final OperationParameters protoParameters;
+
+ private JsonFeeder(FeedClient client, OperationParameters protoParameters) {
+ this.client = client;
+ this.protoParameters = protoParameters;
+ }
+
+ public interface ResultCallback {
+ /**
+ * Invoked after each operation has either completed successfully or failed
+ *
+ * @param result Non-null if operation completed successfully
+ * @param error Non-null if operation failed
+ */
+ default void onNextResult(Result result, FeedException error) { }
+
+ /**
+ * Invoked if an unrecoverable error occurred during feed processing,
+ * after which no other {@link ResultCallback} methods are invoked.
+ */
+ default void onError(FeedException error) { }
+
+ /**
+ * Invoked when all feed operations are either completed successfully or failed.
+ */
+ default void onComplete() { }
+ }
+
+ public static Builder builder(FeedClient client) { return new Builder(client); }
+
+ /** Feeds single JSON feed operations on the form
+ * <pre>
+ * {
+ * "id": "id:ns:type::boo",
+ * "fields": { ... document fields ... }
+ * }
+ * </pre>
+ * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes.
+ */
+ public CompletableFuture<Result> feedSingle(String json) {
+ CompletableFuture<Result> result = new CompletableFuture<>();
+ try {
+ SingleOperationParserAndExecutor parser = new SingleOperationParserAndExecutor(json.getBytes(UTF_8));
+ parser.next().whenCompleteAsync((operationResult, error) -> {
+ if (error != null) {
+ result.completeExceptionally(error);
+ } else {
+ result.complete(operationResult);
+ }
+ }, resultExecutor);
+ } catch (Exception e) {
+ resultExecutor.execute(() -> result.completeExceptionally(wrapException(e)));
+ }
+ return result;
+ }
+
+ /** Feeds a stream containing a JSON array of feed operations on the form
+ * <pre>
+ * [
+ * {
+ * "id": "id:ns:type::boo",
+ * "fields": { ... document fields ... }
+ * },
+ * {
+ * "put": "id:ns:type::foo",
+ * "fields": { ... document fields ... }
+ * },
+ * {
+ * "update": "id:ns:type:n=4:bar",
+ * "create": true,
+ * "fields": { ... partial update fields ... }
+ * },
+ * {
+ * "remove": "id:ns:type:g=foo:bar",
+ * "condition": "type.baz = \"bax\""
+ * },
+ * ...
+ * ]
+ * </pre>
+ * Note that {@code "id"} is an alias for the document put operation.
+ * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes.
+ */
+ public CompletableFuture<Void> feedMany(InputStream jsonStream, ResultCallback resultCallback) {
+ return feedMany(jsonStream, 1 << 26, resultCallback);
+ }
+
+ /**
+ * Same as {@link #feedMany(InputStream, ResultCallback)}, but without a provided {@link ResultCallback} instance.
+ * @see JsonFeeder#feedMany(InputStream, ResultCallback) for details.
+ */
+ public CompletableFuture<Void> feedMany(InputStream jsonStream) {
+ return feedMany(jsonStream, new ResultCallback() { });
+ }
+
+ CompletableFuture<Void> feedMany(InputStream jsonStream, int size, ResultCallback resultCallback) {
+ CompletableFuture<Void> overallResult = new CompletableFuture<>();
+ CompletableFuture<Result> result;
+ AtomicInteger pending = new AtomicInteger(1); // The below dispatch loop itself is counted as a single pending operation
+ AtomicBoolean finalCallbackInvoked = new AtomicBoolean();
+ try {
+ RingBufferStream buffer = new RingBufferStream(jsonStream, size);
+ while ((result = buffer.next()) != null) {
+ pending.incrementAndGet();
+ result.whenCompleteAsync((r, t) -> {
+ if (!finalCallbackInvoked.get()) {
+ resultCallback.onNextResult(r, (FeedException) t);
+ }
+ if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) {
+ resultCallback.onComplete();
+ overallResult.complete(null);
+ }
+ }, resultExecutor);
+ }
+ if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) {
+ resultExecutor.execute(() -> {
+ resultCallback.onComplete();
+ overallResult.complete(null);
+ });
+ }
+ } catch (Exception e) {
+ if (finalCallbackInvoked.compareAndSet(false, true)) {
+ resultExecutor.execute(() -> {
+ FeedException wrapped = wrapException(e);
+ resultCallback.onError(wrapped);
+ overallResult.completeExceptionally(wrapped);
+ });
+ }
+ }
+ return overallResult;
+ }
+
+ private static final JsonFactory factory = new JsonFactory();
+
+ @Override public void close() throws IOException {
+ client.close();
+ resultExecutor.shutdown();
+ try {
+ if (!resultExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
+ throw new IOException("Failed to close client in time");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private FeedException wrapException(Exception e) {
+ if (e instanceof FeedException) return (FeedException) e;
+ if (e instanceof IOException) {
+ return new OperationParseException("Failed to parse document JSON: " + e.getMessage(), e);
+ }
+ return new FeedException(e);
+ }
+
+ private class RingBufferStream extends InputStream {
+
+ private final byte[] b = new byte[1];
+ private final InputStream in;
+ private final Object lock = new Object();
+ private byte[] data;
+ private int size;
+ private IOException thrown = null;
+ private long tail = 0;
+ private long pos = 0;
+ private long head = 0;
+ private boolean done = false;
+ private final OperationParserAndExecutor parserAndExecutor;
+
+ RingBufferStream(InputStream in, int size) throws IOException {
+ this.in = in;
+ this.data = new byte[size];
+ this.size = size;
+
+ new Thread(this::fill, "feed-reader").start();
+
+ this.parserAndExecutor = new RingBufferBackedOperationParserAndExecutor(factory.createParser(this));
+ }
+
+ @Override
+ public int read() throws IOException {
+ return read(b, 0, 1) == -1 ? -1 : b[0];
+ }
+
+ @Override
+ public int read(byte[] buffer, int off, int len) throws IOException {
+ try {
+ int ready;
+ synchronized (lock) {
+ if (pos - tail == size) // Buffer exhausted, nothing left to read, nowhere left to write.
+ expand();
+
+ while ((ready = (int) (head - pos)) == 0 && ! done)
+ lock.wait();
+ }
+ if (thrown != null) throw thrown;
+ if (ready == 0) return -1;
+
+ ready = min(ready, len);
+ int offset = (int) (pos % size);
+ int length = min(ready, size - offset);
+ System.arraycopy(data, offset, buffer, off, length);
+ if (length < ready)
+ System.arraycopy(data, 0, buffer, off + length, ready - length);
+
+ pos += ready;
+ return ready;
+ }
+ catch (InterruptedException e) {
+ throw new InterruptedIOException("Interrupted waiting for data: " + e.getMessage());
+ }
+ }
+
+ public CompletableFuture<Result> next() throws IOException {
+ return parserAndExecutor.next();
+ }
+
+ private void expand() {
+ int newSize = size * 2;
+ if (newSize <= size)
+ throw new IllegalStateException("Maximum buffer size exceeded; want to double " + size + ", but that's too much");
+
+ byte[] newData = new byte[newSize];
+ int offset = (int) (tail % size);
+ int newOffset = (int) (tail % newSize);
+ int toWrite = size - offset;
+ System.arraycopy(data, offset, newData, newOffset, toWrite);
+ if (toWrite < size)
+ System.arraycopy(data, 0, newData, newOffset + toWrite, size - toWrite);
+ size = newSize;
+ data = newData;
+ lock.notify();
+ }
+
+ private final byte[] prefix = "{\"fields\":".getBytes(UTF_8);
+ private byte[] copy(long start, long end) {
+ int length = (int) (end - start);
+ byte[] buffer = new byte[prefix.length + length + 1];
+ System.arraycopy(prefix, 0, buffer, 0, prefix.length);
+
+ int offset = (int) (start % size);
+ int toWrite = min(length, size - offset);
+ System.arraycopy(data, offset, buffer, prefix.length, toWrite);
+ if (toWrite < length)
+ System.arraycopy(data, 0, buffer, prefix.length + toWrite, length - toWrite);
+
+ buffer[buffer.length - 1] = '}';
+ return buffer;
+ }
+
+
+ @Override
+ public void close() throws IOException {
+ synchronized (lock) {
+ done = true;
+ lock.notifyAll();
+ }
+ in.close();
+ }
+
+ private void fill() {
+ try {
+ while (true) {
+ int free;
+ synchronized (lock) {
+ while ((free = (int) (tail + size - head)) <= 0 && ! done)
+ lock.wait();
+ }
+ if (done) break;
+
+ int off = (int) (head % size);
+ int len = min(min(free, size - off), 1 << 13);
+ int read = in.read(data, off, len);
+
+ synchronized (lock) {
+ if (read < 0) done = true;
+ else head += read;
+ lock.notify();
+ }
+ }
+ } catch (InterruptedException e) {
+ synchronized (lock) {
+ done = true;
+ thrown = new InterruptedIOException("Interrupted reading data: " + e.getMessage());
+ }
+ } catch (IOException e) {
+ synchronized (lock) {
+ done = true;
+ thrown = e;
+ }
+ }
+ }
+
+ private class RingBufferBackedOperationParserAndExecutor extends OperationParserAndExecutor {
+
+ RingBufferBackedOperationParserAndExecutor(JsonParser parser) { super(parser, true); }
+
+ @Override
+ String getDocumentJson(long start, long end) {
+ String payload = new String(copy(start, end), UTF_8);
+ synchronized (lock) {
+ tail = end;
+ lock.notify();
+ }
+ return payload;
+ }
+ }
+ }
+
+ private class SingleOperationParserAndExecutor extends OperationParserAndExecutor {
+
+ private final byte[] json;
+
+ SingleOperationParserAndExecutor(byte[] json) throws IOException {
+ super(factory.createParser(json), false);
+ this.json = json;
+ }
+
+ @Override
+ String getDocumentJson(long start, long end) {
+ return "{\"fields\":" + new String(json, (int) start, (int) (end - start), UTF_8) + "}";
+ }
+ }
+
+ private abstract class OperationParserAndExecutor {
+
+ private final JsonParser parser;
+ private final boolean multipleOperations;
+ private boolean arrayPrefixParsed;
+
+ protected OperationParserAndExecutor(JsonParser parser, boolean multipleOperations) {
+ this.parser = parser;
+ this.multipleOperations = multipleOperations;
+ }
+
+ abstract String getDocumentJson(long start, long end);
+
+ OperationParseException parseException(String error) {
+ JsonLocation location = parser.getTokenLocation();
+ return new OperationParseException(error + " at offset " + location.getByteOffset() +
+ " (line " + location.getLineNr() + ", column " + location.getColumnNr() + ")");
+ }
+
+ CompletableFuture<Result> next() throws IOException {
+ JsonToken token = parser.nextToken();
+ if (multipleOperations && ! arrayPrefixParsed && token == JsonToken.START_ARRAY) {
+ arrayPrefixParsed = true;
+ token = parser.nextToken();
+ }
+ if (token == JsonToken.END_ARRAY && multipleOperations) return null;
+ else if (token == null && ! arrayPrefixParsed) return null;
+ else if (token != JsonToken.START_OBJECT) throw parseException("Unexpected token '" + parser.currentToken() + "'");
+ long start = 0, end = -1;
+ OperationType type = null;
+ DocumentId id = null;
+ OperationParameters parameters = protoParameters;
+ loop: while (true) {
+ switch (parser.nextToken()) {
+ case FIELD_NAME:
+ switch (parser.getText()) {
+ case "id":
+ case "put": type = PUT; id = readId(); break;
+ case "update": type = UPDATE; id = readId(); break;
+ case "remove": type = REMOVE; id = readId(); break;
+ case "condition": parameters = parameters.testAndSetCondition(readString()); break;
+ case "create": parameters = parameters.createIfNonExistent(readBoolean()); break;
+ case "fields": {
+ expect(START_OBJECT);
+ start = parser.getTokenLocation().getByteOffset();
+ int depth = 1;
+ while (depth > 0) switch (parser.nextToken()) {
+ case START_OBJECT: ++depth; break;
+ case END_OBJECT: --depth; break;
+ }
+ end = parser.getTokenLocation().getByteOffset() + 1;
+ break;
+ }
+ default: throw parseException("Unexpected field name '" + parser.getText() + "'");
+ }
+ break;
+
+ case END_OBJECT:
+ break loop;
+
+ default:
+ throw parseException("Unexpected token '" + parser.currentToken() + "'");
+ }
+ }
+ if (id == null)
+ throw parseException("No document id for document");
+ if (type == REMOVE) {
+ if (end >= start)
+ throw parseException("Illegal 'fields' object for remove operation");
+ else
+ start = end = parser.getTokenLocation().getByteOffset(); // getDocumentJson advances buffer overwrite head.
+ }
+ else if (end < start)
+ throw parseException("No 'fields' object for document");
+
+ String payload = getDocumentJson(start, end);
+ switch (type) {
+ case PUT: return client.put (id, payload, parameters);
+ case UPDATE: return client.update(id, payload, parameters);
+ case REMOVE: return client.remove(id, parameters);
+ default: throw new OperationParseException("Unexpected operation type '" + type + "'");
+ }
+ }
+
+ private void expect(JsonToken token) throws IOException {
+ if (parser.nextToken() != token)
+ throw new OperationParseException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() +
+ ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
+ }
+
+ private String readString() throws IOException {
+ String value = parser.nextTextValue();
+ if (value == null)
+ throw new OperationParseException("Expected '" + JsonToken.VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() +
+ ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
+
+ return value;
+ }
+
+ private boolean readBoolean() throws IOException {
+ Boolean value = parser.nextBooleanValue();
+ if (value == null)
+ throw new OperationParseException("Expected '" + JsonToken.VALUE_FALSE + "' or '" + JsonToken.VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() +
+ ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
+
+ return value;
+
+ }
+
+ private DocumentId readId() throws IOException {
+ return DocumentId.of(readString());
+ }
+
+ }
+
+ public static class Builder {
+
+ final FeedClient client;
+ OperationParameters parameters = OperationParameters.empty();
+
+ private Builder(FeedClient client) {
+ this.client = requireNonNull(client);
+ }
+
+ public Builder withTimeout(Duration timeout) {
+ parameters = parameters.timeout(timeout);
+ return this;
+ }
+
+ public Builder withRoute(String route) {
+ parameters = parameters.route(route);
+ return this;
+ }
+
+ public Builder withTracelevel(int tracelevel) {
+ parameters = parameters.tracelevel(tracelevel);
+ return this;
+ }
+
+ public JsonFeeder build() {
+ return new JsonFeeder(client, parameters);
+ }
+
+ }
+
+}
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/OperationParameters.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/OperationParameters.java
new file mode 100644
index 00000000000..0ec40e114df
--- /dev/null
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/OperationParameters.java
@@ -0,0 +1,97 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+
+/**
+ * Per-operation feed parameters
+ *
+ * @author bjorncs
+ * @author jonmv
+ */
+public class OperationParameters {
+
+ static final OperationParameters empty = new OperationParameters(false, null, null, null, 0);
+
+ private final boolean create;
+ private final String condition;
+ private final Duration timeout;
+ private final String route;
+ private final int tracelevel;
+
+ private OperationParameters(boolean create, String condition, Duration timeout, String route, int tracelevel) {
+ this.create = create;
+ this.condition = condition;
+ this.timeout = timeout;
+ this.route = route;
+ this.tracelevel = tracelevel;
+ }
+
+ public static OperationParameters empty() { return empty; }
+
+ public OperationParameters createIfNonExistent(boolean create) {
+ return new OperationParameters(create, condition, timeout, route, tracelevel);
+ }
+
+ public OperationParameters testAndSetCondition(String condition) {
+ if (condition.isEmpty())
+ throw new IllegalArgumentException("TestAndSetCondition must be non-empty");
+
+ return new OperationParameters(create, condition, timeout, route, tracelevel);
+ }
+
+ public OperationParameters timeout(Duration timeout) {
+ if (timeout.isNegative() || timeout.isZero())
+ throw new IllegalArgumentException("Timeout must be positive, but was " + timeout);
+
+ return new OperationParameters(create, condition, timeout, route, tracelevel);
+ }
+
+ public OperationParameters route(String route) {
+ if (route.isEmpty())
+ throw new IllegalArgumentException("Route must be non-empty");
+
+ return new OperationParameters(create, condition, timeout, route, tracelevel);
+ }
+
+ public OperationParameters tracelevel(int tracelevel) {
+ if (tracelevel < 1 || tracelevel > 9)
+ throw new IllegalArgumentException("Tracelevel must be in [1, 9]");
+
+ return new OperationParameters(create, condition, timeout, route, tracelevel);
+ }
+
+ public boolean createIfNonExistent() { return create; }
+ public Optional<String> testAndSetCondition() { return Optional.ofNullable(condition); }
+ public Optional<Duration> timeout() { return Optional.ofNullable(timeout); }
+ public Optional<String> route() { return Optional.ofNullable(route); }
+ public OptionalInt tracelevel() { return tracelevel == 0 ? OptionalInt.empty() : OptionalInt.of(tracelevel); }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ OperationParameters that = (OperationParameters) o;
+ return create == that.create && tracelevel == that.tracelevel && Objects.equals(condition, that.condition) && Objects.equals(timeout, that.timeout) && Objects.equals(route, that.route);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(create, condition, timeout, route, tracelevel);
+ }
+
+ @Override
+ public String toString() {
+ return "OperationParameters{" +
+ "create=" + create +
+ ", condition='" + condition + '\'' +
+ ", timeout=" + timeout +
+ ", route='" + route + '\'' +
+ ", tracelevel=" + tracelevel +
+ '}';
+ }
+
+}
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/OperationParseException.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/OperationParseException.java
new file mode 100644
index 00000000000..4404462be2e
--- /dev/null
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/OperationParseException.java
@@ -0,0 +1,17 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import ai.vespa.feed.client.FeedException;
+
+/**
+ * Signals that supplied JSON for a document/operation is invalid
+ *
+ * @author bjorncs
+ */
+public class OperationParseException extends FeedException {
+
+ public OperationParseException(String message) { super(message); }
+
+ public OperationParseException(String message, Throwable cause) { super(message, cause); }
+
+}
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/OperationStats.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/OperationStats.java
new file mode 100644
index 00000000000..ab2faf245d8
--- /dev/null
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/OperationStats.java
@@ -0,0 +1,139 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Statistics for feed operations over HTTP against a Vespa cluster.
+ *
+ * @author jonmv
+ */
+public class OperationStats {
+
+ private final long requests;
+ private final Map<Integer, Long> responsesByCode;
+ private final long inflight;
+ private final long exceptions;
+ private final long averageLatencyMillis;
+ private final long minLatencyMillis;
+ private final long maxLatencyMillis;
+ private final long bytesSent;
+ private final long bytesReceived;
+
+ public OperationStats(long requests, Map<Integer, Long> responsesByCode, long exceptions, long inflight,
+ long averageLatencyMillis, long minLatencyMillis, long maxLatencyMillis,
+ long bytesSent, long bytesReceived) {
+ this.requests = requests;
+ this.responsesByCode = responsesByCode;
+ this.exceptions = exceptions;
+ this.inflight = inflight;
+ this.averageLatencyMillis = averageLatencyMillis;
+ this.minLatencyMillis = minLatencyMillis;
+ this.maxLatencyMillis = maxLatencyMillis;
+ this.bytesSent = bytesSent;
+ this.bytesReceived = bytesReceived;
+ }
+
+ /** Returns the difference between this and the initial. Min and max latency are not modified. */
+ public OperationStats since(OperationStats initial) {
+ return new OperationStats(requests - initial.requests,
+ responsesByCode.entrySet().stream()
+ .collect(Collectors.toMap(entry -> entry.getKey(),
+ entry -> entry.getValue() - initial.responsesByCode.getOrDefault(entry.getKey(), 0L))),
+ exceptions - initial.exceptions,
+ inflight - initial.inflight,
+ responsesByCode.size() == initial.responsesByCode.size() ? 0 :
+ (averageLatencyMillis * responsesByCode.size() - initial.averageLatencyMillis * initial.responsesByCode.size())
+ / (responsesByCode.size() - initial.responsesByCode.size()),
+ minLatencyMillis,
+ maxLatencyMillis,
+ bytesSent - initial.bytesSent,
+ bytesReceived - initial.bytesReceived);
+ }
+
+ /** Number of HTTP requests attempted. */
+ public long requests() {
+ return requests;
+ }
+
+ /** Number of HTTP responses received. */
+ public long responses() {
+ return requests - inflight - exceptions;
+ }
+
+ /** Number of 200 OK HTTP responses received. */
+ public long successes() {
+ return responsesByCode.getOrDefault(200, 0L);
+ }
+
+ /** Number of HTTP responses by status code. */
+ public Map<Integer, Long> responsesByCode() {
+ return responsesByCode;
+ }
+
+ /** Number of exceptions (instead of responses). */
+ public long exceptions() {
+ return exceptions;
+ }
+
+ /** Number of attempted requests which haven't yielded a response or exception yet. */
+ public long inflight() {
+ return inflight;
+ }
+
+ /** Average request-response latency, or -1. */
+ public long averageLatencyMillis() {
+ return averageLatencyMillis;
+ }
+
+ /** Minimum request-response latency, or -1. */
+ public long minLatencyMillis() {
+ return minLatencyMillis;
+ }
+
+ /** Maximum request-response latency, or -1. */
+ public long maxLatencyMillis() {
+ return maxLatencyMillis;
+ }
+
+ /** Number of bytes sent, for HTTP requests with a response. */
+ public long bytesSent() {
+ return bytesSent;
+ }
+
+ /** Number of bytes received in HTTP responses. */
+ public long bytesReceived() {
+ return bytesReceived;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ OperationStats that = (OperationStats) o;
+ return requests == that.requests && inflight == that.inflight && exceptions == that.exceptions && averageLatencyMillis == that.averageLatencyMillis && minLatencyMillis == that.minLatencyMillis && maxLatencyMillis == that.maxLatencyMillis && bytesSent == that.bytesSent && bytesReceived == that.bytesReceived && responsesByCode.equals(that.responsesByCode);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(requests, responsesByCode, inflight, exceptions, averageLatencyMillis, minLatencyMillis, maxLatencyMillis, bytesSent, bytesReceived);
+ }
+
+ @Override
+ public String toString() {
+ return "Stats{" +
+ "requests=" + requests +
+ ", responsesByCode=" + responsesByCode +
+ ", exceptions=" + exceptions +
+ ", inflight=" + inflight +
+ ", averageLatencyMillis=" + averageLatencyMillis +
+ ", minLatencyMillis=" + minLatencyMillis +
+ ", maxLatencyMillis=" + maxLatencyMillis +
+ ", bytesSent=" + bytesSent +
+ ", bytesReceived=" + bytesReceived +
+ '}';
+ }
+
+}
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/Result.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/Result.java
new file mode 100644
index 00000000000..fa114f6a183
--- /dev/null
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/Result.java
@@ -0,0 +1,23 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import java.util.Optional;
+
+/**
+ * Result for a document operation which completed normally.
+ *
+ * @author bjorncs
+ * @author jonmv
+ */
+public interface Result {
+
+ enum Type {
+ success,
+ conditionNotMet
+ }
+
+ Type type();
+ DocumentId documentId();
+ Optional<String> resultMessage();
+ Optional<String> traceMessage();
+}
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/ResultException.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/ResultException.java
new file mode 100644
index 00000000000..27803898c01
--- /dev/null
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/ResultException.java
@@ -0,0 +1,29 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import ai.vespa.feed.client.DocumentId;
+import ai.vespa.feed.client.FeedException;
+import ai.vespa.feed.client.OperationParameters;
+
+import java.util.Optional;
+
+/**
+ * Signals that the document API in the feed container returned a failure result for a feed operation.
+ *
+ * @author jonmv
+ */
+public class ResultException extends FeedException {
+
+ private final String trace;
+
+ public ResultException(DocumentId documentId, String message, String trace) {
+ super(documentId, message);
+ this.trace = trace;
+ }
+
+ /** Holds the trace, if the failed operation had a {@link OperationParameters#tracelevel(int)} higher than 0. */
+ public Optional<String> getTrace() {
+ return Optional.ofNullable(trace);
+ }
+
+}
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/ResultParseException.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/ResultParseException.java
new file mode 100644
index 00000000000..f149b13196b
--- /dev/null
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/ResultParseException.java
@@ -0,0 +1,17 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import ai.vespa.feed.client.DocumentId;
+import ai.vespa.feed.client.FeedException;
+
+/**
+ * Signals that the client was unable to obtain a proper response/result from container
+ *
+ * @author bjorncs
+ */
+public class ResultParseException extends FeedException {
+
+ public ResultParseException(DocumentId documentId, String message) { super(documentId, message); }
+
+ public ResultParseException(DocumentId documentId, Throwable cause) { super(documentId, cause); }
+}
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/package-info.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/package-info.java
new file mode 100644
index 00000000000..daab16a9ff2
--- /dev/null
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/package-info.java
@@ -0,0 +1,9 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+/**
+ * @author bjorncs
+ */
+
+@PublicApi
+package ai.vespa.feed.client;
+
+import com.yahoo.api.annotations.PublicApi; \ No newline at end of file
diff --git a/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/JsonFeederTest.java b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/JsonFeederTest.java
new file mode 100644
index 00000000000..d795678db39
--- /dev/null
+++ b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/JsonFeederTest.java
@@ -0,0 +1,240 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.joining;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class JsonFeederTest {
+
+ @Test
+ void test() throws IOException {
+ int docs = 1 << 14;
+ String json = "[\n" +
+
+ IntStream.range(0, docs).mapToObj(i ->
+ " {\n" +
+ " \"id\": \"id:ns:type::abc" + i + "\",\n" +
+ " \"fields\": {\n" +
+ " \"lul\":\"lal\"\n" +
+ " }\n" +
+ " },\n"
+ ).collect(joining()) +
+
+ " {\n" +
+ " \"id\": \"id:ns:type::abc" + docs + "\",\n" +
+ " \"fields\": {\n" +
+ " \"lul\":\"lal\"\n" +
+ " }\n" +
+ " }\n" +
+ "]";
+ AtomicReference<FeedException> exceptionThrow = new AtomicReference<>();
+ Path tmpFile = Files.createTempFile(null, null);
+ Files.write(tmpFile, json.getBytes(UTF_8));
+ try (InputStream in = Files.newInputStream(tmpFile, StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE)) {
+ AtomicInteger resultsReceived = new AtomicInteger();
+ AtomicBoolean completedSuccessfully = new AtomicBoolean();
+ long startNanos = System.nanoTime();
+ MockClient feedClient = new MockClient();
+ JsonFeeder.builder(feedClient).build()
+ .feedMany(in, 1 << 10,
+ new JsonFeeder.ResultCallback() {
+ @Override
+ public void onNextResult(Result result, FeedException error) { resultsReceived.incrementAndGet(); }
+
+ @Override
+ public void onError(FeedException error) { exceptionThrow.set(error); }
+
+ @Override
+ public void onComplete() { completedSuccessfully.set(true); }
+ })
+ .join();
+
+ System.err.println((json.length() / 1048576.0) + " MB in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds");
+ assertEquals(docs + 1, feedClient.putOperations.size());
+ assertEquals(docs + 1, resultsReceived.get());
+ assertTrue(completedSuccessfully.get());
+ assertNull(exceptionThrow.get());
+ }
+ }
+
+ @Test
+ public void multipleJsonArrayOperationsAreDispatchedToFeedClient() throws IOException, ExecutionException, InterruptedException {
+ MockClient client = new MockClient();
+ try (JsonFeeder feeder = JsonFeeder.builder(client).build()) {
+ String json = "[{" +
+ " \"put\": \"id:ns:type::abc1\",\n" +
+ " \"fields\": {\n" +
+ " \"lul\":\"lal\"\n" +
+ " }\n" +
+ "},\n" +
+ "{" +
+ " \"put\": \"id:ns:type::abc2\",\n" +
+ " \"fields\": {\n" +
+ " \"lul\":\"lal\"\n" +
+ " }\n" +
+ "}]\n";
+ feeder.feedMany(new ByteArrayInputStream(json.getBytes(UTF_8))).get();
+ client.assertPutDocumentIds("abc1", "abc2");
+ client.assertPutOperation("abc1", "{\"fields\":{\n \"lul\":\"lal\"\n }}");
+ client.assertPutOperation("abc2", "{\"fields\":{\n \"lul\":\"lal\"\n }}");
+ }
+ }
+
+ @Test
+ public void multipleJsonLOperationsAreDispatchedToFeedClient() throws IOException, ExecutionException, InterruptedException {
+ MockClient client = new MockClient();
+ try (JsonFeeder feeder = JsonFeeder.builder(client).build()) {
+ String json = "{\n" +
+ " \"remove\": \"id:ns:type::abc1\"\n" +
+ "}\n" +
+ "{\n" +
+ " \"fields\": {\n" +
+ " \"lul\": { \"assign\": \"lal\" }\n" +
+ " },\n" +
+ " \"update\": \"id:ns:type::abc2\"\n" +
+ "}\n" +
+ "{\n" +
+ " \"put\": \"id:ns:type::abc3\",\n" +
+ " \"fields\": {\n" +
+ " \"lul\": \"lal\"\n" +
+ " }\n" +
+ "}\n";
+
+ feeder.feedMany(new ByteArrayInputStream(json.getBytes(UTF_8)),
+ 3, // Mini-buffer, which needs to expand.
+ new JsonFeeder.ResultCallback() { })
+ .get();
+ client.assertRemoveDocumentIds("abc1");
+ client.assertUpdateDocumentIds("abc2");
+ client.assertUpdateOperation("abc2", "{\"fields\":{\n \"lul\": { \"assign\": \"lal\" }\n }}");
+ client.assertPutDocumentIds("abc3");
+ client.assertPutOperation("abc3", "{\"fields\":{\n \"lul\": \"lal\"\n }}");
+ }
+ }
+
+ @Test
+ public void singleJsonOperationIsDispatchedToFeedClient() throws IOException, ExecutionException, InterruptedException {
+ MockClient client = new MockClient();
+ try (JsonFeeder feeder = JsonFeeder.builder(client).build()) {
+ String json = "{\"put\": \"id:ns:type::abc1\",\n" +
+ " \"fields\": {\n" +
+ " \"lul\":\"lal\"\n" +
+ " }\n" +
+ " }\n";
+ Result result = feeder.feedSingle(json).get();
+ Assertions.assertEquals(DocumentId.of("id:ns:type::abc1"), result.documentId());
+ assertEquals(Result.Type.success, result.type());
+ assertEquals("success", result.resultMessage().get());
+ client.assertPutOperation("abc1", "{\"fields\":{\n \"lul\":\"lal\"\n }}");
+ }
+ }
+
+ private static class MockClient implements FeedClient {
+ final Map<DocumentId, String> putOperations = new LinkedHashMap<>();
+ final Map<DocumentId, String> updateOperations = new LinkedHashMap<>();
+ final Map<DocumentId, String> removeOperations = new LinkedHashMap<>();
+
+ @Override
+ public CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params) {
+ putOperations.put(documentId, documentJson);
+ return createSuccessResult(documentId);
+ }
+
+ @Override
+ public CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params) {
+ updateOperations.put(documentId, updateJson);
+ return createSuccessResult(documentId);
+ }
+
+ @Override
+ public CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params) {
+ removeOperations.put(documentId, null);
+ return createSuccessResult(documentId);
+ }
+
+ @Override
+ public OperationStats stats() { return null; }
+
+ @Override
+ public CircuitBreaker.State circuitBreakerState() { return null; }
+
+ @Override
+ public void close(boolean graceful) { }
+
+ private CompletableFuture<Result> createSuccessResult(DocumentId documentId) {
+ return CompletableFuture.completedFuture(new Result(){
+ @Override public Type type() { return Type.success; }
+ @Override public DocumentId documentId() { return documentId; }
+ @Override public Optional<String> resultMessage() { return Optional.of("success"); }
+ @Override public Optional<String> traceMessage() { return Optional.empty(); }
+ });
+ }
+
+ void assertDocumentIds(Collection<DocumentId> keys, String... expectedUserSpecificIds) {
+ List<String> expected = Arrays.stream(expectedUserSpecificIds)
+ .map(userSpecific -> "id:ns:type::" + userSpecific)
+ .sorted()
+ .collect(Collectors.toList());
+ List<String> actual = keys.stream()
+ .map(DocumentId::toString).sorted()
+ .collect(Collectors.toList());
+ assertEquals(expected, actual, "Document ids must match");
+ }
+
+ void assertPutDocumentIds(String... expectedUserSpecificIds) {
+ assertDocumentIds(putOperations.keySet(), expectedUserSpecificIds);
+ }
+
+ void assertUpdateDocumentIds(String... expectedUserSpecificIds) {
+ assertDocumentIds(updateOperations.keySet(), expectedUserSpecificIds);
+ }
+
+ void assertRemoveDocumentIds(String... expectedUserSpecificIds) {
+ assertDocumentIds(removeOperations.keySet(), expectedUserSpecificIds);
+ }
+
+ void assertPutOperation(String userSpecificId, String expectedJson) {
+ DocumentId docId = DocumentId.of("id:ns:type::" + userSpecificId);
+ String json = putOperations.get(docId);
+ assertNotNull(json);
+ assertEquals(expectedJson.trim(), json.trim());
+ }
+
+ void assertUpdateOperation(String userSpecificId, String expectedJson) {
+ DocumentId docId = DocumentId.of("id:ns:type::" + userSpecificId);
+ String json = updateOperations.get(docId);
+ assertNotNull(json);
+ assertEquals(expectedJson.trim(), json.trim());
+ }
+
+ }
+
+}
diff --git a/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java
new file mode 100644
index 00000000000..b951fb62fb5
--- /dev/null
+++ b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java
@@ -0,0 +1,90 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client.examples;
+
+import ai.vespa.feed.client.DocumentId;
+import ai.vespa.feed.client.FeedClient;
+import ai.vespa.feed.client.FeedClientBuilder;
+import ai.vespa.feed.client.FeedException;
+import ai.vespa.feed.client.JsonFeeder;
+import ai.vespa.feed.client.Result;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Logger;
+
+/**
+ * Sample feeder demonstrating how to programmatically feed to a Vespa cluster.
+ */
+class JsonFileFeederExample implements Closeable {
+
+ private final static Logger log = Logger.getLogger(JsonFileFeederExample.class.getName());
+
+ private final JsonFeeder jsonFeeder;
+ private final URI endpoint;
+
+ static class ResultCallBack implements JsonFeeder.ResultCallback {
+
+ final AtomicInteger resultsReceived = new AtomicInteger(0);
+ final AtomicInteger errorsReceived = new AtomicInteger(0);
+ final long startTimeMillis = System.currentTimeMillis();;
+
+ @Override
+ public void onNextResult(Result result, FeedException error) {
+ resultsReceived.incrementAndGet();
+ if (error != null) {
+ log.warning("Problems with feeding document "
+ + error.documentId().map(DocumentId::toString).orElse("<unknown>")
+ + ": " + error);
+ errorsReceived.incrementAndGet();
+ }
+ }
+
+ @Override
+ public void onError(FeedException error) {
+ log.severe("Feeding failed fatally: " + error.getMessage());
+ }
+
+ @Override
+ public void onComplete() {
+ log.info("Feeding completed");
+ }
+
+ void dumpStatsToLog() {
+ log.info("Received in total " + resultsReceived.get() + ", " + errorsReceived.get() + " errors.");
+ log.info("Time spent receiving is " + (System.currentTimeMillis() - startTimeMillis) + " ms.");
+ }
+
+ }
+
+ JsonFileFeederExample(URI endpoint) {
+ this.endpoint = endpoint;
+ FeedClient feedClient = FeedClientBuilder.create(endpoint)
+ .build();
+ this.jsonFeeder = JsonFeeder.builder(feedClient)
+ .withTimeout(Duration.ofSeconds(30))
+ .build();
+ }
+
+ /**
+ * Feed all operations from a stream.
+ *
+ * @param stream The input stream to read operations from (JSON array containing one or more document operations).
+ */
+ void batchFeed(InputStream stream, String batchId) {
+ ResultCallBack callback = new ResultCallBack();
+ log.info("Starting feed to " + endpoint + " for batch '" + batchId + "'");
+ CompletableFuture<Void> promise = jsonFeeder.feedMany(stream, callback);
+ promise.join(); // wait for feeding to complete
+ callback.dumpStatsToLog();
+ }
+
+ @Override
+ public void close() throws IOException {
+ jsonFeeder.close();
+ }
+}
diff --git a/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java
new file mode 100644
index 00000000000..3d4ce150fcf
--- /dev/null
+++ b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java
@@ -0,0 +1,115 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client.examples;
+
+import ai.vespa.feed.client.DocumentId;
+import ai.vespa.feed.client.FeedClient;
+import ai.vespa.feed.client.FeedClientBuilder;
+import ai.vespa.feed.client.OperationParameters;
+import ai.vespa.feed.client.Result;
+
+import java.net.URI;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Simple Streaming feeder implementation which will send operations to a Vespa endpoint.
+ * Other threads communicate with the feeder by adding new operations on the BlockingQueue
+ */
+
+class JsonStreamFeederExample extends Thread implements AutoCloseable {
+
+ static class Operation {
+ final String type;
+ final String documentId;
+ final String documentFieldsJson;
+
+ Operation(String type, String id, String fields) {
+ this.type = type;
+ this.documentId = id;
+ this.documentFieldsJson = fields;
+ }
+ }
+
+ private final static Logger log = Logger.getLogger(JsonStreamFeederExample.class.getName());
+
+ private final BlockingQueue<Operation> operations;
+ private final FeedClient feedClient;
+ private final AtomicBoolean drain = new AtomicBoolean(false);
+ private final CountDownLatch finishedDraining = new CountDownLatch(1);
+ private final AtomicInteger resultCounter = new AtomicInteger();
+
+ /**
+ * Constructor
+ * @param operations The shared blocking queue where other threads can put document operations to.
+ * @param endpoint The endpoint to feed to
+ */
+ JsonStreamFeederExample(BlockingQueue<JsonStreamFeederExample.Operation> operations, URI endpoint) {
+ this.operations = operations;
+ this.feedClient = FeedClientBuilder.create(endpoint).build();
+ }
+
+ /**
+ * Shutdown this feeder, waits until operations on queue is drained
+ */
+ @Override
+ public void close() {
+ log.info("Shutdown initiated, awaiting operations queue to be drained. Queue size is " + operations.size());
+ drain.set(true);
+ try {
+ finishedDraining.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ public void run() {
+ while (!drain.get() || !operations.isEmpty()) {
+ try {
+ JsonStreamFeederExample.Operation op = operations.poll(1, TimeUnit.SECONDS);
+ if(op == null) // no operations available
+ continue;
+ log.info("Put document " + op.documentId);
+ CompletableFuture<Result> promise;
+ DocumentId docId = DocumentId.of(op.documentId);
+ OperationParameters params = OperationParameters.empty();
+ String json = op.documentFieldsJson;
+ switch (op.type) {
+ case "put":
+ promise = feedClient.put(docId, json, params);
+ break;
+ case "remove":
+ promise = feedClient.remove(docId, params);
+ break;
+ case "update":
+ promise = feedClient.update(docId, json, params);
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid operation: " + op.type);
+ }
+ promise.whenComplete((result, throwable) -> {
+ if (resultCounter.getAndIncrement() % 10 == 0) {
+ System.err.println(feedClient.stats());
+ }
+ if (throwable != null) {
+ System.err.printf("Failure for '%s': %s", docId, throwable);
+ throwable.printStackTrace();
+ }
+ });
+ } catch (InterruptedException e) {
+ log.log(Level.SEVERE, "Got interrupt exception.", e);
+ break;
+ }
+ }
+ log.info("Shutting down feeding thread");
+ this.feedClient.close();
+ finishedDraining.countDown();
+ }
+
+} \ No newline at end of file
diff --git a/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java
new file mode 100644
index 00000000000..4e6473a6568
--- /dev/null
+++ b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java
@@ -0,0 +1,34 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client.examples;
+
+import ai.vespa.feed.client.DocumentId;
+import ai.vespa.feed.client.FeedClient;
+import ai.vespa.feed.client.FeedClientBuilder;
+import ai.vespa.feed.client.OperationParameters;
+import ai.vespa.feed.client.Result;
+
+import java.net.URI;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+
+class SimpleExample {
+
+ public static void main(String[] args) {
+ try (FeedClient client = FeedClientBuilder.create(URI.create("https://my-container-endpoint-with-http2:8080/")).build()) {
+ DocumentId id = DocumentId.of("namespace", "documenttype", "1");
+ String json = "{\"fields\": {\"title\": \"hello world\"}}";
+ OperationParameters params = OperationParameters.empty()
+ .timeout(Duration.ofSeconds(5))
+ .route("myvesparoute");
+ CompletableFuture<Result> promise = client.put(id, json, params);
+ promise.whenComplete(((result, throwable) -> {
+ if (throwable != null) {
+ throwable.printStackTrace();
+ } else {
+ System.out.printf("'%s' for document '%s': %s%n", result.type(), result.documentId(), result.resultMessage());
+ }
+ }));
+ }
+ }
+
+}