diff options
Diffstat (limited to 'vespa-feed-client-api/src')
17 files changed, 1737 insertions, 0 deletions
/**
 * Represents a Vespa document id, i.e., a string on the form
 * {@code id:<namespace>:<document-type>:[n=<number>|g=<group>]:<user-specific>}.
 *
 * Instances are immutable; at most one of {@link #number()} and {@link #group()} is present.
 *
 * @author jonmv
 */
public class DocumentId {

    private final String documentType;
    private final String namespace;
    private final OptionalLong number;
    private final Optional<String> group;
    private final String userSpecific;

    private DocumentId(String documentType, String namespace, OptionalLong number, Optional<String> group, String userSpecific) {
        this.documentType = requireNonNull(documentType);
        this.namespace = requireNonNull(namespace);
        this.number = requireNonNull(number);
        this.group = requireNonNull(group);
        this.userSpecific = requireNonNull(userSpecific);
    }

    /** Creates an id with neither a number nor a group modifier. */
    public static DocumentId of(String namespace, String documentType, String userSpecific) {
        return new DocumentId(documentType, namespace, OptionalLong.empty(), Optional.empty(), userSpecific);
    }

    /** Creates an id with the {@code n=<number>} modifier. */
    public static DocumentId of(String namespace, String documentType, long number, String userSpecific) {
        return new DocumentId(documentType, namespace, OptionalLong.of(number), Optional.empty(), userSpecific);
    }

    /** Creates an id with the {@code g=<group>} modifier. */
    public static DocumentId of(String namespace, String documentType, String group, String userSpecific) {
        return new DocumentId(documentType, namespace, OptionalLong.empty(), Optional.of(group), userSpecific);
    }

    /**
     * Parses a serialized document id.
     *
     * @param serialized the id string, on the form produced by {@link #toString()}
     * @return the parsed id
     * @throws NullPointerException if {@code serialized} is null
     * @throws IllegalArgumentException if {@code serialized} is not a well-formed document id,
     *                                  including when the {@code n=} modifier is not a valid long
     */
    public static DocumentId of(String serialized) {
        DocumentId parsed = parse(requireNonNull(serialized, "serialized"));
        if (parsed != null)
            return parsed;

        throw new IllegalArgumentException("Document ID must be on the form " +
                                           "'id:<namespace>:<document-type>:[n=<number>|g=<group>]:<user-specific>', " +
                                           "but was '" + serialized + "'");
    }

    /** Returns the parsed id, or {@code null} if the given string is not a well-formed document id. */
    private static DocumentId parse(String serialized) {
        // Scheme: must be exactly "id".
        int start = 0;
        int end = serialized.indexOf(':', start);
        if (end < 0 || ! "id".equals(serialized.substring(start, end))) return null;

        // Namespace: must be present and non-empty.
        start = end + 1;
        end = serialized.indexOf(':', start);
        if (end <= start) return null;
        String namespace = serialized.substring(start, end);

        // Document type: must be present and non-empty.
        start = end + 1;
        end = serialized.indexOf(':', start);
        if (end <= start) return null;
        String documentType = serialized.substring(start, end);

        // Modifier: the delimiter must be present, but the part itself may be empty.
        start = end + 1;
        end = serialized.indexOf(':', start);
        if (end < start) return null;
        String modifier = serialized.substring(start, end);

        // User-specific part: the non-empty remainder, which may itself contain ':'.
        start = end + 1;
        if (serialized.length() <= start) return null;
        String userSpecific = serialized.substring(start);

        if (modifier.startsWith("n=") && modifier.length() > 2) {
            try {
                return DocumentId.of(namespace, documentType, Long.parseLong(modifier.substring(2)), userSpecific);
            }
            catch (NumberFormatException e) {
                // A non-numeric "n=" value is a malformed id; let the caller report it uniformly.
                return null;
            }
        }
        if (modifier.startsWith("g=") && modifier.length() > 2)
            return DocumentId.of(namespace, documentType, modifier.substring(2), userSpecific);
        if (modifier.isEmpty())
            return DocumentId.of(namespace, documentType, userSpecific);
        return null;
    }

    public String documentType() {
        return documentType;
    }

    public String namespace() {
        return namespace;
    }

    public OptionalLong number() {
        return number;
    }

    public Optional<String> group() {
        return group;
    }

    public String userSpecific() {
        return userSpecific;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        DocumentId that = (DocumentId) o;
        return    documentType.equals(that.documentType)
               && namespace.equals(that.namespace)
               && number.equals(that.number)
               && group.equals(that.group)
               && userSpecific.equals(that.userSpecific);
    }

    @Override
    public int hashCode() {
        return Objects.hash(documentType, namespace, number, group, userSpecific);
    }

