aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-feed-client-api/src/main/java/ai/vespa/feed/client
diff options
context:
space:
mode:
Diffstat (limited to 'vespa-feed-client-api/src/main/java/ai/vespa/feed/client')
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/DocumentId.java112
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java110
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java128
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedException.java47
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/HttpResponse.java16
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java514
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/OperationParameters.java97
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/OperationParseException.java17
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/OperationStats.java139
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/Result.java23
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/ResultException.java29
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/ResultParseException.java17
-rw-r--r--vespa-feed-client-api/src/main/java/ai/vespa/feed/client/package-info.java9
13 files changed, 1258 insertions, 0 deletions
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/DocumentId.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/DocumentId.java
new file mode 100644
index 00000000000..5474bcfda01
--- /dev/null
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/DocumentId.java
@@ -0,0 +1,112 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Represents a Vespa document id
+ *
+ * @author jonmv
+ */
+public class DocumentId {
+
+ private final String documentType;
+ private final String namespace;
+ private final OptionalLong number;
+ private final Optional<String> group;
+ private final String userSpecific;
+
+ private DocumentId(String documentType, String namespace, OptionalLong number, Optional<String> group, String userSpecific) {
+ this.documentType = requireNonNull(documentType);
+ this.namespace = requireNonNull(namespace);
+ this.number = requireNonNull(number);
+ this.group = requireNonNull(group);
+ this.userSpecific = requireNonNull(userSpecific);
+ }
+
+ public static DocumentId of(String namespace, String documentType, String userSpecific) {
+ return new DocumentId(documentType, namespace, OptionalLong.empty(), Optional.empty(), userSpecific);
+ }
+
+ public static DocumentId of(String namespace, String documentType, long number, String userSpecific) {
+ return new DocumentId(documentType, namespace, OptionalLong.of(number), Optional.empty(), userSpecific);
+ }
+
+ public static DocumentId of(String namespace, String documentType, String group, String userSpecific) {
+ return new DocumentId(documentType, namespace, OptionalLong.empty(), Optional.of(group), userSpecific);
+ }
+
+ public static DocumentId of(String serialized) {
+ DocumentId parsed = parse(serialized);
+ if (parsed != null) return parsed;
+ throw new IllegalArgumentException("Document ID must be on the form " +
+ "'id:<namespace>:<document-type>:[n=<number>|g=<group>]:<user-specific>', " +
+ "but was '" + serialized + "'");
+ }
+
+ private static DocumentId parse(String serialized) {
+ int i, j = -1;
+ if ((j = serialized.indexOf(':', i = j + 1)) < i) return null;
+ if ( ! "id".equals(serialized.substring(i, j))) return null;
+ if ((j = serialized.indexOf(':', i = j + 1)) <= i) return null;
+ String namespace = serialized.substring(i, j);
+ if ((j = serialized.indexOf(':', i = j + 1)) <= i) return null;
+ String documentType = serialized.substring(i, j);
+ if ((j = serialized.indexOf(':', i = j + 1)) < i) return null;
+ String group = serialized.substring(i, j);
+ if (serialized.length() <= (i = j + 1)) return null;
+ String userSpecific = serialized.substring(i);
+ if (group.startsWith("n=") && group.length() > 2)
+ return DocumentId.of(namespace, documentType, Long.parseLong(group.substring(2)), userSpecific);
+ if (group.startsWith("g=") && group.length() > 2)
+ return DocumentId.of(namespace, documentType, group.substring(2), userSpecific);
+ if (group.isEmpty())
+ return DocumentId.of(namespace, documentType, userSpecific);
+ return null;
+ }
+
+ public String documentType() {
+ return documentType;
+ }
+
+ public String namespace() {
+ return namespace;
+ }
+
+ public OptionalLong number() {
+ return number;
+ }
+
+ public Optional<String> group() {
+ return group;
+ }
+
+ public String userSpecific() {
+ return userSpecific;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ DocumentId that = (DocumentId) o;
+ return documentType.equals(that.documentType) && namespace.equals(that.namespace) && number.equals(that.number) && group.equals(that.group) && userSpecific.equals(that.userSpecific);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(documentType, namespace, number, group, userSpecific);
+ }
+
+ @Override
+ public String toString() {
+ return "id:" + namespace + ":" + documentType + ":" +
+ (number.isPresent() ? "n=" + number.getAsLong() : group.map("g="::concat).orElse("")) +
+ ":" + userSpecific;
+ }
+
+}
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java
new file mode 100644
index 00000000000..d463c611d6a
--- /dev/null
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClient.java
@@ -0,0 +1,110 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Asynchronous feed client accepting document operations as JSON. The payload should be
+ * the same as the HTTP payload required by the /document/v1 HTTP API, i.e., <pre>
+ * {
+ * "fields": {
+ * ...
+ * }
+ * }
+ * </pre>
+ *
+ * @author bjorncs
+ * @author jonmv
+ */
+public interface FeedClient extends Closeable {
+
+ /**
+ * Send a document put with the given parameters, returning a future with the result of the operation.
+ * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes.
+ */
+ CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params);
+
+ /**
+ * Send a document update with the given parameters, returning a future with the result of the operation.
+ * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes.
+ */
+ CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params);
+
+ /**
+ * Send a document remove with the given parameters, returning a future with the result of the operation.
+ * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes.
+ */
+ CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params);
+
+ /** Returns a snapshot of the stats for this feed client, such as requests made, and responses by status. */
+ OperationStats stats();
+
+ /** Current state of the circuit breaker. */
+ CircuitBreaker.State circuitBreakerState();
+
+ /** Shut down, and reject new operations. Operations in flight are allowed to complete normally if graceful. */
+ void close(boolean graceful);
+
+ /** Initiates graceful shutdown. See {@link #close(boolean)}. */
+ default void close() { close(true); }
+
+ /** Controls what to retry, and how many times. */
+ interface RetryStrategy {
+
+ /** Whether to retry operations of the given type. */
+ default boolean retry(OperationType type) { return true; }
+
+ /** Number of retries per operation for assumed transient, non-backpressure problems. */
+ default int retries() { return 10; }
+
+ }
+
+ /** Allows slowing down or halting completely operations against the configured endpoint on high failure rates. */
+ interface CircuitBreaker {
+
+ /** A circuit breaker which is always closed. */
+ CircuitBreaker FUSED = () -> State.CLOSED;
+
+ /** Called by the client whenever a successful response is obtained. */
+ default void success() { }
+
+ /** Called by the client whenever an error HTTP response is received. */
+ default void failure(HttpResponse response) { }
+
+ /** Called by the client whenever an exception occurs trying to obtain a HTTP response. */
+ default void failure(Throwable cause) { }
+
+ /** The current state of the circuit breaker. */
+ State state();
+
+ enum State {
+
+ /** Circuit is closed: business as usual. */
+ CLOSED,
+
+ /** Circuit is half-open: something is wrong, perhaps it recovers? */
+ HALF_OPEN,
+
+ /** Circuit is open: we have given up. */
+ OPEN;
+
+ }
+
+ }
+
+ enum OperationType {
+
+ /** A document put operation. This is idempotent. */
+ PUT,
+
+ /** A document update operation. This is idempotent if all its contained updates are. */
+ UPDATE,
+
+ /** A document remove operation. This is idempotent. */
+ REMOVE;
+
+ }
+
+
+}
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java
new file mode 100644
index 00000000000..daf3f62dac1
--- /dev/null
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java
@@ -0,0 +1,128 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URI;
+import java.nio.file.Path;
+import java.security.PrivateKey;
+import java.security.cert.X509Certificate;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ServiceLoader;
+import java.util.function.Supplier;
+
+/**
+ * Builder for creating a {@link FeedClient} instance.
+ *
+ * @author bjorncs
+ * @author jonmv
+ */
+public interface FeedClientBuilder {
+
+ /** Creates a builder for a single container endpoint **/
+ static FeedClientBuilder create(URI endpoint) { return create(Collections.singletonList(endpoint)); }
+
+ /** Creates a builder for multiple container endpoints **/
+ static FeedClientBuilder create(List<URI> endpoints) {
+ Iterator<FeedClientBuilder> iterator = ServiceLoader.load(FeedClientBuilder.class).iterator();
+ if (iterator.hasNext()) {
+ return iterator.next().setEndpointUris(endpoints);
+ } else {
+ try {
+ Class<?> aClass = Class.forName("ai.vespa.feed.client.impl.FeedClientBuilderImpl");
+ for (Constructor<?> constructor : aClass.getConstructors()) {
+ if (constructor.getParameterTypes().length==0) {
+ return ((FeedClientBuilder)constructor.newInstance()).setEndpointUris(endpoints);
+ }
+ }
+ throw new RuntimeException("Could not find Feed client builder implementation");
+ } catch (ClassNotFoundException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * Sets the number of connections this client will use per endpoint.
+ *
+ * A reasonable value here is a value that lets all feed clients (if more than one)
+ * collectively have a number of connections which is a small multiple of the numbers
+ * of containers in the cluster to feed, so load can be balanced across these containers.
+ * In general, this value should be kept as low as possible, but poor connectivity
+ * between feeder and cluster may also warrant a higher number of connections.
+ */
+ FeedClientBuilder setConnectionsPerEndpoint(int max);
+
+ /**
+ * Sets the maximum number of streams per HTTP/2 connection for this client.
+ *
+ * This determines the maximum number of concurrent, inflight requests for this client,
+ * which is {@code maxConnections * maxStreamsPerConnection}. Prefer more streams over
+ * more connections, when possible.
+ * The feed client automatically throttles load to achieve the best throughput, and the
+ * actual number of streams per connection is usually lower than the maximum.
+ */
+ FeedClientBuilder setMaxStreamPerConnection(int max);
+
+ /** Sets {@link SSLContext} instance. */
+ FeedClientBuilder setSslContext(SSLContext context);
+
+ /** Sets {@link HostnameVerifier} instance (e.g for disabling default SSL hostname verification). */
+ FeedClientBuilder setHostnameVerifier(HostnameVerifier verifier);
+
+ /** Turns off benchmarking. Attempting to get {@link FeedClient#stats()} will result in an exception. */
+ FeedClientBuilder noBenchmarking();
+
+ /** Adds HTTP request header to all client requests. */
+ FeedClientBuilder addRequestHeader(String name, String value);
+
+ /**
+ * Adds HTTP request header to all client requests. Value {@link Supplier} is invoked for each HTTP request,
+ * i.e. value can be dynamically updated during a feed.
+ */
+ FeedClientBuilder addRequestHeader(String name, Supplier<String> valueSupplier);
+
+ /**
+ * Overrides default retry strategy.
+ * @see FeedClient.RetryStrategy
+ */
+ FeedClientBuilder setRetryStrategy(FeedClient.RetryStrategy strategy);
+
+ /**
+ * Overrides default circuit breaker.
+ * @see FeedClient.CircuitBreaker
+ */
+ FeedClientBuilder setCircuitBreaker(FeedClient.CircuitBreaker breaker);
+
+ /** Sets path to client SSL certificate/key PEM files */
+ FeedClientBuilder setCertificate(Path certificatePemFile, Path privateKeyPemFile);
+
+ /** Sets client SSL certificates/key */
+ FeedClientBuilder setCertificate(Collection<X509Certificate> certificate, PrivateKey privateKey);
+
+ /** Sets client SSL certificate/key */
+ FeedClientBuilder setCertificate(X509Certificate certificate, PrivateKey privateKey);
+
+ FeedClientBuilder setDryrun(boolean enabled);
+
+ /**
+ * Overrides JVM default SSL truststore
+ * @param caCertificatesFile Path to PEM encoded file containing trusted certificates
+ */
+ FeedClientBuilder setCaCertificatesFile(Path caCertificatesFile);
+
+ /** Overrides JVM default SSL truststore */
+ FeedClientBuilder setCaCertificates(Collection<X509Certificate> caCertificates);
+
+ /** Overrides endpoint URIs for this client */
+ FeedClientBuilder setEndpointUris(List<URI> endpoints);
+
+ /** Constructs instance of {@link FeedClient} from builder configuration */
+ FeedClient build();
+
+}
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedException.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedException.java
new file mode 100644
index 00000000000..1936eb09418
--- /dev/null
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedException.java
@@ -0,0 +1,47 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import java.util.Optional;
+
+/**
+ * Signals that an error occurred during feeding
+ *
+ * @author bjorncs
+ */
+public class FeedException extends RuntimeException {
+
+ private final DocumentId documentId;
+
+ public FeedException(String message) {
+ super(message);
+ this.documentId = null;
+ }
+
+ public FeedException(DocumentId documentId, String message) {
+ super(message);
+ this.documentId = documentId;
+ }
+
+ public FeedException(String message, Throwable cause) {
+ super(message, cause);
+ this.documentId = null;
+ }
+
+ public FeedException(Throwable cause) {
+ super(cause);
+ this.documentId = null;
+ }
+
+ public FeedException(DocumentId documentId, Throwable cause) {
+ super(cause);
+ this.documentId = documentId;
+ }
+
+ public FeedException(DocumentId documentId, String message, Throwable cause) {
+ super(message, cause);
+ this.documentId = documentId;
+ }
+
+ public Optional<DocumentId> documentId() { return Optional.ofNullable(documentId); }
+
+}
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/HttpResponse.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/HttpResponse.java
new file mode 100644
index 00000000000..62850fef32d
--- /dev/null
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/HttpResponse.java
@@ -0,0 +1,16 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+public interface HttpResponse {
+
+ int code();
+ byte[] body();
+
+ static HttpResponse of(int code, byte[] body) {
+ return new HttpResponse() {
+ @Override public int code() { return code; }
+ @Override public byte[] body() { return body; }
+ };
+ }
+
+}
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java
new file mode 100644
index 00000000000..41b432449df
--- /dev/null
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/JsonFeeder.java
@@ -0,0 +1,514 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import ai.vespa.feed.client.FeedClient.OperationType;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonLocation;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static ai.vespa.feed.client.FeedClient.OperationType.PUT;
+import static ai.vespa.feed.client.FeedClient.OperationType.REMOVE;
+import static ai.vespa.feed.client.FeedClient.OperationType.UPDATE;
+import static com.fasterxml.jackson.core.JsonToken.END_ARRAY;
+import static com.fasterxml.jackson.core.JsonToken.START_ARRAY;
+import static com.fasterxml.jackson.core.JsonToken.START_OBJECT;
+import static com.fasterxml.jackson.core.JsonToken.VALUE_FALSE;
+import static com.fasterxml.jackson.core.JsonToken.VALUE_STRING;
+import static com.fasterxml.jackson.core.JsonToken.VALUE_TRUE;
+import static java.lang.Math.min;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * @author jonmv
+ * @author bjorncs
+ */
+public class JsonFeeder implements Closeable {
+
+ private final ExecutorService resultExecutor = Executors.newSingleThreadExecutor(r -> {
+ Thread t = new Thread(r, "json-feeder-result-executor");
+ t.setDaemon(true);
+ return t;
+ });
+ private final FeedClient client;
+ private final OperationParameters protoParameters;
+
+ private JsonFeeder(FeedClient client, OperationParameters protoParameters) {
+ this.client = client;
+ this.protoParameters = protoParameters;
+ }
+
+ public interface ResultCallback {
+ /**
+ * Invoked after each operation has either completed successfully or failed
+ *
+ * @param result Non-null if operation completed successfully
+ * @param error Non-null if operation failed
+ */
+ default void onNextResult(Result result, FeedException error) { }
+
+ /**
+ * Invoked if an unrecoverable error occurred during feed processing,
+ * after which no other {@link ResultCallback} methods are invoked.
+ */
+ default void onError(FeedException error) { }
+
+ /**
+ * Invoked when all feed operations are either completed successfully or failed.
+ */
+ default void onComplete() { }
+ }
+
+ public static Builder builder(FeedClient client) { return new Builder(client); }
+
+ /** Feeds single JSON feed operations on the form
+ * <pre>
+ * {
+ * "id": "id:ns:type::boo",
+ * "fields": { ... document fields ... }
+ * }
+ * </pre>
+ * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes.
+ */
+ public CompletableFuture<Result> feedSingle(String json) {
+ CompletableFuture<Result> result = new CompletableFuture<>();
+ try {
+ SingleOperationParserAndExecutor parser = new SingleOperationParserAndExecutor(json.getBytes(UTF_8));
+ parser.next().whenCompleteAsync((operationResult, error) -> {
+ if (error != null) {
+ result.completeExceptionally(error);
+ } else {
+ result.complete(operationResult);
+ }
+ }, resultExecutor);
+ } catch (Exception e) {
+ resultExecutor.execute(() -> result.completeExceptionally(wrapException(e)));
+ }
+ return result;
+ }
+
+ /** Feeds a stream containing a JSON array of feed operations on the form
+ * <pre>
+ * [
+ * {
+ * "id": "id:ns:type::boo",
+ * "fields": { ... document fields ... }
+ * },
+ * {
+ * "put": "id:ns:type::foo",
+ * "fields": { ... document fields ... }
+ * },
+ * {
+ * "update": "id:ns:type:n=4:bar",
+ * "create": true,
+ * "fields": { ... partial update fields ... }
+ * },
+ * {
+ * "remove": "id:ns:type:g=foo:bar",
+ * "condition": "type.baz = \"bax\""
+ * },
+ * ...
+ * ]
+ * </pre>
+ * Note that {@code "id"} is an alias for the document put operation.
+ * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes.
+ */
+ public CompletableFuture<Void> feedMany(InputStream jsonStream, ResultCallback resultCallback) {
+ return feedMany(jsonStream, 1 << 26, resultCallback);
+ }
+
+ /**
+ * Same as {@link #feedMany(InputStream, ResultCallback)}, but without a provided {@link ResultCallback} instance.
+ * @see JsonFeeder#feedMany(InputStream, ResultCallback) for details.
+ */
+ public CompletableFuture<Void> feedMany(InputStream jsonStream) {
+ return feedMany(jsonStream, new ResultCallback() { });
+ }
+
+ CompletableFuture<Void> feedMany(InputStream jsonStream, int size, ResultCallback resultCallback) {
+ CompletableFuture<Void> overallResult = new CompletableFuture<>();
+ CompletableFuture<Result> result;
+ AtomicInteger pending = new AtomicInteger(1); // The below dispatch loop itself is counted as a single pending operation
+ AtomicBoolean finalCallbackInvoked = new AtomicBoolean();
+ try {
+ RingBufferStream buffer = new RingBufferStream(jsonStream, size);
+ while ((result = buffer.next()) != null) {
+ pending.incrementAndGet();
+ result.whenCompleteAsync((r, t) -> {
+ if (!finalCallbackInvoked.get()) {
+ resultCallback.onNextResult(r, (FeedException) t);
+ }
+ if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) {
+ resultCallback.onComplete();
+ overallResult.complete(null);
+ }
+ }, resultExecutor);
+ }
+ if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) {
+ resultExecutor.execute(() -> {
+ resultCallback.onComplete();
+ overallResult.complete(null);
+ });
+ }
+ } catch (Exception e) {
+ if (finalCallbackInvoked.compareAndSet(false, true)) {
+ resultExecutor.execute(() -> {
+ FeedException wrapped = wrapException(e);
+ resultCallback.onError(wrapped);
+ overallResult.completeExceptionally(wrapped);
+ });
+ }
+ }
+ return overallResult;
+ }
+
+ private static final JsonFactory factory = new JsonFactory();
+
+ @Override public void close() throws IOException {
+ client.close();
+ resultExecutor.shutdown();
+ try {
+ if (!resultExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
+ throw new IOException("Failed to close client in time");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private FeedException wrapException(Exception e) {
+ if (e instanceof FeedException) return (FeedException) e;
+ if (e instanceof IOException) {
+ return new OperationParseException("Failed to parse document JSON: " + e.getMessage(), e);
+ }
+ return new FeedException(e);
+ }
+
+ private class RingBufferStream extends InputStream {
+
+ private final byte[] b = new byte[1];
+ private final InputStream in;
+ private final Object lock = new Object();
+ private byte[] data;
+ private int size;
+ private IOException thrown = null;
+ private long tail = 0;
+ private long pos = 0;
+ private long head = 0;
+ private boolean done = false;
+ private final OperationParserAndExecutor parserAndExecutor;
+
+ RingBufferStream(InputStream in, int size) throws IOException {
+ this.in = in;
+ this.data = new byte[size];
+ this.size = size;
+
+ new Thread(this::fill, "feed-reader").start();
+
+ this.parserAndExecutor = new RingBufferBackedOperationParserAndExecutor(factory.createParser(this));
+ }
+
+ @Override
+ public int read() throws IOException {
+ return read(b, 0, 1) == -1 ? -1 : b[0];
+ }
+
+ @Override
+ public int read(byte[] buffer, int off, int len) throws IOException {
+ try {
+ int ready;
+ synchronized (lock) {
+ if (pos - tail == size) // Buffer exhausted, nothing left to read, nowhere left to write.
+ expand();
+
+ while ((ready = (int) (head - pos)) == 0 && ! done)
+ lock.wait();
+ }
+ if (thrown != null) throw thrown;
+ if (ready == 0) return -1;
+
+ ready = min(ready, len);
+ int offset = (int) (pos % size);
+ int length = min(ready, size - offset);
+ System.arraycopy(data, offset, buffer, off, length);
+ if (length < ready)
+ System.arraycopy(data, 0, buffer, off + length, ready - length);
+
+ pos += ready;
+ return ready;
+ }
+ catch (InterruptedException e) {
+ throw new InterruptedIOException("Interrupted waiting for data: " + e.getMessage());
+ }
+ }
+
+ public CompletableFuture<Result> next() throws IOException {
+ return parserAndExecutor.next();
+ }
+
+ private void expand() {
+ int newSize = size * 2;
+ if (newSize <= size)
+ throw new IllegalStateException("Maximum buffer size exceeded; want to double " + size + ", but that's too much");
+
+ byte[] newData = new byte[newSize];
+ int offset = (int) (tail % size);
+ int newOffset = (int) (tail % newSize);
+ int toWrite = size - offset;
+ System.arraycopy(data, offset, newData, newOffset, toWrite);
+ if (toWrite < size)
+ System.arraycopy(data, 0, newData, newOffset + toWrite, size - toWrite);
+ size = newSize;
+ data = newData;
+ lock.notify();
+ }
+
+ private final byte[] prefix = "{\"fields\":".getBytes(UTF_8);
+ private byte[] copy(long start, long end) {
+ int length = (int) (end - start);
+ byte[] buffer = new byte[prefix.length + length + 1];
+ System.arraycopy(prefix, 0, buffer, 0, prefix.length);
+
+ int offset = (int) (start % size);
+ int toWrite = min(length, size - offset);
+ System.arraycopy(data, offset, buffer, prefix.length, toWrite);
+ if (toWrite < length)
+ System.arraycopy(data, 0, buffer, prefix.length + toWrite, length - toWrite);
+
+ buffer[buffer.length - 1] = '}';
+ return buffer;
+ }
+
+
+ @Override
+ public void close() throws IOException {
+ synchronized (lock) {
+ done = true;
+ lock.notifyAll();
+ }
+ in.close();
+ }
+
+ private void fill() {
+ try {
+ while (true) {
+ int free;
+ synchronized (lock) {
+ while ((free = (int) (tail + size - head)) <= 0 && ! done)
+ lock.wait();
+ }
+ if (done) break;
+
+ int off = (int) (head % size);
+ int len = min(min(free, size - off), 1 << 13);
+ int read = in.read(data, off, len);
+
+ synchronized (lock) {
+ if (read < 0) done = true;
+ else head += read;
+ lock.notify();
+ }
+ }
+ } catch (InterruptedException e) {
+ synchronized (lock) {
+ done = true;
+ thrown = new InterruptedIOException("Interrupted reading data: " + e.getMessage());
+ }
+ } catch (IOException e) {
+ synchronized (lock) {
+ done = true;
+ thrown = e;
+ }
+ }
+ }
+
+ private class RingBufferBackedOperationParserAndExecutor extends OperationParserAndExecutor {
+
+ RingBufferBackedOperationParserAndExecutor(JsonParser parser) { super(parser, true); }
+
+ @Override
+ String getDocumentJson(long start, long end) {
+ String payload = new String(copy(start, end), UTF_8);
+ synchronized (lock) {
+ tail = end;
+ lock.notify();
+ }
+ return payload;
+ }
+ }
+ }
+
+ private class SingleOperationParserAndExecutor extends OperationParserAndExecutor {
+
+ private final byte[] json;
+
+ SingleOperationParserAndExecutor(byte[] json) throws IOException {
+ super(factory.createParser(json), false);
+ this.json = json;
+ }
+
+ @Override
+ String getDocumentJson(long start, long end) {
+ return "{\"fields\":" + new String(json, (int) start, (int) (end - start), UTF_8) + "}";
+ }
+ }
+
+ private abstract class OperationParserAndExecutor {
+
+ private final JsonParser parser;
+ private final boolean multipleOperations;
+ private boolean arrayPrefixParsed;
+
+ protected OperationParserAndExecutor(JsonParser parser, boolean multipleOperations) {
+ this.parser = parser;
+ this.multipleOperations = multipleOperations;
+ }
+
+ abstract String getDocumentJson(long start, long end);
+
+ OperationParseException parseException(String error) {
+ JsonLocation location = parser.getTokenLocation();
+ return new OperationParseException(error + " at offset " + location.getByteOffset() +
+ " (line " + location.getLineNr() + ", column " + location.getColumnNr() + ")");
+ }
+
+ CompletableFuture<Result> next() throws IOException {
+ JsonToken token = parser.nextToken();
+ if (multipleOperations && ! arrayPrefixParsed && token == JsonToken.START_ARRAY) {
+ arrayPrefixParsed = true;
+ token = parser.nextToken();
+ }
+ if (token == JsonToken.END_ARRAY && multipleOperations) return null;
+ else if (token == null && ! arrayPrefixParsed) return null;
+ else if (token != JsonToken.START_OBJECT) throw parseException("Unexpected token '" + parser.currentToken() + "'");
+ long start = 0, end = -1;
+ OperationType type = null;
+ DocumentId id = null;
+ OperationParameters parameters = protoParameters;
+ loop: while (true) {
+ switch (parser.nextToken()) {
+ case FIELD_NAME:
+ switch (parser.getText()) {
+ case "id":
+ case "put": type = PUT; id = readId(); break;
+ case "update": type = UPDATE; id = readId(); break;
+ case "remove": type = REMOVE; id = readId(); break;
+ case "condition": parameters = parameters.testAndSetCondition(readString()); break;
+ case "create": parameters = parameters.createIfNonExistent(readBoolean()); break;
+ case "fields": {
+ expect(START_OBJECT);
+ start = parser.getTokenLocation().getByteOffset();
+ int depth = 1;
+ while (depth > 0) switch (parser.nextToken()) {
+ case START_OBJECT: ++depth; break;
+ case END_OBJECT: --depth; break;
+ }
+ end = parser.getTokenLocation().getByteOffset() + 1;
+ break;
+ }
+ default: throw parseException("Unexpected field name '" + parser.getText() + "'");
+ }
+ break;
+
+ case END_OBJECT:
+ break loop;
+
+ default:
+ throw parseException("Unexpected token '" + parser.currentToken() + "'");
+ }
+ }
+ if (id == null)
+ throw parseException("No document id for document");
+ if (type == REMOVE) {
+ if (end >= start)
+ throw parseException("Illegal 'fields' object for remove operation");
+ else
+ start = end = parser.getTokenLocation().getByteOffset(); // getDocumentJson advances buffer overwrite head.
+ }
+ else if (end < start)
+ throw parseException("No 'fields' object for document");
+
+ String payload = getDocumentJson(start, end);
+ switch (type) {
+ case PUT: return client.put (id, payload, parameters);
+ case UPDATE: return client.update(id, payload, parameters);
+ case REMOVE: return client.remove(id, parameters);
+ default: throw new OperationParseException("Unexpected operation type '" + type + "'");
+ }
+ }
+
+ private void expect(JsonToken token) throws IOException {
+ if (parser.nextToken() != token)
+ throw new OperationParseException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() +
+ ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
+ }
+
+ private String readString() throws IOException {
+ String value = parser.nextTextValue();
+ if (value == null)
+ throw new OperationParseException("Expected '" + JsonToken.VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() +
+ ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
+
+ return value;
+ }
+
+ private boolean readBoolean() throws IOException {
+ Boolean value = parser.nextBooleanValue();
+ if (value == null)
+ throw new OperationParseException("Expected '" + JsonToken.VALUE_FALSE + "' or '" + JsonToken.VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() +
+ ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
+
+ return value;
+
+ }
+
+ private DocumentId readId() throws IOException {
+ return DocumentId.of(readString());
+ }
+
+ }
+
+ public static class Builder {
+
+ final FeedClient client;
+ OperationParameters parameters = OperationParameters.empty();
+
+ private Builder(FeedClient client) {
+ this.client = requireNonNull(client);
+ }
+
+ public Builder withTimeout(Duration timeout) {
+ parameters = parameters.timeout(timeout);
+ return this;
+ }
+
+ public Builder withRoute(String route) {
+ parameters = parameters.route(route);
+ return this;
+ }
+
+ public Builder withTracelevel(int tracelevel) {
+ parameters = parameters.tracelevel(tracelevel);
+ return this;
+ }
+
+ public JsonFeeder build() {
+ return new JsonFeeder(client, parameters);
+ }
+
+ }
+
+}
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/OperationParameters.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/OperationParameters.java
new file mode 100644
index 00000000000..0ec40e114df
--- /dev/null
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/OperationParameters.java
@@ -0,0 +1,97 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+
+/**
+ * Per-operation feed parameters
+ *
+ * @author bjorncs
+ * @author jonmv
+ */
+public class OperationParameters {
+
+ static final OperationParameters empty = new OperationParameters(false, null, null, null, 0);
+
+ private final boolean create;
+ private final String condition;
+ private final Duration timeout;
+ private final String route;
+ private final int tracelevel;
+
+ private OperationParameters(boolean create, String condition, Duration timeout, String route, int tracelevel) {
+ this.create = create;
+ this.condition = condition;
+ this.timeout = timeout;
+ this.route = route;
+ this.tracelevel = tracelevel;
+ }
+
+ public static OperationParameters empty() { return empty; }
+
+ public OperationParameters createIfNonExistent(boolean create) {
+ return new OperationParameters(create, condition, timeout, route, tracelevel);
+ }
+
+ public OperationParameters testAndSetCondition(String condition) {
+ if (condition.isEmpty())
+ throw new IllegalArgumentException("TestAndSetCondition must be non-empty");
+
+ return new OperationParameters(create, condition, timeout, route, tracelevel);
+ }
+
+ public OperationParameters timeout(Duration timeout) {
+ if (timeout.isNegative() || timeout.isZero())
+ throw new IllegalArgumentException("Timeout must be positive, but was " + timeout);
+
+ return new OperationParameters(create, condition, timeout, route, tracelevel);
+ }
+
+ public OperationParameters route(String route) {
+ if (route.isEmpty())
+ throw new IllegalArgumentException("Route must be non-empty");
+
+ return new OperationParameters(create, condition, timeout, route, tracelevel);
+ }
+
+ public OperationParameters tracelevel(int tracelevel) {
+ if (tracelevel < 1 || tracelevel > 9)
+ throw new IllegalArgumentException("Tracelevel must be in [1, 9]");
+
+ return new OperationParameters(create, condition, timeout, route, tracelevel);
+ }
+
+ public boolean createIfNonExistent() { return create; }
+ public Optional<String> testAndSetCondition() { return Optional.ofNullable(condition); }
+ public Optional<Duration> timeout() { return Optional.ofNullable(timeout); }
+ public Optional<String> route() { return Optional.ofNullable(route); }
+ public OptionalInt tracelevel() { return tracelevel == 0 ? OptionalInt.empty() : OptionalInt.of(tracelevel); }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ OperationParameters that = (OperationParameters) o;
+ return create == that.create && tracelevel == that.tracelevel && Objects.equals(condition, that.condition) && Objects.equals(timeout, that.timeout) && Objects.equals(route, that.route);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(create, condition, timeout, route, tracelevel);
+ }
+
+ @Override
+ public String toString() {
+ return "OperationParameters{" +
+ "create=" + create +
+ ", condition='" + condition + '\'' +
+ ", timeout=" + timeout +
+ ", route='" + route + '\'' +
+ ", tracelevel=" + tracelevel +
+ '}';
+ }
+
+}
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/OperationParseException.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/OperationParseException.java
new file mode 100644
index 00000000000..4404462be2e
--- /dev/null
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/OperationParseException.java
@@ -0,0 +1,17 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import ai.vespa.feed.client.FeedException;
+
+/**
+ * Signals that supplied JSON for a document/operation is invalid
+ *
+ * @author bjorncs
+ */
+public class OperationParseException extends FeedException {
+
+ public OperationParseException(String message) { super(message); }
+
+ public OperationParseException(String message, Throwable cause) { super(message, cause); }
+
+}
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/OperationStats.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/OperationStats.java
new file mode 100644
index 00000000000..ab2faf245d8
--- /dev/null
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/OperationStats.java
@@ -0,0 +1,139 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Statistics for feed operations over HTTP against a Vespa cluster.
+ *
+ * @author jonmv
+ */
+public class OperationStats {
+
+ private final long requests;
+ private final Map<Integer, Long> responsesByCode;
+ private final long inflight;
+ private final long exceptions;
+ private final long averageLatencyMillis;
+ private final long minLatencyMillis;
+ private final long maxLatencyMillis;
+ private final long bytesSent;
+ private final long bytesReceived;
+
+ public OperationStats(long requests, Map<Integer, Long> responsesByCode, long exceptions, long inflight,
+ long averageLatencyMillis, long minLatencyMillis, long maxLatencyMillis,
+ long bytesSent, long bytesReceived) {
+ this.requests = requests;
+ this.responsesByCode = responsesByCode;
+ this.exceptions = exceptions;
+ this.inflight = inflight;
+ this.averageLatencyMillis = averageLatencyMillis;
+ this.minLatencyMillis = minLatencyMillis;
+ this.maxLatencyMillis = maxLatencyMillis;
+ this.bytesSent = bytesSent;
+ this.bytesReceived = bytesReceived;
+ }
+
+ /** Returns the difference between this and the initial. Min and max latency are not modified. */
+ public OperationStats since(OperationStats initial) {
+ return new OperationStats(requests - initial.requests,
+ responsesByCode.entrySet().stream()
+ .collect(Collectors.toMap(entry -> entry.getKey(),
+ entry -> entry.getValue() - initial.responsesByCode.getOrDefault(entry.getKey(), 0L))),
+ exceptions - initial.exceptions,
+ inflight - initial.inflight,
+ responsesByCode.size() == initial.responsesByCode.size() ? 0 :
+ (averageLatencyMillis * responsesByCode.size() - initial.averageLatencyMillis * initial.responsesByCode.size())
+ / (responsesByCode.size() - initial.responsesByCode.size()),
+ minLatencyMillis,
+ maxLatencyMillis,
+ bytesSent - initial.bytesSent,
+ bytesReceived - initial.bytesReceived);
+ }
+
+ /** Number of HTTP requests attempted. */
+ public long requests() {
+ return requests;
+ }
+
+ /** Number of HTTP responses received. */
+ public long responses() {
+ return requests - inflight - exceptions;
+ }
+
+ /** Number of 200 OK HTTP responses received. */
+ public long successes() {
+ return responsesByCode.getOrDefault(200, 0L);
+ }
+
+ /** Number of HTTP responses by status code. */
+ public Map<Integer, Long> responsesByCode() {
+ return responsesByCode;
+ }
+
+ /** Number of exceptions (instead of responses). */
+ public long exceptions() {
+ return exceptions;
+ }
+
+ /** Number of attempted requests which haven't yielded a response or exception yet. */
+ public long inflight() {
+ return inflight;
+ }
+
+ /** Average request-response latency, or -1. */
+ public long averageLatencyMillis() {
+ return averageLatencyMillis;
+ }
+
+ /** Minimum request-response latency, or -1. */
+ public long minLatencyMillis() {
+ return minLatencyMillis;
+ }
+
+ /** Maximum request-response latency, or -1. */
+ public long maxLatencyMillis() {
+ return maxLatencyMillis;
+ }
+
+ /** Number of bytes sent, for HTTP requests with a response. */
+ public long bytesSent() {
+ return bytesSent;
+ }
+
+ /** Number of bytes received in HTTP responses. */
+ public long bytesReceived() {
+ return bytesReceived;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ OperationStats that = (OperationStats) o;
+ return requests == that.requests && inflight == that.inflight && exceptions == that.exceptions && averageLatencyMillis == that.averageLatencyMillis && minLatencyMillis == that.minLatencyMillis && maxLatencyMillis == that.maxLatencyMillis && bytesSent == that.bytesSent && bytesReceived == that.bytesReceived && responsesByCode.equals(that.responsesByCode);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(requests, responsesByCode, inflight, exceptions, averageLatencyMillis, minLatencyMillis, maxLatencyMillis, bytesSent, bytesReceived);
+ }
+
+ @Override
+ public String toString() {
+ return "Stats{" +
+ "requests=" + requests +
+ ", responsesByCode=" + responsesByCode +
+ ", exceptions=" + exceptions +
+ ", inflight=" + inflight +
+ ", averageLatencyMillis=" + averageLatencyMillis +
+ ", minLatencyMillis=" + minLatencyMillis +
+ ", maxLatencyMillis=" + maxLatencyMillis +
+ ", bytesSent=" + bytesSent +
+ ", bytesReceived=" + bytesReceived +
+ '}';
+ }
+
+}
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/Result.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/Result.java
new file mode 100644
index 00000000000..fa114f6a183
--- /dev/null
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/Result.java
@@ -0,0 +1,23 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import java.util.Optional;
+
+/**
+ * Result for a document operation which completed normally.
+ *
+ * @author bjorncs
+ * @author jonmv
+ */
+public interface Result {
+
+ enum Type {
+ success,
+ conditionNotMet
+ }
+
+ Type type();
+ DocumentId documentId();
+ Optional<String> resultMessage();
+ Optional<String> traceMessage();
+}
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/ResultException.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/ResultException.java
new file mode 100644
index 00000000000..27803898c01
--- /dev/null
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/ResultException.java
@@ -0,0 +1,29 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import ai.vespa.feed.client.DocumentId;
+import ai.vespa.feed.client.FeedException;
+import ai.vespa.feed.client.OperationParameters;
+
+import java.util.Optional;
+
+/**
+ * Signals that the document API in the feed container returned a failure result for a feed operation.
+ *
+ * @author jonmv
+ */
+public class ResultException extends FeedException {
+
+ private final String trace;
+
+ public ResultException(DocumentId documentId, String message, String trace) {
+ super(documentId, message);
+ this.trace = trace;
+ }
+
+ /** Holds the trace, if the failed operation had a {@link OperationParameters#tracelevel(int)} higher than 0. */
+ public Optional<String> getTrace() {
+ return Optional.ofNullable(trace);
+ }
+
+}
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/ResultParseException.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/ResultParseException.java
new file mode 100644
index 00000000000..f149b13196b
--- /dev/null
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/ResultParseException.java
@@ -0,0 +1,17 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import ai.vespa.feed.client.DocumentId;
+import ai.vespa.feed.client.FeedException;
+
+/**
+ * Signals that the client was unable to obtain a proper response/result from container
+ *
+ * @author bjorncs
+ */
+public class ResultParseException extends FeedException {
+
+ public ResultParseException(DocumentId documentId, String message) { super(documentId, message); }
+
+ public ResultParseException(DocumentId documentId, Throwable cause) { super(documentId, cause); }
+}
diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/package-info.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/package-info.java
new file mode 100644
index 00000000000..daab16a9ff2
--- /dev/null
+++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/package-info.java
@@ -0,0 +1,9 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+/**
+ * @author bjorncs
+ */
+
+@PublicApi
+package ai.vespa.feed.client;
+
+import com.yahoo.api.annotations.PublicApi; \ No newline at end of file