    /** Returns the serialized form of this id, which {@link #of(String)} parses back to an equal id. */
    @Override
    public String toString() {
        return "id:" + namespace + ":" + documentType + ":" +
               (number.isPresent() ? "n=" + number.getAsLong() : group.map("g="::concat).orElse("")) +
               ":" + userSpecific;
    }

}
+ */ + CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params); + + /** Returns a snapshot of the stats for this feed client, such as requests made, and responses by status. */ + OperationStats stats(); + + /** Current state of the circuit breaker. */ + CircuitBreaker.State circuitBreakerState(); + + /** Shut down, and reject new operations. Operations in flight are allowed to complete normally if graceful. */ + void close(boolean graceful); + + /** Initiates graceful shutdown. See {@link #close(boolean)}. */ + default void close() { close(true); } + + /** Controls what to retry, and how many times. */ + interface RetryStrategy { + + /** Whether to retry operations of the given type. */ + default boolean retry(OperationType type) { return true; } + + /** Number of retries per operation for assumed transient, non-backpressure problems. */ + default int retries() { return 10; } + + } + + /** Allows slowing down or halting completely operations against the configured endpoint on high failure rates. */ + interface CircuitBreaker { + + /** A circuit breaker which is always closed. */ + CircuitBreaker FUSED = () -> State.CLOSED; + + /** Called by the client whenever a successful response is obtained. */ + default void success() { } + + /** Called by the client whenever an error HTTP response is received. */ + default void failure(HttpResponse response) { } + + /** Called by the client whenever an exception occurs trying to obtain a HTTP response. */ + default void failure(Throwable cause) { } + + /** The current state of the circuit breaker. */ + State state(); + + enum State { + + /** Circuit is closed: business as usual. */ + CLOSED, + + /** Circuit is half-open: something is wrong, perhaps it recovers? */ + HALF_OPEN, + + /** Circuit is open: we have given up. */ + OPEN; + + } + + } + + enum OperationType { + + /** A document put operation. This is idempotent. */ + PUT, + + /** A document update operation. 
This is idempotent if all its contained updates are. */ + UPDATE, + + /** A document remove operation. This is idempotent. */ + REMOVE; + + } + + +} diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java new file mode 100644 index 00000000000..daf3f62dac1 --- /dev/null +++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java @@ -0,0 +1,128 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.SSLContext; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.net.URI; +import java.nio.file.Path; +import java.security.PrivateKey; +import java.security.cert.X509Certificate; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.ServiceLoader; +import java.util.function.Supplier; + +/** + * Builder for creating a {@link FeedClient} instance. 
+ * + * @author bjorncs + * @author jonmv + */ +public interface FeedClientBuilder { + + /** Creates a builder for a single container endpoint **/ + static FeedClientBuilder create(URI endpoint) { return create(Collections.singletonList(endpoint)); } + + /** Creates a builder for multiple container endpoints **/ + static FeedClientBuilder create(List<URI> endpoints) { + Iterator<FeedClientBuilder> iterator = ServiceLoader.load(FeedClientBuilder.class).iterator(); + if (iterator.hasNext()) { + return iterator.next().setEndpointUris(endpoints); + } else { + try { + Class<?> aClass = Class.forName("ai.vespa.feed.client.impl.FeedClientBuilderImpl"); + for (Constructor<?> constructor : aClass.getConstructors()) { + if (constructor.getParameterTypes().length==0) { + return ((FeedClientBuilder)constructor.newInstance()).setEndpointUris(endpoints); + } + } + throw new RuntimeException("Could not find Feed client builder implementation"); + } catch (ClassNotFoundException | InvocationTargetException | InstantiationException | IllegalAccessException e) { + throw new RuntimeException(e); + } + } + } + + /** + * Sets the number of connections this client will use per endpoint. + * + * A reasonable value here is a value that lets all feed clients (if more than one) + * collectively have a number of connections which is a small multiple of the numbers + * of containers in the cluster to feed, so load can be balanced across these containers. + * In general, this value should be kept as low as possible, but poor connectivity + * between feeder and cluster may also warrant a higher number of connections. + */ + FeedClientBuilder setConnectionsPerEndpoint(int max); + + /** + * Sets the maximum number of streams per HTTP/2 connection for this client. + * + * This determines the maximum number of concurrent, inflight requests for this client, + * which is {@code maxConnections * maxStreamsPerConnection}. Prefer more streams over + * more connections, when possible. 
+ * The feed client automatically throttles load to achieve the best throughput, and the + * actual number of streams per connection is usually lower than the maximum. + */ + FeedClientBuilder setMaxStreamPerConnection(int max); + + /** Sets {@link SSLContext} instance. */ + FeedClientBuilder setSslContext(SSLContext context); + + /** Sets {@link HostnameVerifier} instance (e.g for disabling default SSL hostname verification). */ + FeedClientBuilder setHostnameVerifier(HostnameVerifier verifier); + + /** Turns off benchmarking. Attempting to get {@link FeedClient#stats()} will result in an exception. */ + FeedClientBuilder noBenchmarking(); + + /** Adds HTTP request header to all client requests. */ + FeedClientBuilder addRequestHeader(String name, String value); + + /** + * Adds HTTP request header to all client requests. Value {@link Supplier} is invoked for each HTTP request, + * i.e. value can be dynamically updated during a feed. + */ + FeedClientBuilder addRequestHeader(String name, Supplier<String> valueSupplier); + + /** + * Overrides default retry strategy. + * @see FeedClient.RetryStrategy + */ + FeedClientBuilder setRetryStrategy(FeedClient.RetryStrategy strategy); + + /** + * Overrides default circuit breaker. 
+ * @see FeedClient.CircuitBreaker + */ + FeedClientBuilder setCircuitBreaker(FeedClient.CircuitBreaker breaker); + + /** Sets path to client SSL certificate/key PEM files */ + FeedClientBuilder setCertificate(Path certificatePemFile, Path privateKeyPemFile); + + /** Sets client SSL certificates/key */ + FeedClientBuilder setCertificate(Collection<X509Certificate> certificate, PrivateKey privateKey); + + /** Sets client SSL certificate/key */ + FeedClientBuilder setCertificate(X509Certificate certificate, PrivateKey privateKey); + + FeedClientBuilder setDryrun(boolean enabled); + + /** + * Overrides JVM default SSL truststore + * @param caCertificatesFile Path to PEM encoded file containing trusted certificates + */ + FeedClientBuilder setCaCertificatesFile(Path caCertificatesFile); + + /** Overrides JVM default SSL truststore */ + FeedClientBuilder setCaCertificates(Collection<X509Certificate> caCertificates); + + /** Overrides endpoint URIs for this client */ + FeedClientBuilder setEndpointUris(List<URI> endpoints); + + /** Constructs instance of {@link FeedClient} from builder configuration */ + FeedClient build(); + +} diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedException.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedException.java new file mode 100644 index 00000000000..1936eb09418 --- /dev/null +++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/FeedException.java @@ -0,0 +1,47 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+package ai.vespa.feed.client; + +import java.util.Optional; + +/** + * Signals that an error occurred during feeding + * + * @author bjorncs + */ +public class FeedException extends RuntimeException { + + private final DocumentId documentId; + + public FeedException(String message) { + super(message); + this.documentId = null; + } + + public FeedException(DocumentId documentId, String message) { + super(message); + this.documentId = documentId; + } + + public FeedException(String message, Throwable cause) { + super(message, cause); + this.documentId = null; + } + + public FeedException(Throwable cause) { + super(cause); + this.documentId = null; + } + + public FeedException(DocumentId documentId, Throwable cause) { + super(cause); + this.documentId = documentId; + } + + public FeedException(DocumentId documentId, String message, Throwable cause) { + super(message, cause); + this.documentId = documentId; + } + + public Optional<DocumentId> documentId() { return Optional.ofNullable(documentId); } + +} diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/HttpResponse.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/HttpResponse.java new file mode 100644 index 00000000000..62850fef32d --- /dev/null +++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/HttpResponse.java @@ -0,0 +1,16 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
/** A minimal view of an HTTP response: its status code and its raw body. */
public interface HttpResponse {

    /** Returns the HTTP status code of this response. */
    int code();

    /** Returns the body of this response, exactly as given to {@link #of(int, byte[])} for instances created there. */
    byte[] body();

    /** Creates a response which simply returns the given code and body; the body array is not copied. */
    static HttpResponse of(int code, byte[] body) {
        return new HttpResponse() {

            private final int statusCode = code;
            private final byte[] payload = body;

            @Override
            public int code() {
                return statusCode;
            }

            @Override
            public byte[] body() {
                return payload;
            }

        };
    }

}
com.fasterxml.jackson.core.JsonToken.VALUE_TRUE; +import static java.lang.Math.min; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Objects.requireNonNull; + +/** + * @author jonmv + * @author bjorncs + */ +public class JsonFeeder implements Closeable { + + private final ExecutorService resultExecutor = Executors.newSingleThreadExecutor(r -> { + Thread t = new Thread(r, "json-feeder-result-executor"); + t.setDaemon(true); + return t; + }); + private final FeedClient client; + private final OperationParameters protoParameters; + + private JsonFeeder(FeedClient client, OperationParameters protoParameters) { + this.client = client; + this.protoParameters = protoParameters; + } + + public interface ResultCallback { + /** + * Invoked after each operation has either completed successfully or failed + * + * @param result Non-null if operation completed successfully + * @param error Non-null if operation failed + */ + default void onNextResult(Result result, FeedException error) { } + + /** + * Invoked if an unrecoverable error occurred during feed processing, + * after which no other {@link ResultCallback} methods are invoked. + */ + default void onError(FeedException error) { } + + /** + * Invoked when all feed operations are either completed successfully or failed. + */ + default void onComplete() { } + } + + public static Builder builder(FeedClient client) { return new Builder(client); } + + /** Feeds single JSON feed operations on the form + * <pre> + * { + * "id": "id:ns:type::boo", + * "fields": { ... document fields ... } + * } + * </pre> + * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes. 
+ */ + public CompletableFuture<Result> feedSingle(String json) { + CompletableFuture<Result> result = new CompletableFuture<>(); + try { + SingleOperationParserAndExecutor parser = new SingleOperationParserAndExecutor(json.getBytes(UTF_8)); + parser.next().whenCompleteAsync((operationResult, error) -> { + if (error != null) { + result.completeExceptionally(error); + } else { + result.complete(operationResult); + } + }, resultExecutor); + } catch (Exception e) { + resultExecutor.execute(() -> result.completeExceptionally(wrapException(e))); + } + return result; + } + + /** Feeds a stream containing a JSON array of feed operations on the form + * <pre> + * [ + * { + * "id": "id:ns:type::boo", + * "fields": { ... document fields ... } + * }, + * { + * "put": "id:ns:type::foo", + * "fields": { ... document fields ... } + * }, + * { + * "update": "id:ns:type:n=4:bar", + * "create": true, + * "fields": { ... partial update fields ... } + * }, + * { + * "remove": "id:ns:type:g=foo:bar", + * "condition": "type.baz = \"bax\"" + * }, + * ... + * ] + * </pre> + * Note that {@code "id"} is an alias for the document put operation. + * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes. + */ + public CompletableFuture<Void> feedMany(InputStream jsonStream, ResultCallback resultCallback) { + return feedMany(jsonStream, 1 << 26, resultCallback); + } + + /** + * Same as {@link #feedMany(InputStream, ResultCallback)}, but without a provided {@link ResultCallback} instance. + * @see JsonFeeder#feedMany(InputStream, ResultCallback) for details. 
+ */ + public CompletableFuture<Void> feedMany(InputStream jsonStream) { + return feedMany(jsonStream, new ResultCallback() { }); + } + + CompletableFuture<Void> feedMany(InputStream jsonStream, int size, ResultCallback resultCallback) { + CompletableFuture<Void> overallResult = new CompletableFuture<>(); + CompletableFuture<Result> result; + AtomicInteger pending = new AtomicInteger(1); // The below dispatch loop itself is counted as a single pending operation + AtomicBoolean finalCallbackInvoked = new AtomicBoolean(); + try { + RingBufferStream buffer = new RingBufferStream(jsonStream, size); + while ((result = buffer.next()) != null) { + pending.incrementAndGet(); + result.whenCompleteAsync((r, t) -> { + if (!finalCallbackInvoked.get()) { + resultCallback.onNextResult(r, (FeedException) t); + } + if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { + resultCallback.onComplete(); + overallResult.complete(null); + } + }, resultExecutor); + } + if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { + resultExecutor.execute(() -> { + resultCallback.onComplete(); + overallResult.complete(null); + }); + } + } catch (Exception e) { + if (finalCallbackInvoked.compareAndSet(false, true)) { + resultExecutor.execute(() -> { + FeedException wrapped = wrapException(e); + resultCallback.onError(wrapped); + overallResult.completeExceptionally(wrapped); + }); + } + } + return overallResult; + } + + private static final JsonFactory factory = new JsonFactory(); + + @Override public void close() throws IOException { + client.close(); + resultExecutor.shutdown(); + try { + if (!resultExecutor.awaitTermination(30, TimeUnit.SECONDS)) { + throw new IOException("Failed to close client in time"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + private FeedException wrapException(Exception e) { + if (e instanceof FeedException) return (FeedException) e; + if (e instanceof 
IOException) { + return new OperationParseException("Failed to parse document JSON: " + e.getMessage(), e); + } + return new FeedException(e); + } + + private class RingBufferStream extends InputStream { + + private final byte[] b = new byte[1]; + private final InputStream in; + private final Object lock = new Object(); + private byte[] data; + private int size; + private IOException thrown = null; + private long tail = 0; + private long pos = 0; + private long head = 0; + private boolean done = false; + private final OperationParserAndExecutor parserAndExecutor; + + RingBufferStream(InputStream in, int size) throws IOException { + this.in = in; + this.data = new byte[size]; + this.size = size; + + new Thread(this::fill, "feed-reader").start(); + + this.parserAndExecutor = new RingBufferBackedOperationParserAndExecutor(factory.createParser(this)); + } + + @Override + public int read() throws IOException { + return read(b, 0, 1) == -1 ? -1 : b[0]; + } + + @Override + public int read(byte[] buffer, int off, int len) throws IOException { + try { + int ready; + synchronized (lock) { + if (pos - tail == size) // Buffer exhausted, nothing left to read, nowhere left to write. + expand(); + + while ((ready = (int) (head - pos)) == 0 && ! 
done) + lock.wait(); + } + if (thrown != null) throw thrown; + if (ready == 0) return -1; + + ready = min(ready, len); + int offset = (int) (pos % size); + int length = min(ready, size - offset); + System.arraycopy(data, offset, buffer, off, length); + if (length < ready) + System.arraycopy(data, 0, buffer, off + length, ready - length); + + pos += ready; + return ready; + } + catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted waiting for data: " + e.getMessage()); + } + } + + public CompletableFuture<Result> next() throws IOException { + return parserAndExecutor.next(); + } + + private void expand() { + int newSize = size * 2; + if (newSize <= size) + throw new IllegalStateException("Maximum buffer size exceeded; want to double " + size + ", but that's too much"); + + byte[] newData = new byte[newSize]; + int offset = (int) (tail % size); + int newOffset = (int) (tail % newSize); + int toWrite = size - offset; + System.arraycopy(data, offset, newData, newOffset, toWrite); + if (toWrite < size) + System.arraycopy(data, 0, newData, newOffset + toWrite, size - toWrite); + size = newSize; + data = newData; + lock.notify(); + } + + private final byte[] prefix = "{\"fields\":".getBytes(UTF_8); + private byte[] copy(long start, long end) { + int length = (int) (end - start); + byte[] buffer = new byte[prefix.length + length + 1]; + System.arraycopy(prefix, 0, buffer, 0, prefix.length); + + int offset = (int) (start % size); + int toWrite = min(length, size - offset); + System.arraycopy(data, offset, buffer, prefix.length, toWrite); + if (toWrite < length) + System.arraycopy(data, 0, buffer, prefix.length + toWrite, length - toWrite); + + buffer[buffer.length - 1] = '}'; + return buffer; + } + + + @Override + public void close() throws IOException { + synchronized (lock) { + done = true; + lock.notifyAll(); + } + in.close(); + } + + private void fill() { + try { + while (true) { + int free; + synchronized (lock) { + while ((free = (int) (tail 
+ size - head)) <= 0 && ! done) + lock.wait(); + } + if (done) break; + + int off = (int) (head % size); + int len = min(min(free, size - off), 1 << 13); + int read = in.read(data, off, len); + + synchronized (lock) { + if (read < 0) done = true; + else head += read; + lock.notify(); + } + } + } catch (InterruptedException e) { + synchronized (lock) { + done = true; + thrown = new InterruptedIOException("Interrupted reading data: " + e.getMessage()); + } + } catch (IOException e) { + synchronized (lock) { + done = true; + thrown = e; + } + } + } + + private class RingBufferBackedOperationParserAndExecutor extends OperationParserAndExecutor { + + RingBufferBackedOperationParserAndExecutor(JsonParser parser) { super(parser, true); } + + @Override + String getDocumentJson(long start, long end) { + String payload = new String(copy(start, end), UTF_8); + synchronized (lock) { + tail = end; + lock.notify(); + } + return payload; + } + } + } + + private class SingleOperationParserAndExecutor extends OperationParserAndExecutor { + + private final byte[] json; + + SingleOperationParserAndExecutor(byte[] json) throws IOException { + super(factory.createParser(json), false); + this.json = json; + } + + @Override + String getDocumentJson(long start, long end) { + return "{\"fields\":" + new String(json, (int) start, (int) (end - start), UTF_8) + "}"; + } + } + + private abstract class OperationParserAndExecutor { + + private final JsonParser parser; + private final boolean multipleOperations; + private boolean arrayPrefixParsed; + + protected OperationParserAndExecutor(JsonParser parser, boolean multipleOperations) { + this.parser = parser; + this.multipleOperations = multipleOperations; + } + + abstract String getDocumentJson(long start, long end); + + OperationParseException parseException(String error) { + JsonLocation location = parser.getTokenLocation(); + return new OperationParseException(error + " at offset " + location.getByteOffset() + + " (line " + 
location.getLineNr() + ", column " + location.getColumnNr() + ")"); + } + + CompletableFuture<Result> next() throws IOException { + JsonToken token = parser.nextToken(); + if (multipleOperations && ! arrayPrefixParsed && token == JsonToken.START_ARRAY) { + arrayPrefixParsed = true; + token = parser.nextToken(); + } + if (token == JsonToken.END_ARRAY && multipleOperations) return null; + else if (token == null && ! arrayPrefixParsed) return null; + else if (token != JsonToken.START_OBJECT) throw parseException("Unexpected token '" + parser.currentToken() + "'"); + long start = 0, end = -1; + OperationType type = null; + DocumentId id = null; + OperationParameters parameters = protoParameters; + loop: while (true) { + switch (parser.nextToken()) { + case FIELD_NAME: + switch (parser.getText()) { + case "id": + case "put": type = PUT; id = readId(); break; + case "update": type = UPDATE; id = readId(); break; + case "remove": type = REMOVE; id = readId(); break; + case "condition": parameters = parameters.testAndSetCondition(readString()); break; + case "create": parameters = parameters.createIfNonExistent(readBoolean()); break; + case "fields": { + expect(START_OBJECT); + start = parser.getTokenLocation().getByteOffset(); + int depth = 1; + while (depth > 0) switch (parser.nextToken()) { + case START_OBJECT: ++depth; break; + case END_OBJECT: --depth; break; + } + end = parser.getTokenLocation().getByteOffset() + 1; + break; + } + default: throw parseException("Unexpected field name '" + parser.getText() + "'"); + } + break; + + case END_OBJECT: + break loop; + + default: + throw parseException("Unexpected token '" + parser.currentToken() + "'"); + } + } + if (id == null) + throw parseException("No document id for document"); + if (type == REMOVE) { + if (end >= start) + throw parseException("Illegal 'fields' object for remove operation"); + else + start = end = parser.getTokenLocation().getByteOffset(); // getDocumentJson advances buffer overwrite head. 
+ } + else if (end < start) + throw parseException("No 'fields' object for document"); + + String payload = getDocumentJson(start, end); + switch (type) { + case PUT: return client.put (id, payload, parameters); + case UPDATE: return client.update(id, payload, parameters); + case REMOVE: return client.remove(id, parameters); + default: throw new OperationParseException("Unexpected operation type '" + type + "'"); + } + } + + private void expect(JsonToken token) throws IOException { + if (parser.nextToken() != token) + throw new OperationParseException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() + + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); + } + + private String readString() throws IOException { + String value = parser.nextTextValue(); + if (value == null) + throw new OperationParseException("Expected '" + JsonToken.VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() + + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); + + return value; + } + + private boolean readBoolean() throws IOException { + Boolean value = parser.nextBooleanValue(); + if (value == null) + throw new OperationParseException("Expected '" + JsonToken.VALUE_FALSE + "' or '" + JsonToken.VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() + + ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); + + return value; + + } + + private DocumentId readId() throws IOException { + return DocumentId.of(readString()); + } + + } + + public static class Builder { + + final FeedClient client; + OperationParameters parameters = OperationParameters.empty(); + + private Builder(FeedClient client) { + this.client = requireNonNull(client); + } + + public Builder withTimeout(Duration timeout) { + parameters = parameters.timeout(timeout); + return this; + } + + public Builder withRoute(String route) { + parameters = parameters.route(route); + return this; + } + 
/**
 * Immutable set of per-operation feed parameters. Each modifier returns a new instance
 * with the given value replaced, leaving the instance it was called on untouched.
 *
 * @author bjorncs
 * @author jonmv
 */
public class OperationParameters {

    /** Shared instance where nothing is set; a tracelevel of 0 denotes "not set". */
    static final OperationParameters empty = new OperationParameters(false, null, null, null, 0);

    private final boolean create;
    private final String condition;
    private final Duration timeout;
    private final String route;
    private final int tracelevel;

    private OperationParameters(boolean create, String condition, Duration timeout, String route, int tracelevel) {
        this.create = create;
        this.condition = condition;
        this.timeout = timeout;
        this.route = route;
        this.tracelevel = tracelevel;
    }

    /** Returns the shared instance with no parameters set. */
    public static OperationParameters empty() {
        return empty;
    }

    /** Returns a copy of this with the create-if-non-existent flag set to the given value. */
    public OperationParameters createIfNonExistent(boolean create) {
        return new OperationParameters(create, condition, timeout, route, tracelevel);
    }

    /**
     * Returns a copy of this with the given test-and-set condition.
     *
     * @throws IllegalArgumentException if the condition is empty
     */
    public OperationParameters testAndSetCondition(String condition) {
        if ( ! condition.isEmpty())
            return new OperationParameters(create, condition, timeout, route, tracelevel);

        throw new IllegalArgumentException("TestAndSetCondition must be non-empty");
    }

    /**
     * Returns a copy of this with the given timeout.
     *
     * @throws IllegalArgumentException if the timeout is zero or negative
     */
    public OperationParameters timeout(Duration timeout) {
        boolean positive = ! timeout.isNegative() && ! timeout.isZero();
        if ( ! positive)
            throw new IllegalArgumentException("Timeout must be positive, but was " + timeout);

        return new OperationParameters(create, condition, timeout, route, tracelevel);
    }

    /**
     * Returns a copy of this with the given route.
     *
     * @throws IllegalArgumentException if the route is empty
     */
    public OperationParameters route(String route) {
        if ( ! route.isEmpty())
            return new OperationParameters(create, condition, timeout, route, tracelevel);

        throw new IllegalArgumentException("Route must be non-empty");
    }

    /**
     * Returns a copy of this with the given tracelevel.
     *
     * @throws IllegalArgumentException if the tracelevel is outside [1, 9]
     */
    public OperationParameters tracelevel(int tracelevel) {
        boolean withinRange = 1 <= tracelevel && tracelevel <= 9;
        if ( ! withinRange)
            throw new IllegalArgumentException("Tracelevel must be in [1, 9]");

        return new OperationParameters(create, condition, timeout, route, tracelevel);
    }

    public boolean createIfNonExistent() {
        return create;
    }

    public Optional<String> testAndSetCondition() {
        return Optional.ofNullable(condition);
    }

    public Optional<Duration> timeout() {
        return Optional.ofNullable(timeout);
    }

    public Optional<String> route() {
        return Optional.ofNullable(route);
    }

    /** Returns the tracelevel, or empty if it has not been set (is 0). */
    public OptionalInt tracelevel() {
        if (tracelevel == 0)
            return OptionalInt.empty();

        return OptionalInt.of(tracelevel);
    }

    @Override
    public boolean equals(Object o) {
        if (o == this) return true;
        if (o == null) return false;
        if (o.getClass() != getClass()) return false;

        OperationParameters other = (OperationParameters) o;
        if (create != other.create || tracelevel != other.tracelevel) return false;

        return    Objects.equals(condition, other.condition)
               && Objects.equals(timeout, other.timeout)
               && Objects.equals(route, other.route);
    }

    @Override
    public int hashCode() {
        return Objects.hash(create, condition, timeout, route, tracelevel);
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("OperationParameters{");
        text.append("create=").append(create);
        text.append(", condition='").append(condition).append('\'');
        text.append(", timeout=").append(timeout);
        text.append(", route='").append(route).append('\'');
        text.append(", tracelevel=").append(tracelevel);
        text.append('}');
        return text.toString();
    }

}
/**
 * Statistics for feed operations over HTTP against a Vespa cluster.
 *
 * <p>An instance is a plain snapshot of counters; {@link #since(OperationStats)} computes the
 * difference between two snapshots.</p>
 *
 * @author jonmv
 */
public class OperationStats {

    private final long requests;
    private final Map<Integer, Long> responsesByCode;
    private final long inflight;
    private final long exceptions;
    private final long averageLatencyMillis;
    private final long minLatencyMillis;
    private final long maxLatencyMillis;
    private final long bytesSent;
    private final long bytesReceived;

    /**
     * Creates a snapshot from the given counters.
     *
     * @param requests             number of HTTP requests attempted
     * @param responsesByCode      number of HTTP responses received, keyed by status code; stored as given, not copied
     * @param exceptions           number of requests which ended in an exception instead of a response
     * @param inflight             number of requests which have yielded neither a response nor an exception yet
     * @param averageLatencyMillis average request-response latency
     * @param minLatencyMillis     minimum request-response latency
     * @param maxLatencyMillis     maximum request-response latency
     * @param bytesSent            number of bytes sent
     * @param bytesReceived        number of bytes received
     */
    public OperationStats(long requests, Map<Integer, Long> responsesByCode, long exceptions, long inflight,
                          long averageLatencyMillis, long minLatencyMillis, long maxLatencyMillis,
                          long bytesSent, long bytesReceived) {
        this.requests = requests;
        this.responsesByCode = responsesByCode;
        this.exceptions = exceptions;
        this.inflight = inflight;
        this.averageLatencyMillis = averageLatencyMillis;
        this.minLatencyMillis = minLatencyMillis;
        this.maxLatencyMillis = maxLatencyMillis;
        this.bytesSent = bytesSent;
        this.bytesReceived = bytesReceived;
    }

    /**
     * Returns the difference between this and the initial. Min and max latency are not modified.
     *
     * <p>The average latency of the result is the average over the responses received after the
     * initial snapshot. It is derived by weighting each snapshot's average by its number of
     * {@link #responses()}, and is 0 when no responses were received in between.</p>
     *
     * @param initial the earlier snapshot to subtract from this
     * @return a new snapshot holding the per-counter differences
     */
    public OperationStats since(OperationStats initial) {
        Map<Integer, Long> responsesByCodeSince =
                responsesByCode.entrySet().stream()
                               .collect(Collectors.toMap(entry -> entry.getKey(),
                                                         entry -> entry.getValue() - initial.responsesByCode.getOrDefault(entry.getKey(), 0L)));

        // Weight by the number of responses, not by the number of distinct status codes:
        // total latency = average * responses, so the average over the interval is the
        // difference in total latency divided by the difference in response count.
        long responsesNow = responses();
        long responsesBefore = initial.responses();
        long responsesSince = responsesNow - responsesBefore;
        long averageLatencySince = responsesSince == 0
                                   ? 0
                                   : (averageLatencyMillis * responsesNow - initial.averageLatencyMillis * responsesBefore)
                                     / responsesSince;

        return new OperationStats(requests - initial.requests,
                                  responsesByCodeSince,
                                  exceptions - initial.exceptions,
                                  inflight - initial.inflight,
                                  averageLatencySince,
                                  minLatencyMillis,
                                  maxLatencyMillis,
                                  bytesSent - initial.bytesSent,
                                  bytesReceived - initial.bytesReceived);
    }

    /** Number of HTTP requests attempted. */
    public long requests() {
        return requests;
    }

    /** Number of HTTP responses received. */
    public long responses() {
        return requests - inflight - exceptions;
    }

    /** Number of 200 OK HTTP responses received. */
    public long successes() {
        return responsesByCode.getOrDefault(200, 0L);
    }

    /** Number of HTTP responses by status code. */
    public Map<Integer, Long> responsesByCode() {
        return responsesByCode;
    }

    /** Number of exceptions (instead of responses). */
    public long exceptions() {
        return exceptions;
    }

    /** Number of attempted requests which haven't yielded a response or exception yet. */
    public long inflight() {
        return inflight;
    }

    /** Average request-response latency, or -1. */
    public long averageLatencyMillis() {
        return averageLatencyMillis;
    }

    /** Minimum request-response latency, or -1. */
    public long minLatencyMillis() {
        return minLatencyMillis;
    }

    /** Maximum request-response latency, or -1. */
    public long maxLatencyMillis() {
        return maxLatencyMillis;
    }

    /** Number of bytes sent, for HTTP requests with a response. */
    public long bytesSent() {
        return bytesSent;
    }

    /** Number of bytes received in HTTP responses. */
    public long bytesReceived() {
        return bytesReceived;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        OperationStats that = (OperationStats) o;
        return requests == that.requests
               && inflight == that.inflight
               && exceptions == that.exceptions
               && averageLatencyMillis == that.averageLatencyMillis
               && minLatencyMillis == that.minLatencyMillis
               && maxLatencyMillis == that.maxLatencyMillis
               && bytesSent == that.bytesSent
               && bytesReceived == that.bytesReceived
               && responsesByCode.equals(that.responsesByCode);
    }

    @Override
    public int hashCode() {
        return Objects.hash(requests, responsesByCode, inflight, exceptions, averageLatencyMillis,
                            minLatencyMillis, maxLatencyMillis, bytesSent, bytesReceived);
    }

    @Override
    public String toString() {
        return "Stats{" +
               "requests=" + requests +
               ", responsesByCode=" + responsesByCode +
               ", exceptions=" + exceptions +
               ", inflight=" + inflight +
               ", averageLatencyMillis=" + averageLatencyMillis +
               ", minLatencyMillis=" + minLatencyMillis +
               ", maxLatencyMillis=" + maxLatencyMillis +
               ", bytesSent=" + bytesSent +
               ", bytesReceived=" + bytesReceived +
               '}';
    }

}
+ * + * @author bjorncs + * @author jonmv + */ +public interface Result { + + enum Type { + success, + conditionNotMet + } + + Type type(); + DocumentId documentId(); + Optional<String> resultMessage(); + Optional<String> traceMessage(); +} diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/ResultException.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/ResultException.java new file mode 100644 index 00000000000..27803898c01 --- /dev/null +++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/ResultException.java @@ -0,0 +1,29 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import ai.vespa.feed.client.DocumentId; +import ai.vespa.feed.client.FeedException; +import ai.vespa.feed.client.OperationParameters; + +import java.util.Optional; + +/** + * Signals that the document API in the feed container returned a failure result for a feed operation. + * + * @author jonmv + */ +public class ResultException extends FeedException { + + private final String trace; + + public ResultException(DocumentId documentId, String message, String trace) { + super(documentId, message); + this.trace = trace; + } + + /** Holds the trace, if the failed operation had a {@link OperationParameters#tracelevel(int)} higher than 0. */ + public Optional<String> getTrace() { + return Optional.ofNullable(trace); + } + +} diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/ResultParseException.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/ResultParseException.java new file mode 100644 index 00000000000..f149b13196b --- /dev/null +++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/ResultParseException.java @@ -0,0 +1,17 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+package ai.vespa.feed.client; + +import ai.vespa.feed.client.DocumentId; +import ai.vespa.feed.client.FeedException; + +/** + * Signals that the client was unable to obtain a proper response/result from container + * + * @author bjorncs + */ +public class ResultParseException extends FeedException { + + public ResultParseException(DocumentId documentId, String message) { super(documentId, message); } + + public ResultParseException(DocumentId documentId, Throwable cause) { super(documentId, cause); } +} diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/package-info.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/package-info.java new file mode 100644 index 00000000000..daab16a9ff2 --- /dev/null +++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/package-info.java @@ -0,0 +1,9 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * @author bjorncs + */ + +@PublicApi +package ai.vespa.feed.client; + +import com.yahoo.api.annotations.PublicApi;
/**
 * Tests that {@link JsonFeeder} turns JSON input into put/update/remove calls on a {@link FeedClient},
 * using an in-memory {@link MockClient} which records every operation it receives.
 */
class JsonFeederTest {

    /**
     * Feeds a generated JSON array of {@code docs + 1} documents from a temporary file and verifies
     * that every document reached the client, every result reached the callback, and the callback
     * saw completion without an error.
     */
    @Test
    void test() throws IOException {
        int docs = 1 << 14;
        String json = "[\n" +

                      IntStream.range(0, docs).mapToObj(i ->
                                                                " {\n" +
                                                                " \"id\": \"id:ns:type::abc" + i + "\",\n" +
                                                                " \"fields\": {\n" +
                                                                " \"lul\":\"lal\"\n" +
                                                                " }\n" +
                                                                " },\n"
                      ).collect(joining()) +

                      " {\n" +
                      " \"id\": \"id:ns:type::abc" + docs + "\",\n" +
                      " \"fields\": {\n" +
                      " \"lul\":\"lal\"\n" +
                      " }\n" +
                      " }\n" +
                      "]";
        AtomicReference<FeedException> exceptionThrow = new AtomicReference<>();
        Path tmpFile = Files.createTempFile(null, null);
        Files.write(tmpFile, json.getBytes(UTF_8));
        // DELETE_ON_CLOSE removes the temporary file when the stream is closed by try-with-resources.
        try (InputStream in = Files.newInputStream(tmpFile, StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE)) {
            AtomicInteger resultsReceived = new AtomicInteger();
            AtomicBoolean completedSuccessfully = new AtomicBoolean();
            long startNanos = System.nanoTime();
            MockClient feedClient = new MockClient();
            // NOTE(review): the second argument (1 << 10) is presumably a buffer size, judging by the
            // "Mini-buffer" remark in the JSONL test below; confirm against JsonFeeder.feedMany.
            JsonFeeder.builder(feedClient).build()
                    .feedMany(in, 1 << 10,
                            new JsonFeeder.ResultCallback() {
                                @Override
                                public void onNextResult(Result result, FeedException error) { resultsReceived.incrementAndGet(); }

                                @Override
                                public void onError(FeedException error) { exceptionThrow.set(error); }

                                @Override
                                public void onComplete() { completedSuccessfully.set(true); }
                            })
                    .join();

            // Prints the input size and elapsed time; informational only, nothing is asserted on it.
            System.err.println((json.length() / 1048576.0) + " MB in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds");
            // The array holds ids abc0 .. abc<docs>, i.e. docs + 1 distinct documents.
            assertEquals(docs + 1, feedClient.putOperations.size());
            assertEquals(docs + 1, resultsReceived.get());
            assertTrue(completedSuccessfully.get());
            assertNull(exceptionThrow.get());
        }
    }

    /** Feeds a JSON array with two put operations and verifies both are passed to the client. */
    @Test
    public void multipleJsonArrayOperationsAreDispatchedToFeedClient() throws IOException, ExecutionException, InterruptedException {
        MockClient client = new MockClient();
        try (JsonFeeder feeder = JsonFeeder.builder(client).build()) {
            String json = "[{" +
                          " \"put\": \"id:ns:type::abc1\",\n" +
                          " \"fields\": {\n" +
                          " \"lul\":\"lal\"\n" +
                          " }\n" +
                          "},\n" +
                          "{" +
                          " \"put\": \"id:ns:type::abc2\",\n" +
                          " \"fields\": {\n" +
                          " \"lul\":\"lal\"\n" +
                          " }\n" +
                          "}]\n";
            feeder.feedMany(new ByteArrayInputStream(json.getBytes(UTF_8))).get();
            client.assertPutDocumentIds("abc1", "abc2");
            client.assertPutOperation("abc1", "{\"fields\":{\n \"lul\":\"lal\"\n }}");
            client.assertPutOperation("abc2", "{\"fields\":{\n \"lul\":\"lal\"\n }}");
        }
    }

    /**
     * Feeds three concatenated JSON objects (a remove, an update and a put, not wrapped in an array)
     * and verifies each is passed to the matching client method.
     */
    @Test
    public void multipleJsonLOperationsAreDispatchedToFeedClient() throws IOException, ExecutionException, InterruptedException {
        MockClient client = new MockClient();
        try (JsonFeeder feeder = JsonFeeder.builder(client).build()) {
            String json = "{\n" +
                          " \"remove\": \"id:ns:type::abc1\"\n" +
                          "}\n" +
                          "{\n" +
                          " \"fields\": {\n" +
                          " \"lul\": { \"assign\": \"lal\" }\n" +
                          " },\n" +
                          " \"update\": \"id:ns:type::abc2\"\n" +
                          "}\n" +
                          "{\n" +
                          " \"put\": \"id:ns:type::abc3\",\n" +
                          " \"fields\": {\n" +
                          " \"lul\": \"lal\"\n" +
                          " }\n" +
                          "}\n";

            feeder.feedMany(new ByteArrayInputStream(json.getBytes(UTF_8)),
                            3, // Mini-buffer, which needs to expand.
                            new JsonFeeder.ResultCallback() { })
                  .get();
            client.assertRemoveDocumentIds("abc1");
            client.assertUpdateDocumentIds("abc2");
            client.assertUpdateOperation("abc2", "{\"fields\":{\n \"lul\": { \"assign\": \"lal\" }\n }}");
            client.assertPutDocumentIds("abc3");
            client.assertPutOperation("abc3", "{\"fields\":{\n \"lul\": \"lal\"\n }}");
        }
    }

    /** Feeds a single put operation via {@code feedSingle} and verifies the returned result and the recorded operation. */
    @Test
    public void singleJsonOperationIsDispatchedToFeedClient() throws IOException, ExecutionException, InterruptedException {
        MockClient client = new MockClient();
        try (JsonFeeder feeder = JsonFeeder.builder(client).build()) {
            String json = "{\"put\": \"id:ns:type::abc1\",\n" +
                          " \"fields\": {\n" +
                          " \"lul\":\"lal\"\n" +
                          " }\n" +
                          " }\n";
            Result result = feeder.feedSingle(json).get();
            Assertions.assertEquals(DocumentId.of("id:ns:type::abc1"), result.documentId());
            assertEquals(Result.Type.success, result.type());
            assertEquals("success", result.resultMessage().get());
            client.assertPutOperation("abc1", "{\"fields\":{\n \"lul\":\"lal\"\n }}");
        }
    }

    /**
     * {@link FeedClient} stand-in which records each operation in an insertion-ordered map, keyed by
     * document id, and immediately returns an already completed, successful result.
     */
    private static class MockClient implements FeedClient {
        final Map<DocumentId, String> putOperations = new LinkedHashMap<>();
        final Map<DocumentId, String> updateOperations = new LinkedHashMap<>();
        final Map<DocumentId, String> removeOperations = new LinkedHashMap<>();

        @Override
        public CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params) {
            putOperations.put(documentId, documentJson);
            return createSuccessResult(documentId);
        }

        @Override
        public CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params) {
            updateOperations.put(documentId, updateJson);
            return createSuccessResult(documentId);
        }

        @Override
        public CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params) {
            // Removes carry no JSON payload, so only the key is of interest.
            removeOperations.put(documentId, null);
            return createSuccessResult(documentId);
        }

        // Stats, circuit breaker state and close are not exercised by these tests.
        @Override
        public OperationStats stats() { return null; }

        @Override
        public CircuitBreaker.State circuitBreakerState() { return null; }

        @Override
        public void close(boolean graceful) { }

        /** Returns a completed future holding a success result with message "success" and no trace. */
        private CompletableFuture<Result> createSuccessResult(DocumentId documentId) {
            return CompletableFuture.completedFuture(new Result(){
                @Override public Type type() { return Type.success; }
                @Override public DocumentId documentId() { return documentId; }
                @Override public Optional<String> resultMessage() { return Optional.of("success"); }
                @Override public Optional<String> traceMessage() { return Optional.empty(); }
            });
        }

        /**
         * Asserts that the given keys are exactly the ids {@code id:ns:type::<userSpecific>} for the
         * expected user specific parts; both sides are sorted first, so order does not matter.
         */
        void assertDocumentIds(Collection<DocumentId> keys, String... expectedUserSpecificIds) {
            List<String> expected = Arrays.stream(expectedUserSpecificIds)
                    .map(userSpecific -> "id:ns:type::" + userSpecific)
                    .sorted()
                    .collect(Collectors.toList());
            List<String> actual = keys.stream()
                    .map(DocumentId::toString).sorted()
                    .collect(Collectors.toList());
            assertEquals(expected, actual, "Document ids must match");
        }

        void assertPutDocumentIds(String... expectedUserSpecificIds) {
            assertDocumentIds(putOperations.keySet(), expectedUserSpecificIds);
        }

        void assertUpdateDocumentIds(String... expectedUserSpecificIds) {
            assertDocumentIds(updateOperations.keySet(), expectedUserSpecificIds);
        }

        void assertRemoveDocumentIds(String... expectedUserSpecificIds) {
            assertDocumentIds(removeOperations.keySet(), expectedUserSpecificIds);
        }

        /** Asserts that a put was recorded for the given id and that its JSON equals the expected JSON, ignoring surrounding whitespace. */
        void assertPutOperation(String userSpecificId, String expectedJson) {
            DocumentId docId = DocumentId.of("id:ns:type::" + userSpecificId);
            String json = putOperations.get(docId);
            assertNotNull(json);
            assertEquals(expectedJson.trim(), json.trim());
        }

        /** Asserts that an update was recorded for the given id and that its JSON equals the expected JSON, ignoring surrounding whitespace. */
        void assertUpdateOperation(String userSpecificId, String expectedJson) {
            DocumentId docId = DocumentId.of("id:ns:type::" + userSpecificId);
            String json = updateOperations.get(docId);
            assertNotNull(json);
            assertEquals(expectedJson.trim(), json.trim());
        }

    }

}
+package ai.vespa.feed.client.examples; + +import ai.vespa.feed.client.DocumentId; +import ai.vespa.feed.client.FeedClient; +import ai.vespa.feed.client.FeedClientBuilder; +import ai.vespa.feed.client.FeedException; +import ai.vespa.feed.client.JsonFeeder; +import ai.vespa.feed.client.Result; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Logger; + +/** + * Sample feeder demonstrating how to programmatically feed to a Vespa cluster. + */ +class JsonFileFeederExample implements Closeable { + + private final static Logger log = Logger.getLogger(JsonFileFeederExample.class.getName()); + + private final JsonFeeder jsonFeeder; + private final URI endpoint; + + static class ResultCallBack implements JsonFeeder.ResultCallback { + + final AtomicInteger resultsReceived = new AtomicInteger(0); + final AtomicInteger errorsReceived = new AtomicInteger(0); + final long startTimeMillis = System.currentTimeMillis();; + + @Override + public void onNextResult(Result result, FeedException error) { + resultsReceived.incrementAndGet(); + if (error != null) { + log.warning("Problems with feeding document " + + error.documentId().map(DocumentId::toString).orElse("<unknown>") + + ": " + error); + errorsReceived.incrementAndGet(); + } + } + + @Override + public void onError(FeedException error) { + log.severe("Feeding failed fatally: " + error.getMessage()); + } + + @Override + public void onComplete() { + log.info("Feeding completed"); + } + + void dumpStatsToLog() { + log.info("Received in total " + resultsReceived.get() + ", " + errorsReceived.get() + " errors."); + log.info("Time spent receiving is " + (System.currentTimeMillis() - startTimeMillis) + " ms."); + } + + } + + JsonFileFeederExample(URI endpoint) { + this.endpoint = endpoint; + FeedClient feedClient = 
FeedClientBuilder.create(endpoint) + .build(); + this.jsonFeeder = JsonFeeder.builder(feedClient) + .withTimeout(Duration.ofSeconds(30)) + .build(); + } + + /** + * Feed all operations from a stream. + * + * @param stream The input stream to read operations from (JSON array containing one or more document operations). + */ + void batchFeed(InputStream stream, String batchId) { + ResultCallBack callback = new ResultCallBack(); + log.info("Starting feed to " + endpoint + " for batch '" + batchId + "'"); + CompletableFuture<Void> promise = jsonFeeder.feedMany(stream, callback); + promise.join(); // wait for feeding to complete + callback.dumpStatsToLog(); + } + + @Override + public void close() throws IOException { + jsonFeeder.close(); + } +} diff --git a/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java new file mode 100644 index 00000000000..3d4ce150fcf --- /dev/null +++ b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java @@ -0,0 +1,115 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client.examples; + +import ai.vespa.feed.client.DocumentId; +import ai.vespa.feed.client.FeedClient; +import ai.vespa.feed.client.FeedClientBuilder; +import ai.vespa.feed.client.OperationParameters; +import ai.vespa.feed.client.Result; + +import java.net.URI; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Simple Streaming feeder implementation which will send operations to a Vespa endpoint. 
+ * Other threads communicate with the feeder by adding new operations on the BlockingQueue + */ + +class JsonStreamFeederExample extends Thread implements AutoCloseable { + + static class Operation { + final String type; + final String documentId; + final String documentFieldsJson; + + Operation(String type, String id, String fields) { + this.type = type; + this.documentId = id; + this.documentFieldsJson = fields; + } + } + + private final static Logger log = Logger.getLogger(JsonStreamFeederExample.class.getName()); + + private final BlockingQueue<Operation> operations; + private final FeedClient feedClient; + private final AtomicBoolean drain = new AtomicBoolean(false); + private final CountDownLatch finishedDraining = new CountDownLatch(1); + private final AtomicInteger resultCounter = new AtomicInteger(); + + /** + * Constructor + * @param operations The shared blocking queue where other threads can put document operations to. + * @param endpoint The endpoint to feed to + */ + JsonStreamFeederExample(BlockingQueue<JsonStreamFeederExample.Operation> operations, URI endpoint) { + this.operations = operations; + this.feedClient = FeedClientBuilder.create(endpoint).build(); + } + + /** + * Shutdown this feeder, waits until operations on queue is drained + */ + @Override + public void close() { + log.info("Shutdown initiated, awaiting operations queue to be drained. 
Queue size is " + operations.size()); + drain.set(true); + try { + finishedDraining.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + @Override + public void run() { + while (!drain.get() || !operations.isEmpty()) { + try { + JsonStreamFeederExample.Operation op = operations.poll(1, TimeUnit.SECONDS); + if(op == null) // no operations available + continue; + log.info("Put document " + op.documentId); + CompletableFuture<Result> promise; + DocumentId docId = DocumentId.of(op.documentId); + OperationParameters params = OperationParameters.empty(); + String json = op.documentFieldsJson; + switch (op.type) { + case "put": + promise = feedClient.put(docId, json, params); + break; + case "remove": + promise = feedClient.remove(docId, params); + break; + case "update": + promise = feedClient.update(docId, json, params); + break; + default: + throw new IllegalArgumentException("Invalid operation: " + op.type); + } + promise.whenComplete((result, throwable) -> { + if (resultCounter.getAndIncrement() % 10 == 0) { + System.err.println(feedClient.stats()); + } + if (throwable != null) { + System.err.printf("Failure for '%s': %s", docId, throwable); + throwable.printStackTrace(); + } + }); + } catch (InterruptedException e) { + log.log(Level.SEVERE, "Got interrupt exception.", e); + break; + } + } + log.info("Shutting down feeding thread"); + this.feedClient.close(); + finishedDraining.countDown(); + } + +}
\ No newline at end of file diff --git a/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java new file mode 100644 index 00000000000..4e6473a6568 --- /dev/null +++ b/vespa-feed-client-api/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java @@ -0,0 +1,34 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client.examples; + +import ai.vespa.feed.client.DocumentId; +import ai.vespa.feed.client.FeedClient; +import ai.vespa.feed.client.FeedClientBuilder; +import ai.vespa.feed.client.OperationParameters; +import ai.vespa.feed.client.Result; + +import java.net.URI; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; + +class SimpleExample { + + public static void main(String[] args) { + try (FeedClient client = FeedClientBuilder.create(URI.create("https://my-container-endpoint-with-http2:8080/")).build()) { + DocumentId id = DocumentId.of("namespace", "documenttype", "1"); + String json = "{\"fields\": {\"title\": \"hello world\"}}"; + OperationParameters params = OperationParameters.empty() + .timeout(Duration.ofSeconds(5)) + .route("myvesparoute"); + CompletableFuture<Result> promise = client.put(id, json, params); + promise.whenComplete(((result, throwable) -> { + if (throwable != null) { + throwable.printStackTrace(); + } else { + System.out.printf("'%s' for document '%s': %s%n", result.type(), result.documentId(), result.resultMessage()); + } + })); + } + } + +} |