diff options
Diffstat (limited to 'vespa-feed-client')
38 files changed, 162 insertions, 1669 deletions
diff --git a/vespa-feed-client/pom.xml b/vespa-feed-client/pom.xml index 68c9e4b4b7c..8ccd7bb9389 100644 --- a/vespa-feed-client/pom.xml +++ b/vespa-feed-client/pom.xml @@ -34,6 +34,11 @@ <artifactId>jackson-core</artifactId> <scope>compile</scope> </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>vespa-feed-client-api</artifactId> + <version>${project.version}</version> + </dependency> <!-- test scope --> <dependency> @@ -72,7 +77,7 @@ <executable>src/main/sh/vespa-version-generator.sh</executable> <arguments> <argument>${project.basedir}/../dist/vtag.map</argument> - <argument>${project.build.directory}/generated-sources/vespa-version/ai/vespa/feed/client/Vespa.java</argument> + <argument>${project.build.directory}/generated-sources/vespa-version/ai/vespa/feed/client/impl/Vespa.java</argument> </arguments> <sourceRoot>${project.build.directory}/generated-sources/vespa-version</sourceRoot> </configuration> diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DocumentId.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/DocumentId.java deleted file mode 100644 index 5474bcfda01..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DocumentId.java +++ /dev/null @@ -1,112 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -import java.util.Objects; -import java.util.Optional; -import java.util.OptionalLong; - -import static java.util.Objects.requireNonNull; - -/** - * Represents a Vespa document id - * - * @author jonmv - */ -public class DocumentId { - - private final String documentType; - private final String namespace; - private final OptionalLong number; - private final Optional<String> group; - private final String userSpecific; - - private DocumentId(String documentType, String namespace, OptionalLong number, Optional<String> group, String userSpecific) { - this.documentType = requireNonNull(documentType); - this.namespace = requireNonNull(namespace); - this.number = requireNonNull(number); - this.group = requireNonNull(group); - this.userSpecific = requireNonNull(userSpecific); - } - - public static DocumentId of(String namespace, String documentType, String userSpecific) { - return new DocumentId(documentType, namespace, OptionalLong.empty(), Optional.empty(), userSpecific); - } - - public static DocumentId of(String namespace, String documentType, long number, String userSpecific) { - return new DocumentId(documentType, namespace, OptionalLong.of(number), Optional.empty(), userSpecific); - } - - public static DocumentId of(String namespace, String documentType, String group, String userSpecific) { - return new DocumentId(documentType, namespace, OptionalLong.empty(), Optional.of(group), userSpecific); - } - - public static DocumentId of(String serialized) { - DocumentId parsed = parse(serialized); - if (parsed != null) return parsed; - throw new IllegalArgumentException("Document ID must be on the form " + - "'id:<namespace>:<document-type>:[n=<number>|g=<group>]:<user-specific>', " + - "but was '" + serialized + "'"); - } - - private static DocumentId parse(String serialized) { - int i, j = -1; - if ((j = serialized.indexOf(':', i = j + 1)) < i) return null; - if ( ! "id".equals(serialized.substring(i, j))) return null; - if ((j = serialized.indexOf(':', i = j + 1)) <= i) return null; - String namespace = serialized.substring(i, j); - if ((j = serialized.indexOf(':', i = j + 1)) <= i) return null; - String documentType = serialized.substring(i, j); - if ((j = serialized.indexOf(':', i = j + 1)) < i) return null; - String group = serialized.substring(i, j); - if (serialized.length() <= (i = j + 1)) return null; - String userSpecific = serialized.substring(i); - if (group.startsWith("n=") && group.length() > 2) - return DocumentId.of(namespace, documentType, Long.parseLong(group.substring(2)), userSpecific); - if (group.startsWith("g=") && group.length() > 2) - return DocumentId.of(namespace, documentType, group.substring(2), userSpecific); - if (group.isEmpty()) - return DocumentId.of(namespace, documentType, userSpecific); - return null; - } - - public String documentType() { - return documentType; - } - - public String namespace() { - return namespace; - } - - public OptionalLong number() { - return number; - } - - public Optional<String> group() { - return group; - } - - public String userSpecific() { - return userSpecific; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - DocumentId that = (DocumentId) o; - return documentType.equals(that.documentType) && namespace.equals(that.namespace) && number.equals(that.number) && group.equals(that.group) && userSpecific.equals(that.userSpecific); - } - - @Override - public int hashCode() { - return Objects.hash(documentType, namespace, number, group, userSpecific); - } - - @Override - public String toString() { - return "id:" + namespace + ":" + documentType + ":" + - (number.isPresent() ? "n=" + number.getAsLong() : group.map("g="::concat).orElse("")) + - ":" + userSpecific; - } - -} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java deleted file mode 100644 index d463c611d6a..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java +++ /dev/null @@ -1,110 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -import java.io.Closeable; -import java.util.concurrent.CompletableFuture; - -/** - * Asynchronous feed client accepting document operations as JSON. The payload should be - * the same as the HTTP payload required by the /document/v1 HTTP API, i.e., <pre> - * { - * "fields": { - * ... - * } - * } - * </pre> - * - * @author bjorncs - * @author jonmv - */ -public interface FeedClient extends Closeable { - - /** - * Send a document put with the given parameters, returning a future with the result of the operation. - * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes. - */ - CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params); - - /** - * Send a document update with the given parameters, returning a future with the result of the operation. - * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes. - */ - CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params); - - /** - * Send a document remove with the given parameters, returning a future with the result of the operation. - * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes. - */ - CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params); - - /** Returns a snapshot of the stats for this feed client, such as requests made, and responses by status. */ - OperationStats stats(); - - /** Current state of the circuit breaker. */ - CircuitBreaker.State circuitBreakerState(); - - /** Shut down, and reject new operations. Operations in flight are allowed to complete normally if graceful. */ - void close(boolean graceful); - - /** Initiates graceful shutdown. See {@link #close(boolean)}. */ - default void close() { close(true); } - - /** Controls what to retry, and how many times. */ - interface RetryStrategy { - - /** Whether to retry operations of the given type. */ - default boolean retry(OperationType type) { return true; } - - /** Number of retries per operation for assumed transient, non-backpressure problems. */ - default int retries() { return 10; } - - } - - /** Allows slowing down or halting completely operations against the configured endpoint on high failure rates. */ - interface CircuitBreaker { - - /** A circuit breaker which is always closed. */ - CircuitBreaker FUSED = () -> State.CLOSED; - - /** Called by the client whenever a successful response is obtained. */ - default void success() { } - - /** Called by the client whenever an error HTTP response is received. */ - default void failure(HttpResponse response) { } - - /** Called by the client whenever an exception occurs trying to obtain a HTTP response. */ - default void failure(Throwable cause) { } - - /** The current state of the circuit breaker. */ - State state(); - - enum State { - - /** Circuit is closed: business as usual. */ - CLOSED, - - /** Circuit is half-open: something is wrong, perhaps it recovers? */ - HALF_OPEN, - - /** Circuit is open: we have given up. */ - OPEN; - - } - - } - - enum OperationType { - - /** A document put operation. This is idempotent. */ - PUT, - - /** A document update operation. This is idempotent if all its contained updates are. */ - UPDATE, - - /** A document remove operation. This is idempotent. */ - REMOVE; - - } - - -} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java deleted file mode 100644 index 1936eb09418..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -import java.util.Optional; - -/** - * Signals that an error occurred during feeding - * - * @author bjorncs - */ -public class FeedException extends RuntimeException { - - private final DocumentId documentId; - - public FeedException(String message) { - super(message); - this.documentId = null; - } - - public FeedException(DocumentId documentId, String message) { - super(message); - this.documentId = documentId; - } - - public FeedException(String message, Throwable cause) { - super(message, cause); - this.documentId = null; - } - - public FeedException(Throwable cause) { - super(cause); - this.documentId = null; - } - - public FeedException(DocumentId documentId, Throwable cause) { - super(cause); - this.documentId = documentId; - } - - public FeedException(DocumentId documentId, String message, Throwable cause) { - super(message, cause); - this.documentId = documentId; - } - - public Optional<DocumentId> documentId() { return Optional.ofNullable(documentId); } - -} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java deleted file mode 100644 index 07fdb2d7257..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -interface HttpResponse { - - int code(); - byte[] body(); - - static HttpResponse of(int code, byte[] body) { - return new HttpResponse() { - @Override public int code() { return code; } - @Override public byte[] body() { return body; } - }; - } - -} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java deleted file mode 100644 index 2d7caea9f26..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java +++ /dev/null @@ -1,514 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -import ai.vespa.feed.client.FeedClient.OperationType; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonLocation; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; - -import java.io.Closeable; -import java.io.IOException; -import java.io.InputStream; -import java.io.InterruptedIOException; -import java.time.Duration; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import static ai.vespa.feed.client.FeedClient.OperationType.PUT; -import static ai.vespa.feed.client.FeedClient.OperationType.REMOVE; -import static ai.vespa.feed.client.FeedClient.OperationType.UPDATE; -import static com.fasterxml.jackson.core.JsonToken.END_ARRAY; -import static com.fasterxml.jackson.core.JsonToken.START_ARRAY; -import static com.fasterxml.jackson.core.JsonToken.START_OBJECT; -import static com.fasterxml.jackson.core.JsonToken.VALUE_FALSE; -import static com.fasterxml.jackson.core.JsonToken.VALUE_STRING; -import static com.fasterxml.jackson.core.JsonToken.VALUE_TRUE; -import static java.lang.Math.min; -import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.Objects.requireNonNull; - -/** - * @author jonmv - * @author bjorncs - */ -public class JsonFeeder implements Closeable { - - private final ExecutorService resultExecutor = Executors.newSingleThreadExecutor(r -> { - Thread t = new Thread(r, "json-feeder-result-executor"); - t.setDaemon(true); - return t; - }); - private final FeedClient client; - private final OperationParameters protoParameters; - - private JsonFeeder(FeedClient client, OperationParameters protoParameters) { - this.client = client; - this.protoParameters = protoParameters; - } - - public interface ResultCallback { - /** - * Invoked after each operation has either completed successfully or failed - * - * @param result Non-null if operation completed successfully - * @param error Non-null if operation failed - */ - default void onNextResult(Result result, FeedException error) { } - - /** - * Invoked if an unrecoverable error occurred during feed processing, - * after which no other {@link ResultCallback} methods are invoked. - */ - default void onError(FeedException error) { } - - /** - * Invoked when all feed operations are either completed successfully or failed. - */ - default void onComplete() { } - } - - public static Builder builder(FeedClient client) { return new Builder(client); } - - /** Feeds single JSON feed operations on the form - * <pre> - * { - * "id": "id:ns:type::boo", - * "fields": { ... document fields ... } - * } - * </pre> - * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes. - */ - public CompletableFuture<Result> feedSingle(String json) { - CompletableFuture<Result> result = new CompletableFuture<>(); - try { - SingleOperationParserAndExecutor parser = new SingleOperationParserAndExecutor(json.getBytes(UTF_8)); - parser.next().whenCompleteAsync((operationResult, error) -> { - if (error != null) { - result.completeExceptionally(error); - } else { - result.complete(operationResult); - } - }, resultExecutor); - } catch (Exception e) { - resultExecutor.execute(() -> result.completeExceptionally(wrapException(e))); - } - return result; - } - - /** Feeds a stream containing a JSON array of feed operations on the form - * <pre> - * [ - * { - * "id": "id:ns:type::boo", - * "fields": { ... document fields ... } - * }, - * { - * "put": "id:ns:type::foo", - * "fields": { ... document fields ... } - * }, - * { - * "update": "id:ns:type:n=4:bar", - * "create": true, - * "fields": { ... partial update fields ... } - * }, - * { - * "remove": "id:ns:type:g=foo:bar", - * "condition": "type.baz = \"bax\"" - * }, - * ... - * ] - * </pre> - * Note that {@code "id"} is an alias for the document put operation. - * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes. - */ - public CompletableFuture<Void> feedMany(InputStream jsonStream, ResultCallback resultCallback) { - return feedMany(jsonStream, 1 << 26, resultCallback); - } - - /** - * Same as {@link #feedMany(InputStream, ResultCallback)}, but without a provided {@link ResultCallback} instance. - * @see JsonFeeder#feedMany(InputStream, ResultCallback) for details. - */ - public CompletableFuture<Void> feedMany(InputStream jsonStream) { - return feedMany(jsonStream, new ResultCallback() { }); - } - - CompletableFuture<Void> feedMany(InputStream jsonStream, int size, ResultCallback resultCallback) { - CompletableFuture<Void> overallResult = new CompletableFuture<>(); - CompletableFuture<Result> result; - AtomicInteger pending = new AtomicInteger(1); // The below dispatch loop itself is counted as a single pending operation - AtomicBoolean finalCallbackInvoked = new AtomicBoolean(); - try { - RingBufferStream buffer = new RingBufferStream(jsonStream, size); - while ((result = buffer.next()) != null) { - pending.incrementAndGet(); - result.whenCompleteAsync((r, t) -> { - if (!finalCallbackInvoked.get()) { - resultCallback.onNextResult(r, (FeedException) t); - } - if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { - resultCallback.onComplete(); - overallResult.complete(null); - } - }, resultExecutor); - } - if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { - resultExecutor.execute(() -> { - resultCallback.onComplete(); - overallResult.complete(null); - }); - } - } catch (Exception e) { - if (finalCallbackInvoked.compareAndSet(false, true)) { - resultExecutor.execute(() -> { - FeedException wrapped = wrapException(e); - resultCallback.onError(wrapped); - overallResult.completeExceptionally(wrapped); - }); - } - } - return overallResult; - } - - private static final JsonFactory factory = new JsonFactory(); - - @Override public void close() throws IOException { - client.close(); - resultExecutor.shutdown(); - try { - if (!resultExecutor.awaitTermination(30, TimeUnit.SECONDS)) { - throw new IOException("Failed to close client in time"); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - private FeedException wrapException(Exception e) { - if (e instanceof FeedException) return (FeedException) e; - if (e instanceof IOException) { - return new OperationParseException("Failed to parse document JSON: " + e.getMessage(), e); - } - return new FeedException(e); - } - - private class RingBufferStream extends InputStream { - - private final byte[] b = new byte[1]; - private final InputStream in; - private final Object lock = new Object(); - private byte[] data; - private int size; - private IOException thrown = null; - private long tail = 0; - private long pos = 0; - private long head = 0; - private boolean done = false; - private final OperationParserAndExecutor parserAndExecutor; - - RingBufferStream(InputStream in, int size) throws IOException { - this.in = in; - this.data = new byte[size]; - this.size = size; - - new Thread(this::fill, "feed-reader").start(); - - this.parserAndExecutor = new RingBufferBackedOperationParserAndExecutor(factory.createParser(this)); - } - - @Override - public int read() throws IOException { - return read(b, 0, 1) == -1 ? -1 : b[0]; - } - - @Override - public int read(byte[] buffer, int off, int len) throws IOException { - try { - int ready; - synchronized (lock) { - if (pos - tail == size) // Buffer exhausted, nothing left to read, nowhere left to write. - expand(); - - while ((ready = (int) (head - pos)) == 0 && ! done) - lock.wait(); - } - if (thrown != null) throw thrown; - if (ready == 0) return -1; - - ready = min(ready, len); - int offset = (int) (pos % size); - int length = min(ready, size - offset); - System.arraycopy(data, offset, buffer, off, length); - if (length < ready) - System.arraycopy(data, 0, buffer, off + length, ready - length); - - pos += ready; - return ready; - } - catch (InterruptedException e) { - throw new InterruptedIOException("Interrupted waiting for data: " + e.getMessage()); - } - } - - public CompletableFuture<Result> next() throws IOException { - return parserAndExecutor.next(); - } - - private void expand() { - int newSize = size * 2; - if (newSize <= size) - throw new IllegalStateException("Maximum buffer size exceeded; want to double " + size + ", but that's too much"); - - byte[] newData = new byte[newSize]; - int offset = (int) (tail % size); - int newOffset = (int) (tail % newSize); - int toWrite = size - offset; - System.arraycopy(data, offset, newData, newOffset, toWrite); - if (toWrite < size) - System.arraycopy(data, 0, newData, newOffset + toWrite, size - toWrite); - size = newSize; - data = newData; - lock.notify(); - } - - private final byte[] prefix = "{\"fields\":".getBytes(UTF_8); - private byte[] copy(long start, long end) { - int length = (int) (end - start); - byte[] buffer = new byte[prefix.length + length + 1]; - System.arraycopy(prefix, 0, buffer, 0, prefix.length); - - int offset = (int) (start % size); - int toWrite = min(length, size - offset); - System.arraycopy(data, offset, buffer, prefix.length, toWrite); - if (toWrite < length) - System.arraycopy(data, 0, buffer, prefix.length + toWrite, length - toWrite); - - buffer[buffer.length - 1] = '}'; - return buffer; - } - - - @Override - public void close() throws IOException { - synchronized (lock) { - done = true; - lock.notifyAll(); - } - in.close(); - } - - private void fill() { - try { - while (true) { - int free; - synchronized (lock) { - while ((free = (int) (tail + size - head)) <= 0 && ! done) - lock.wait(); - } - if (done) break; - - int off = (int) (head % size); - int len = min(min(free, size - off), 1 << 13); - int read = in.read(data, off, len); - - synchronized (lock) { - if (read < 0) done = true; - else head += read; - lock.notify(); - } - } - } catch (InterruptedException e) { - synchronized (lock) { - done = true; - thrown = new InterruptedIOException("Interrupted reading data: " + e.getMessage()); - } - } catch (IOException e) { - synchronized (lock) { - done = true; - thrown = e; - } - } - } - - private class RingBufferBackedOperationParserAndExecutor extends OperationParserAndExecutor { - - RingBufferBackedOperationParserAndExecutor(JsonParser parser) { super(parser, true); } - - @Override - String getDocumentJson(long start, long end) { - String payload = new String(copy(start, end), UTF_8); - synchronized (lock) { - tail = end; - lock.notify(); - } - return payload; - } - } - } - - private class SingleOperationParserAndExecutor extends OperationParserAndExecutor { - - private final byte[] json; - - SingleOperationParserAndExecutor(byte[] json) throws IOException { - super(factory.createParser(json), false); - this.json = json; - } - - @Override - String getDocumentJson(long start, long end) { - return "{\"fields\":" + new String(json, (int) start, (int) (end - start), UTF_8) + "}"; - } - } - - private abstract class OperationParserAndExecutor { - - private final JsonParser parser; - private final boolean multipleOperations; - private boolean arrayPrefixParsed; - - protected OperationParserAndExecutor(JsonParser parser, boolean multipleOperations) { - this.parser = parser; - this.multipleOperations = multipleOperations; - } - - abstract String getDocumentJson(long start, long end); - - OperationParseException parseException(String error) { - JsonLocation location = parser.getTokenLocation(); - return new OperationParseException(error + " at offset " + location.getByteOffset() + - " (line " + location.getLineNr() + ", column " + location.getColumnNr() + ")"); - } - - CompletableFuture<Result> next() throws IOException { - JsonToken token = parser.nextToken(); - if (multipleOperations && ! arrayPrefixParsed && token == START_ARRAY) { - arrayPrefixParsed = true; - token = parser.nextToken(); - } - if (token == END_ARRAY && multipleOperations) return null; - else if (token == null && ! arrayPrefixParsed) return null; - else if (token != START_OBJECT) throw parseException("Unexpected token '" + parser.currentToken() + "'"); - long start = 0, end = -1; - OperationType type = null; - DocumentId id = null; - OperationParameters parameters = protoParameters; - loop: while (true) { - switch (parser.nextToken()) { - case FIELD_NAME: - switch (parser.getText()) { - case "id": - case "put": type = PUT; id = readId(); break; - case "update": type = UPDATE; id = readId(); break; - case "remove": type = REMOVE; id = readId(); break; - case "condition": parameters = parameters.testAndSetCondition(readString()); break; - case "create": parameters = parameters.createIfNonExistent(readBoolean()); break; - case "fields": { - expect(START_OBJECT); - start = parser.getTokenLocation().getByteOffset(); - int depth = 1; - while (depth > 0) switch (parser.nextToken()) { - case START_OBJECT: ++depth; break; - case END_OBJECT: --depth; break; - } - end = parser.getTokenLocation().getByteOffset() + 1; - break; - } - default: throw parseException("Unexpected field name '" + parser.getText() + "'"); - } - break; - - case END_OBJECT: - break loop; - - default: - throw parseException("Unexpected token '" + parser.currentToken() + "'"); - } - } - if (id == null) - throw parseException("No document id for document"); - if (type == REMOVE) { - if (end >= start) - throw parseException("Illegal 'fields' object for remove operation"); - else - start = end = parser.getTokenLocation().getByteOffset(); // getDocumentJson advances buffer overwrite head. - } - else if (end < start) - throw parseException("No 'fields' object for document"); - - String payload = getDocumentJson(start, end); - switch (type) { - case PUT: return client.put (id, payload, parameters); - case UPDATE: return client.update(id, payload, parameters); - case REMOVE: return client.remove(id, parameters); - default: throw new OperationParseException("Unexpected operation type '" + type + "'"); - } - } - - private void expect(JsonToken token) throws IOException { - if (parser.nextToken() != token) - throw new OperationParseException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() + - ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); - } - - private String readString() throws IOException { - String value = parser.nextTextValue(); - if (value == null) - throw new OperationParseException("Expected '" + VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() + - ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); - - return value; - } - - private boolean readBoolean() throws IOException { - Boolean value = parser.nextBooleanValue(); - if (value == null) - throw new OperationParseException("Expected '" + VALUE_FALSE + "' or '" + VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() + - ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); - - return value; - - } - - private DocumentId readId() throws IOException { - return DocumentId.of(readString()); - } - - } - - public static class Builder { - - final FeedClient client; - OperationParameters parameters = OperationParameters.empty(); - - private Builder(FeedClient client) { - this.client = requireNonNull(client); - } - - public Builder withTimeout(Duration timeout) { - parameters = parameters.timeout(timeout); - return this; - } - - public Builder withRoute(String route) { - parameters = parameters.route(route); - return this; - } - - public Builder withTracelevel(int tracelevel) { - parameters = parameters.tracelevel(tracelevel); - return this; - } - - public JsonFeeder build() { - return new JsonFeeder(client, parameters); - } - - } - -} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParameters.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParameters.java deleted file mode 100644 index 0ec40e114df..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParameters.java +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -import java.time.Duration; -import java.util.Objects; -import java.util.Optional; -import java.util.OptionalInt; - -/** - * Per-operation feed parameters - * - * @author bjorncs - * @author jonmv - */ -public class OperationParameters { - - static final OperationParameters empty = new OperationParameters(false, null, null, null, 0); - - private final boolean create; - private final String condition; - private final Duration timeout; - private final String route; - private final int tracelevel; - - private OperationParameters(boolean create, String condition, Duration timeout, String route, int tracelevel) { - this.create = create; - this.condition = condition; - this.timeout = timeout; - this.route = route; - this.tracelevel = tracelevel; - } - - public static OperationParameters empty() { return empty; } - - public OperationParameters createIfNonExistent(boolean create) { - return new OperationParameters(create, condition, timeout, route, tracelevel); - } - - public OperationParameters testAndSetCondition(String condition) { - if (condition.isEmpty()) - throw new IllegalArgumentException("TestAndSetCondition must be non-empty"); - - return new OperationParameters(create, condition, timeout, route, tracelevel); - } - - public OperationParameters timeout(Duration timeout) { - if (timeout.isNegative() || timeout.isZero()) - throw new IllegalArgumentException("Timeout must be positive, but was " + timeout); - - return new OperationParameters(create, condition, timeout, route, tracelevel); - } - - public OperationParameters route(String route) { - if (route.isEmpty()) - throw new IllegalArgumentException("Route must be non-empty"); - - return new OperationParameters(create, condition, timeout, route, tracelevel); - } - - public OperationParameters tracelevel(int tracelevel) { - if (tracelevel < 1 || tracelevel > 9) - throw new IllegalArgumentException("Tracelevel must be in [1, 9]"); - - return new OperationParameters(create, condition, timeout, route, tracelevel); - } - - public boolean createIfNonExistent() { return create; } - public Optional<String> testAndSetCondition() { return Optional.ofNullable(condition); } - public Optional<Duration> timeout() { return Optional.ofNullable(timeout); } - public Optional<String> route() { return Optional.ofNullable(route); } - public OptionalInt tracelevel() { return tracelevel == 0 ? OptionalInt.empty() : OptionalInt.of(tracelevel); } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - OperationParameters that = (OperationParameters) o; - return create == that.create && tracelevel == that.tracelevel && Objects.equals(condition, that.condition) && Objects.equals(timeout, that.timeout) && Objects.equals(route, that.route); - } - - @Override - public int hashCode() { - return Objects.hash(create, condition, timeout, route, tracelevel); - } - - @Override - public String toString() { - return "OperationParameters{" + - "create=" + create + - ", condition='" + condition + '\'' + - ", timeout=" + timeout + - ", route='" + route + '\'' + - ", tracelevel=" + tracelevel + - '}'; - } - -} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParseException.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParseException.java deleted file mode 100644 index f60368dd67f..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParseException.java +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -/** - * Signals that supplied JSON for a document/operation is invalid - * - * @author bjorncs - */ -public class OperationParseException extends FeedException { - - public OperationParseException(String message) { super(message); } - - public OperationParseException(String message, Throwable cause) { super(message, cause); } - -} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java deleted file mode 100644 index ab2faf245d8..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -import java.util.Map; -import java.util.Objects; -import java.util.stream.Collectors; - -/** - * Statistics for feed operations over HTTP against a Vespa cluster. - * - * @author jonmv - */ -public class OperationStats { - - private final long requests; - private final Map<Integer, Long> responsesByCode; - private final long inflight; - private final long exceptions; - private final long averageLatencyMillis; - private final long minLatencyMillis; - private final long maxLatencyMillis; - private final long bytesSent; - private final long bytesReceived; - - public OperationStats(long requests, Map<Integer, Long> responsesByCode, long exceptions, long inflight, - long averageLatencyMillis, long minLatencyMillis, long maxLatencyMillis, - long bytesSent, long bytesReceived) { - this.requests = requests; - this.responsesByCode = responsesByCode; - this.exceptions = exceptions; - this.inflight = inflight; - this.averageLatencyMillis = averageLatencyMillis; - this.minLatencyMillis = minLatencyMillis; - this.maxLatencyMillis = maxLatencyMillis; - this.bytesSent = bytesSent; - this.bytesReceived = bytesReceived; - } - - /** Returns the difference between this and the initial. Min and max latency are not modified. */ - public OperationStats since(OperationStats initial) { - return new OperationStats(requests - initial.requests, - responsesByCode.entrySet().stream() - .collect(Collectors.toMap(entry -> entry.getKey(), - entry -> entry.getValue() - initial.responsesByCode.getOrDefault(entry.getKey(), 0L))), - exceptions - initial.exceptions, - inflight - initial.inflight, - responsesByCode.size() == initial.responsesByCode.size() ? 0 : - (averageLatencyMillis * responsesByCode.size() - initial.averageLatencyMillis * initial.responsesByCode.size()) - / (responsesByCode.size() - initial.responsesByCode.size()), - minLatencyMillis, - maxLatencyMillis, - bytesSent - initial.bytesSent, - bytesReceived - initial.bytesReceived); - } - - /** Number of HTTP requests attempted. */ - public long requests() { - return requests; - } - - /** Number of HTTP responses received. */ - public long responses() { - return requests - inflight - exceptions; - } - - /** Number of 200 OK HTTP responses received. */ - public long successes() { - return responsesByCode.getOrDefault(200, 0L); - } - - /** Number of HTTP responses by status code. */ - public Map<Integer, Long> responsesByCode() { - return responsesByCode; - } - - /** Number of exceptions (instead of responses). */ - public long exceptions() { - return exceptions; - } - - /** Number of attempted requests which haven't yielded a response or exception yet. */ - public long inflight() { - return inflight; - } - - /** Average request-response latency, or -1. */ - public long averageLatencyMillis() { - return averageLatencyMillis; - } - - /** Minimum request-response latency, or -1. */ - public long minLatencyMillis() { - return minLatencyMillis; - } - - /** Maximum request-response latency, or -1. */ - public long maxLatencyMillis() { - return maxLatencyMillis; - } - - /** Number of bytes sent, for HTTP requests with a response. */ - public long bytesSent() { - return bytesSent; - } - - /** Number of bytes received in HTTP responses. */ - public long bytesReceived() { - return bytesReceived; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - OperationStats that = (OperationStats) o; - return requests == that.requests && inflight == that.inflight && exceptions == that.exceptions && averageLatencyMillis == that.averageLatencyMillis && minLatencyMillis == that.minLatencyMillis && maxLatencyMillis == that.maxLatencyMillis && bytesSent == that.bytesSent && bytesReceived == that.bytesReceived && responsesByCode.equals(that.responsesByCode); - } - - @Override - public int hashCode() { - return Objects.hash(requests, responsesByCode, inflight, exceptions, averageLatencyMillis, minLatencyMillis, maxLatencyMillis, bytesSent, bytesReceived); - } - - @Override - public String toString() { - return "Stats{" + - "requests=" + requests + - ", responsesByCode=" + responsesByCode + - ", exceptions=" + exceptions + - ", inflight=" + inflight + - ", averageLatencyMillis=" + averageLatencyMillis + - ", minLatencyMillis=" + minLatencyMillis + - ", maxLatencyMillis=" + maxLatencyMillis + - ", bytesSent=" + bytesSent + - ", bytesReceived=" + bytesReceived + - '}'; - } - -} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ResultException.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ResultException.java deleted file mode 100644 index d9eaff40d74..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ResultException.java +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -import java.util.Optional; - -/** - * Signals that the document API in the feed container returned a failure result for a feed operation. - * - * @author jonmv - */ -public class ResultException extends FeedException { - - private final String trace; - - public ResultException(DocumentId documentId, String message, String trace) { - super(documentId, message); - this.trace = trace; - } - - /** Holds the trace, if the failed operation had a {@link OperationParameters#tracelevel(int)} higher than 0. */ - public Optional<String> getTrace() { - return Optional.ofNullable(trace); - } - -} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ResultParseException.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ResultParseException.java deleted file mode 100644 index 947ab9f0560..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ResultParseException.java +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -/** - * Signals that the client was unable to obtain a proper response/result from container - * - * @author bjorncs - */ -public class ResultParseException extends FeedException { - - public ResultParseException(DocumentId documentId, String message) { super(documentId, message); } - - public ResultParseException(DocumentId documentId, Throwable cause) { super(documentId, cause); } -} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java index 52d7af2fb31..6dc9ec4efb1 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; +import ai.vespa.feed.client.HttpResponse; import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; import org.apache.hc.client5.http.config.RequestConfig; @@ -18,7 +19,6 @@ import org.apache.hc.core5.util.Timeout; import javax.net.ssl.SSLContext; import java.io.IOException; import java.net.URI; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -43,7 +43,7 @@ class ApacheCluster implements Cluster { .setResponseTimeout(Timeout.ofMinutes(5)) .build(); - ApacheCluster(FeedClientBuilder builder) throws IOException { + ApacheCluster(FeedClientBuilderImpl builder) throws IOException { for (URI endpoint : builder.endpoints) for (int i = 0; i < builder.connectionsPerEndpoint; i++) endpoints.add(new Endpoint(createHttpClient(builder), endpoint)); @@ -114,7 +114,7 @@ class ApacheCluster implements Cluster { } - private static CloseableHttpAsyncClient createHttpClient(FeedClientBuilder builder) throws IOException { + private static CloseableHttpAsyncClient createHttpClient(FeedClientBuilderImpl builder) throws IOException { SSLContext sslContext = builder.constructSslContext(); String[] allowedCiphers = excludeH2Blacklisted(excludeWeak(sslContext.getSupportedSSLParameters().getCipherSuites())); if (allowedCiphers.length == 0) diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/BenchmarkingCluster.java index 05ff6e99308..40049bad217 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/BenchmarkingCluster.java @@ -1,5 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; + +import ai.vespa.feed.client.HttpResponse; +import ai.vespa.feed.client.OperationStats; import java.util.HashMap; import java.util.Map; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/Cluster.java index 57c028426fe..ee9188fdc2b 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/Cluster.java @@ -1,8 +1,10 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; + +import ai.vespa.feed.client.HttpResponse; +import ai.vespa.feed.client.OperationStats; import java.io.Closeable; -import java.util.Collections; import java.util.concurrent.CompletableFuture; /** diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DryrunCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DryrunCluster.java index 282e4e14285..96cf7998681 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DryrunCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DryrunCluster.java @@ -1,5 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; + +import ai.vespa.feed.client.HttpResponse; import java.nio.charset.StandardCharsets; import java.time.Duration; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java index a379a8b066b..5969fe267c0 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java @@ -1,7 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; + +import ai.vespa.feed.client.HttpResponse; -import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; @@ -25,7 +26,7 @@ public class DynamicThrottler extends StaticThrottler { private long startNanos = System.nanoTime(); private long sent = 0; - public DynamicThrottler(FeedClientBuilder builder) { + public DynamicThrottler(FeedClientBuilderImpl builder) { super(builder); targetInflight = new AtomicLong(8 * minInflight); } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java index 3b79d47b494..7dafeb0b541 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java @@ -1,5 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; + +import ai.vespa.feed.client.FeedClient; +import ai.vespa.feed.client.FeedClientBuilder; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLContext; @@ -16,6 +19,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.function.Supplier; import static java.util.Objects.requireNonNull; @@ -26,11 +30,11 @@ import static java.util.Objects.requireNonNull; * @author bjorncs * @author jonmv */ -public class FeedClientBuilder { +public class FeedClientBuilderImpl implements FeedClientBuilder { static final FeedClient.RetryStrategy defaultRetryStrategy = new FeedClient.RetryStrategy() { }; - final List<URI> endpoints; + List<URI> endpoints; final Map<String, Supplier<String>> requestHeaders = new HashMap<>(); SSLContext sslContext; HostnameVerifier hostnameVerifier; @@ -47,72 +51,65 @@ public class FeedClientBuilder { boolean benchmark = true; boolean dryrun = false; - /** Creates a builder for a single container endpoint **/ - public static FeedClientBuilder create(URI endpoint) { return new FeedClientBuilder(Collections.singletonList(endpoint)); } - /** Creates a builder for multiple container endpoints **/ - public static FeedClientBuilder create(List<URI> endpoints) { return new FeedClientBuilder(endpoints); } - private FeedClientBuilder(List<URI> endpoints) { + public FeedClientBuilderImpl() { + } + + FeedClientBuilderImpl(List<URI> endpoints) { + this(); + setEndpointUris(endpoints); + } + + @Override + public FeedClientBuilder setEndpointUris(List<URI> endpoints) { if (endpoints.isEmpty()) throw new IllegalArgumentException("At least one endpoint must be provided"); for (URI endpoint : endpoints) requireNonNull(endpoint.getHost()); - this.endpoints = new ArrayList<>(endpoints); + return this; } - /** - * Sets the number of connections this client will use per endpoint. - * - * A reasonable value here is a value that lets all feed clients (if more than one) - * collectively have a number of connections which is a small multiple of the numbers - * of containers in the cluster to feed, so load can be balanced across these containers. - * In general, this value should be kept as low as possible, but poor connectivity - * between feeder and cluster may also warrant a higher number of connections. - */ - public FeedClientBuilder setConnectionsPerEndpoint(int max) { + @Override + public FeedClientBuilderImpl setConnectionsPerEndpoint(int max) { if (max < 1) throw new IllegalArgumentException("Max connections must be at least 1, but was " + max); this.connectionsPerEndpoint = max; return this; } - /** - * Sets the maximum number of streams per HTTP/2 connection for this client. - * - * This determines the maximum number of concurrent, inflight requests for this client, - * which is {@code maxConnections * maxStreamsPerConnection}. Prefer more streams over - * more connections, when possible. - * The feed client automatically throttles load to achieve the best throughput, and the - * actual number of streams per connection is usually lower than the maximum. - */ - public FeedClientBuilder setMaxStreamPerConnection(int max) { + @Override + public FeedClientBuilderImpl setMaxStreamPerConnection(int max) { if (max < 1) throw new IllegalArgumentException("Max streams per connection must be at least 1, but was " + max); this.maxStreamsPerConnection = max; return this; } /** Sets {@link SSLContext} instance. */ - public FeedClientBuilder setSslContext(SSLContext context) { + @Override + public FeedClientBuilderImpl setSslContext(SSLContext context) { this.sslContext = requireNonNull(context); return this; } /** Sets {@link HostnameVerifier} instance (e.g for disabling default SSL hostname verification). */ - public FeedClientBuilder setHostnameVerifier(HostnameVerifier verifier) { + @Override + public FeedClientBuilderImpl setHostnameVerifier(HostnameVerifier verifier) { this.hostnameVerifier = requireNonNull(verifier); return this; } /** Turns off benchmarking. Attempting to get {@link FeedClient#stats()} will result in an exception. */ - public FeedClientBuilder noBenchmarking() { + @Override + public FeedClientBuilderImpl noBenchmarking() { this.benchmark = false; return this; } /** Adds HTTP request header to all client requests. */ - public FeedClientBuilder addRequestHeader(String name, String value) { + @Override + public FeedClientBuilderImpl addRequestHeader(String name, String value) { return addRequestHeader(name, () -> requireNonNull(value)); } @@ -120,7 +117,8 @@ public class FeedClientBuilder { * Adds HTTP request header to all client requests. Value {@link Supplier} is invoked for each HTTP request, * i.e. value can be dynamically updated during a feed. */ - public FeedClientBuilder addRequestHeader(String name, Supplier<String> valueSupplier) { + @Override + public FeedClientBuilderImpl addRequestHeader(String name, Supplier<String> valueSupplier) { this.requestHeaders.put(requireNonNull(name), requireNonNull(valueSupplier)); return this; } @@ -129,7 +127,8 @@ public class FeedClientBuilder { * Overrides default retry strategy. * @see FeedClient.RetryStrategy */ - public FeedClientBuilder setRetryStrategy(FeedClient.RetryStrategy strategy) { + @Override + public FeedClientBuilderImpl setRetryStrategy(FeedClient.RetryStrategy strategy) { this.retryStrategy = requireNonNull(strategy); return this; } @@ -138,31 +137,36 @@ public class FeedClientBuilder { * Overrides default circuit breaker. * @see FeedClient.CircuitBreaker */ - public FeedClientBuilder setCircuitBreaker(FeedClient.CircuitBreaker breaker) { + @Override + public FeedClientBuilderImpl setCircuitBreaker(FeedClient.CircuitBreaker breaker) { this.circuitBreaker = requireNonNull(breaker); return this; } /** Sets path to client SSL certificate/key PEM files */ - public FeedClientBuilder setCertificate(Path certificatePemFile, Path privateKeyPemFile) { + @Override + public FeedClientBuilderImpl setCertificate(Path certificatePemFile, Path privateKeyPemFile) { this.certificateFile = certificatePemFile; this.privateKeyFile = privateKeyPemFile; return this; } /** Sets client SSL certificates/key */ - public FeedClientBuilder setCertificate(Collection<X509Certificate> certificate, PrivateKey privateKey) { + @Override + public FeedClientBuilderImpl setCertificate(Collection<X509Certificate> certificate, PrivateKey privateKey) { this.certificate = certificate; this.privateKey = privateKey; return this; } /** Sets client SSL certificate/key */ - public FeedClientBuilder setCertificate(X509Certificate certificate, PrivateKey privateKey) { + @Override + public FeedClientBuilderImpl setCertificate(X509Certificate certificate, PrivateKey privateKey) { return setCertificate(Collections.singletonList(certificate), privateKey); } - public FeedClientBuilder setDryrun(boolean enabled) { + @Override + public FeedClientBuilderImpl setDryrun(boolean enabled) { this.dryrun = enabled; return this; } @@ -171,18 +175,21 @@ public class FeedClientBuilder { * Overrides JVM default SSL truststore * @param caCertificatesFile Path to PEM encoded file containing trusted certificates */ - public FeedClientBuilder setCaCertificatesFile(Path caCertificatesFile) { + @Override + public FeedClientBuilderImpl setCaCertificatesFile(Path caCertificatesFile) { this.caCertificatesFile = caCertificatesFile; return this; } /** Overrides JVM default SSL truststore */ - public FeedClientBuilder setCaCertificates(Collection<X509Certificate> caCertificates) { + @Override + public FeedClientBuilderImpl setCaCertificates(Collection<X509Certificate> caCertificates) { this.caCertificates = caCertificates; return this; } /** Constructs instance of {@link ai.vespa.feed.client.FeedClient} from builder configuration */ + @Override public FeedClient build() { try { validateConfiguration(); @@ -209,6 +216,9 @@ public class FeedClientBuilder { } private void validateConfiguration() { + if (endpoints == null) { + throw new IllegalArgumentException("At least one endpoint must be provided"); + } if (sslContext != null && ( certificateFile != null || caCertificatesFile != null || privateKeyFile != null || certificate != null || caCertificates != null || privateKey != null)) { diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java index cb5e35c79a5..b223fce7cab 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java @@ -1,5 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; + +import ai.vespa.feed.client.FeedClient; +import ai.vespa.feed.client.HttpResponse; import java.time.Duration; import java.util.concurrent.atomic.AtomicBoolean; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java index eb818ba1d48..3fd44596d63 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java @@ -1,6 +1,15 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - +package ai.vespa.feed.client.impl; + +import ai.vespa.feed.client.DocumentId; +import ai.vespa.feed.client.FeedClient; +import ai.vespa.feed.client.FeedException; +import ai.vespa.feed.client.HttpResponse; +import ai.vespa.feed.client.OperationParameters; +import ai.vespa.feed.client.OperationStats; +import ai.vespa.feed.client.Result; +import ai.vespa.feed.client.ResultException; +import ai.vespa.feed.client.ResultParseException; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; @@ -33,11 +42,11 @@ class HttpFeedClient implements FeedClient { private final RequestStrategy requestStrategy; private final AtomicBoolean closed = new AtomicBoolean(); - HttpFeedClient(FeedClientBuilder builder) throws IOException { + HttpFeedClient(FeedClientBuilderImpl builder) throws IOException { this(builder, new HttpRequestStrategy(builder)); } - HttpFeedClient(FeedClientBuilder builder, RequestStrategy requestStrategy) { + HttpFeedClient(FeedClientBuilderImpl builder, RequestStrategy requestStrategy) { this.requestHeaders = new HashMap<>(builder.requestHeaders); this.requestStrategy = requestStrategy; } @@ -173,7 +182,7 @@ class HttpFeedClient implements FeedClient { if (outcome == Outcome.vespaFailure) throw new ResultException(documentId, message, trace); - return new Result(toResultType(outcome), documentId, message, trace); + return new ResultImpl(toResultType(outcome), documentId, message, trace); } static String getPath(DocumentId documentId) { diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java index 48defd71ea8..08b8ca08c61 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java @@ -1,5 +1,5 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; import java.util.Map; import java.util.function.Supplier; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java index cf65a874f3b..6fec0029bc3 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java @@ -1,8 +1,13 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; +import ai.vespa.feed.client.DocumentId; +import ai.vespa.feed.client.FeedClient; import ai.vespa.feed.client.FeedClient.CircuitBreaker; import ai.vespa.feed.client.FeedClient.RetryStrategy; +import ai.vespa.feed.client.FeedException; +import ai.vespa.feed.client.HttpResponse ; +import ai.vespa.feed.client.OperationStats; import java.io.IOException; import java.nio.channels.CancelledKeyException; @@ -62,11 +67,11 @@ class HttpRequestStrategy implements RequestStrategy { return thread; }); - HttpRequestStrategy(FeedClientBuilder builder) throws IOException { + HttpRequestStrategy(FeedClientBuilderImpl builder) throws IOException { this(builder, builder.dryrun ? new DryrunCluster() : new ApacheCluster(builder)); } - HttpRequestStrategy(FeedClientBuilder builder, Cluster cluster) { + HttpRequestStrategy(FeedClientBuilderImpl builder, Cluster cluster) { this.cluster = builder.benchmark ? new BenchmarkingCluster(cluster) : cluster; this.strategy = builder.retryStrategy; this.breaker = builder.circuitBreaker; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/RequestStrategy.java index 9a97f7daa66..e3b6b594593 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/RequestStrategy.java @@ -1,7 +1,10 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; +import ai.vespa.feed.client.DocumentId; import ai.vespa.feed.client.FeedClient.CircuitBreaker.State; +import ai.vespa.feed.client.HttpResponse; +import ai.vespa.feed.client.OperationStats; import java.util.concurrent.CompletableFuture; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Result.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ResultImpl.java index 5ff3fd0a219..dabf76cba34 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Result.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ResultImpl.java @@ -1,5 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; + +import ai.vespa.feed.client.DocumentId; +import ai.vespa.feed.client.Result; import java.util.Optional; @@ -9,29 +12,24 @@ import java.util.Optional; * @author bjorncs * @author jonmv */ -public class Result { +public class ResultImpl implements Result { private final Type type; private final DocumentId documentId; private final String resultMessage; private final String traceMessage; - Result(Type type, DocumentId documentId, String resultMessage, String traceMessage) { + ResultImpl(Type type, DocumentId documentId, String resultMessage, String traceMessage) { this.type = type; this.documentId = documentId; this.resultMessage = resultMessage; this.traceMessage = traceMessage; } - public enum Type { - success, - conditionNotMet - } - - public Type type() { return type; } - public DocumentId documentId() { return documentId; } - public Optional<String> resultMessage() { return Optional.ofNullable(resultMessage); } - public Optional<String> traceMessage() { return Optional.ofNullable(traceMessage); } + @Override public Type type() { return type; } + @Override public DocumentId documentId() { return documentId; } + @Override public Optional<String> resultMessage() { return Optional.ofNullable(resultMessage); } + @Override public Optional<String> traceMessage() { return Optional.ofNullable(traceMessage); } @Override public String toString() { diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/SslContextBuilder.java index f5e13eccd56..2ca4577abe6 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/SslContextBuilder.java @@ -1,5 +1,5 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; import org.bouncycastle.asn1.ASN1ObjectIdentifier; import org.bouncycastle.asn1.pkcs.PKCSObjectIdentifiers; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java index 5137a18d923..1f9cf8e5155 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java @@ -1,5 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; + +import ai.vespa.feed.client.HttpResponse; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; @@ -18,7 +20,7 @@ public class StaticThrottler implements Throttler { protected final long minInflight; private final AtomicLong targetX10; - public StaticThrottler(FeedClientBuilder builder) { + public StaticThrottler(FeedClientBuilderImpl builder) { minInflight = 16L * builder.connectionsPerEndpoint * builder.endpoints.size(); maxInflight = 256 * minInflight; // 4096 max streams per connection on the server side. targetX10 = new AtomicLong(10 * maxInflight); // 10x the actual value to allow for smaller updates. diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Throttler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/Throttler.java index f2453c27879..700a6f6f805 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Throttler.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/Throttler.java @@ -1,5 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; + +import ai.vespa.feed.client.HttpResponse; import java.util.concurrent.CompletableFuture; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/package-info.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/package-info.java deleted file mode 100644 index daab16a9ff2..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/package-info.java +++ /dev/null @@ -1,9 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** - * @author bjorncs - */ - -@PublicApi -package ai.vespa.feed.client; - -import com.yahoo.api.annotations.PublicApi;
\ No newline at end of file diff --git a/vespa-feed-client/src/main/resources/META-INF.services/ai.vespa.feed.client.FeedClientBuilder b/vespa-feed-client/src/main/resources/META-INF.services/ai.vespa.feed.client.FeedClientBuilder new file mode 100644 index 00000000000..b6e28b1806c --- /dev/null +++ b/vespa-feed-client/src/main/resources/META-INF.services/ai.vespa.feed.client.FeedClientBuilder @@ -0,0 +1,2 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +ai.vespa.feed.client.impl.FeedClientBuilderImpl
\ No newline at end of file diff --git a/vespa-feed-client/src/main/sh/vespa-version-generator.sh b/vespa-feed-client/src/main/sh/vespa-version-generator.sh index 5aafb3e2bf7..44fb7d167db 100755 --- a/vespa-feed-client/src/main/sh/vespa-version-generator.sh +++ b/vespa-feed-client/src/main/sh/vespa-version-generator.sh @@ -16,7 +16,7 @@ mkdir -p $destinationDir versionNumber=$(cat $source | grep V_TAG_COMPONENT | awk '{print $2}' ) cat > $destination <<- END -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; class Vespa { static final String VERSION = "$versionNumber"; diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java deleted file mode 100644 index e4fb5cb5bef..00000000000 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonFeederTest.java +++ /dev/null @@ -1,233 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -import org.junit.jupiter.api.Test; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardOpenOption; -import java.util.Arrays; -import java.util.Collection; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.stream.Collectors.joining; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - -class JsonFeederTest { - - @Test - void test() throws IOException { - int docs = 1 << 14; - String json = "[\n" + - - IntStream.range(0, docs).mapToObj(i -> - " {\n" + - " \"id\": \"id:ns:type::abc" + i + "\",\n" + - " \"fields\": {\n" + - " \"lul\":\"lal\"\n" + - " }\n" + - " },\n" - ).collect(joining()) + - - " {\n" + - " \"id\": \"id:ns:type::abc" + docs + "\",\n" + - " \"fields\": {\n" + - " \"lul\":\"lal\"\n" + - " }\n" + - " }\n" + - "]"; - AtomicReference<FeedException> exceptionThrow = new AtomicReference<>(); - Path tmpFile = Files.createTempFile(null, null); - Files.write(tmpFile, json.getBytes(UTF_8)); - try (InputStream in = Files.newInputStream(tmpFile, StandardOpenOption.READ, StandardOpenOption.DELETE_ON_CLOSE)) { - AtomicInteger resultsReceived = new AtomicInteger(); - AtomicBoolean completedSuccessfully = new AtomicBoolean(); - long startNanos = System.nanoTime(); - MockClient feedClient = new MockClient(); - JsonFeeder.builder(feedClient).build() - .feedMany(in, 1 << 10, - new JsonFeeder.ResultCallback() { - @Override - public void onNextResult(Result result, FeedException error) { resultsReceived.incrementAndGet(); } - - @Override - public void onError(FeedException error) { exceptionThrow.set(error); } - - @Override - public void onComplete() { completedSuccessfully.set(true); } - }) - .join(); - - System.err.println((json.length() / 1048576.0) + " MB in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds"); - assertEquals(docs + 1, feedClient.putOperations.size()); - assertEquals(docs + 1, resultsReceived.get()); - assertTrue(completedSuccessfully.get()); - assertNull(exceptionThrow.get()); - } - } - - @Test - public void multipleJsonArrayOperationsAreDispatchedToFeedClient() throws IOException, ExecutionException, InterruptedException { - MockClient client = new MockClient(); - try (JsonFeeder feeder = JsonFeeder.builder(client).build()) { - String json = "[{" + - " \"put\": \"id:ns:type::abc1\",\n" + - " \"fields\": {\n" + - " \"lul\":\"lal\"\n" + - " }\n" + - "},\n" + - "{" + - " \"put\": \"id:ns:type::abc2\",\n" + - " \"fields\": {\n" + - " \"lul\":\"lal\"\n" + - " }\n" + - "}]\n"; - feeder.feedMany(new ByteArrayInputStream(json.getBytes(UTF_8))).get(); - client.assertPutDocumentIds("abc1", "abc2"); - client.assertPutOperation("abc1", "{\"fields\":{\n \"lul\":\"lal\"\n }}"); - client.assertPutOperation("abc2", "{\"fields\":{\n \"lul\":\"lal\"\n }}"); - } - } - - @Test - public void multipleJsonLOperationsAreDispatchedToFeedClient() throws IOException, ExecutionException, InterruptedException { - MockClient client = new MockClient(); - try (JsonFeeder feeder = JsonFeeder.builder(client).build()) { - String json = "{\n" + - " \"remove\": \"id:ns:type::abc1\"\n" + - "}\n" + - "{\n" + - " \"fields\": {\n" + - " \"lul\": { \"assign\": \"lal\" }\n" + - " },\n" + - " \"update\": \"id:ns:type::abc2\"\n" + - "}\n" + - "{\n" + - " \"put\": \"id:ns:type::abc3\",\n" + - " \"fields\": {\n" + - " \"lul\": \"lal\"\n" + - " }\n" + - "}\n"; - - feeder.feedMany(new ByteArrayInputStream(json.getBytes(UTF_8)), - 3, // Mini-buffer, which needs to expand. - new JsonFeeder.ResultCallback() { }) - .get(); - client.assertRemoveDocumentIds("abc1"); - client.assertUpdateDocumentIds("abc2"); - client.assertUpdateOperation("abc2", "{\"fields\":{\n \"lul\": { \"assign\": \"lal\" }\n }}"); - client.assertPutDocumentIds("abc3"); - client.assertPutOperation("abc3", "{\"fields\":{\n \"lul\": \"lal\"\n }}"); - } - } - - @Test - public void singleJsonOperationIsDispatchedToFeedClient() throws IOException, ExecutionException, InterruptedException { - MockClient client = new MockClient(); - try (JsonFeeder feeder = JsonFeeder.builder(client).build()) { - String json = "{\"put\": \"id:ns:type::abc1\",\n" + - " \"fields\": {\n" + - " \"lul\":\"lal\"\n" + - " }\n" + - " }\n"; - Result result = feeder.feedSingle(json).get(); - assertEquals(DocumentId.of("id:ns:type::abc1"), result.documentId()); - assertEquals(Result.Type.success, result.type()); - assertEquals("success", result.resultMessage().get()); - client.assertPutOperation("abc1", "{\"fields\":{\n \"lul\":\"lal\"\n }}"); - } - } - - private static class MockClient implements FeedClient { - final Map<DocumentId, String> putOperations = new LinkedHashMap<>(); - final Map<DocumentId, String> updateOperations = new LinkedHashMap<>(); - final Map<DocumentId, String> removeOperations = new LinkedHashMap<>(); - - @Override - public CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params) { - putOperations.put(documentId, documentJson); - return createSuccessResult(documentId); - } - - @Override - public CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params) { - updateOperations.put(documentId, updateJson); - return createSuccessResult(documentId); - } - - @Override - public CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params) { - removeOperations.put(documentId, null); - return createSuccessResult(documentId); - } - - @Override - public OperationStats stats() { return null; } - - @Override - public CircuitBreaker.State circuitBreakerState() { return null; } - - @Override - public void close(boolean graceful) { } - - private CompletableFuture<Result> createSuccessResult(DocumentId documentId) { - return CompletableFuture.completedFuture(new Result(Result.Type.success, documentId, "success", null)); - } - - void assertDocumentIds(Collection<DocumentId> keys, String... expectedUserSpecificIds) { - List<String> expected = Arrays.stream(expectedUserSpecificIds) - .map(userSpecific -> "id:ns:type::" + userSpecific) - .sorted() - .collect(Collectors.toList()); - List<String> actual = keys.stream() - .map(DocumentId::toString).sorted() - .collect(Collectors.toList()); - assertEquals(expected, actual, "Document ids must match"); - } - - void assertPutDocumentIds(String... expectedUserSpecificIds) { - assertDocumentIds(putOperations.keySet(), expectedUserSpecificIds); - } - - void assertUpdateDocumentIds(String... expectedUserSpecificIds) { - assertDocumentIds(updateOperations.keySet(), expectedUserSpecificIds); - } - - void assertRemoveDocumentIds(String... expectedUserSpecificIds) { - assertDocumentIds(removeOperations.keySet(), expectedUserSpecificIds); - } - - void assertPutOperation(String userSpecificId, String expectedJson) { - DocumentId docId = DocumentId.of("id:ns:type::" + userSpecificId); - String json = putOperations.get(docId); - assertNotNull(json); - assertEquals(expectedJson.trim(), json.trim()); - } - - void assertUpdateOperation(String userSpecificId, String expectedJson) { - DocumentId docId = DocumentId.of("id:ns:type::" + userSpecificId); - String json = updateOperations.get(docId); - assertNotNull(json); - assertEquals(expectedJson.trim(), json.trim()); - } - - } - -} diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java deleted file mode 100644 index b951fb62fb5..00000000000 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonFileFeederExample.java +++ /dev/null @@ -1,90 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client.examples; - -import ai.vespa.feed.client.DocumentId; -import ai.vespa.feed.client.FeedClient; -import ai.vespa.feed.client.FeedClientBuilder; -import ai.vespa.feed.client.FeedException; -import ai.vespa.feed.client.JsonFeeder; -import ai.vespa.feed.client.Result; - -import java.io.Closeable; -import java.io.IOException; -import java.io.InputStream; -import java.net.URI; -import java.time.Duration; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.Logger; - -/** - * Sample feeder demonstrating how to programmatically feed to a Vespa cluster. - */ -class JsonFileFeederExample implements Closeable { - - private final static Logger log = Logger.getLogger(JsonFileFeederExample.class.getName()); - - private final JsonFeeder jsonFeeder; - private final URI endpoint; - - static class ResultCallBack implements JsonFeeder.ResultCallback { - - final AtomicInteger resultsReceived = new AtomicInteger(0); - final AtomicInteger errorsReceived = new AtomicInteger(0); - final long startTimeMillis = System.currentTimeMillis();; - - @Override - public void onNextResult(Result result, FeedException error) { - resultsReceived.incrementAndGet(); - if (error != null) { - log.warning("Problems with feeding document " - + error.documentId().map(DocumentId::toString).orElse("<unknown>") - + ": " + error); - errorsReceived.incrementAndGet(); - } - } - - @Override - public void onError(FeedException error) { - log.severe("Feeding failed fatally: " + error.getMessage()); - } - - @Override - public void onComplete() { - log.info("Feeding completed"); - } - - void dumpStatsToLog() { - log.info("Received in total " + resultsReceived.get() + ", " + errorsReceived.get() + " errors."); - log.info("Time spent receiving is " + (System.currentTimeMillis() - startTimeMillis) + " ms."); - } - - } - - JsonFileFeederExample(URI endpoint) { - this.endpoint = endpoint; - FeedClient feedClient = FeedClientBuilder.create(endpoint) - .build(); - this.jsonFeeder = JsonFeeder.builder(feedClient) - .withTimeout(Duration.ofSeconds(30)) - .build(); - } - - /** - * Feed all operations from a stream. - * - * @param stream The input stream to read operations from (JSON array containing one or more document operations). - */ - void batchFeed(InputStream stream, String batchId) { - ResultCallBack callback = new ResultCallBack(); - log.info("Starting feed to " + endpoint + " for batch '" + batchId + "'"); - CompletableFuture<Void> promise = jsonFeeder.feedMany(stream, callback); - promise.join(); // wait for feeding to complete - callback.dumpStatsToLog(); - } - - @Override - public void close() throws IOException { - jsonFeeder.close(); - } -} diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java deleted file mode 100644 index 3d4ce150fcf..00000000000 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/JsonStreamFeederExample.java +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client.examples; - -import ai.vespa.feed.client.DocumentId; -import ai.vespa.feed.client.FeedClient; -import ai.vespa.feed.client.FeedClientBuilder; -import ai.vespa.feed.client.OperationParameters; -import ai.vespa.feed.client.Result; - -import java.net.URI; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Simple Streaming feeder implementation which will send operations to a Vespa endpoint. - * Other threads communicate with the feeder by adding new operations on the BlockingQueue - */ - -class JsonStreamFeederExample extends Thread implements AutoCloseable { - - static class Operation { - final String type; - final String documentId; - final String documentFieldsJson; - - Operation(String type, String id, String fields) { - this.type = type; - this.documentId = id; - this.documentFieldsJson = fields; - } - } - - private final static Logger log = Logger.getLogger(JsonStreamFeederExample.class.getName()); - - private final BlockingQueue<Operation> operations; - private final FeedClient feedClient; - private final AtomicBoolean drain = new AtomicBoolean(false); - private final CountDownLatch finishedDraining = new CountDownLatch(1); - private final AtomicInteger resultCounter = new AtomicInteger(); - - /** - * Constructor - * @param operations The shared blocking queue where other threads can put document operations to. - * @param endpoint The endpoint to feed to - */ - JsonStreamFeederExample(BlockingQueue<JsonStreamFeederExample.Operation> operations, URI endpoint) { - this.operations = operations; - this.feedClient = FeedClientBuilder.create(endpoint).build(); - } - - /** - * Shutdown this feeder, waits until operations on queue is drained - */ - @Override - public void close() { - log.info("Shutdown initiated, awaiting operations queue to be drained. Queue size is " + operations.size()); - drain.set(true); - try { - finishedDraining.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - @Override - public void run() { - while (!drain.get() || !operations.isEmpty()) { - try { - JsonStreamFeederExample.Operation op = operations.poll(1, TimeUnit.SECONDS); - if(op == null) // no operations available - continue; - log.info("Put document " + op.documentId); - CompletableFuture<Result> promise; - DocumentId docId = DocumentId.of(op.documentId); - OperationParameters params = OperationParameters.empty(); - String json = op.documentFieldsJson; - switch (op.type) { - case "put": - promise = feedClient.put(docId, json, params); - break; - case "remove": - promise = feedClient.remove(docId, params); - break; - case "update": - promise = feedClient.update(docId, json, params); - break; - default: - throw new IllegalArgumentException("Invalid operation: " + op.type); - } - promise.whenComplete((result, throwable) -> { - if (resultCounter.getAndIncrement() % 10 == 0) { - System.err.println(feedClient.stats()); - } - if (throwable != null) { - System.err.printf("Failure for '%s': %s", docId, throwable); - throwable.printStackTrace(); - } - }); - } catch (InterruptedException e) { - log.log(Level.SEVERE, "Got interrupt exception.", e); - break; - } - } - log.info("Shutting down feeding thread"); - this.feedClient.close(); - finishedDraining.countDown(); - } - -}
\ No newline at end of file diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java deleted file mode 100644 index 4e6473a6568..00000000000 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/examples/SimpleExample.java +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client.examples; - -import ai.vespa.feed.client.DocumentId; -import ai.vespa.feed.client.FeedClient; -import ai.vespa.feed.client.FeedClientBuilder; -import ai.vespa.feed.client.OperationParameters; -import ai.vespa.feed.client.Result; - -import java.net.URI; -import java.time.Duration; -import java.util.concurrent.CompletableFuture; - -class SimpleExample { - - public static void main(String[] args) { - try (FeedClient client = FeedClientBuilder.create(URI.create("https://my-container-endpoint-with-http2:8080/")).build()) { - DocumentId id = DocumentId.of("namespace", "documenttype", "1"); - String json = "{\"fields\": {\"title\": \"hello world\"}}"; - OperationParameters params = OperationParameters.empty() - .timeout(Duration.ofSeconds(5)) - .route("myvesparoute"); - CompletableFuture<Result> promise = client.put(id, json, params); - promise.whenComplete(((result, throwable) -> { - if (throwable != null) { - throwable.printStackTrace(); - } else { - System.out.printf("'%s' for document '%s': %s%n", result.type(), result.documentId(), result.resultMessage()); - } - })); - } - } - -} diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/DocumentIdTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/DocumentIdTest.java index df790056309..61526b80fe7 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/DocumentIdTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/DocumentIdTest.java @@ -1,6 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; +import ai.vespa.feed.client.DocumentId; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -14,8 +16,8 @@ class DocumentIdTest { @Test void testParsing() { - assertEquals("id:ns:type::user", - DocumentId.of("id:ns:type::user").toString()); + Assertions.assertEquals("id:ns:type::user", + DocumentId.of("id:ns:type::user").toString()); assertEquals("id:ns:type:n=123:user", DocumentId.of("id:ns:type:n=123:user").toString()); diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/GracePeriodCircuitBreakerTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java index 8eaffc3e9be..b7dac5ce52e 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/GracePeriodCircuitBreakerTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java @@ -1,5 +1,5 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; import ai.vespa.feed.client.FeedClient.CircuitBreaker; import org.junit.jupiter.api.Test; diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java index d92958a5838..5353ab92fb6 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java @@ -1,10 +1,19 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; +import ai.vespa.feed.client.DocumentId; +import ai.vespa.feed.client.FeedClient; +import ai.vespa.feed.client.FeedClientBuilder; +import ai.vespa.feed.client.HttpResponse; +import ai.vespa.feed.client.OperationParameters; +import ai.vespa.feed.client.OperationStats; +import ai.vespa.feed.client.Result; +import ai.vespa.feed.client.ResultException; import org.junit.jupiter.api.Test; import java.net.URI; import java.time.Duration; +import java.util.Collections; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -33,7 +42,7 @@ class HttpFeedClientTest { @Override public void await() { throw new UnsupportedOperationException(); } @Override public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { return dispatch.get().apply(documentId, request); } } - FeedClient client = new HttpFeedClient(FeedClientBuilder.create(URI.create("https://dummy:123")), new MockRequestStrategy()); + FeedClient client = new HttpFeedClient(new FeedClientBuilderImpl(Collections.singletonList(URI.create("https://dummy:123"))), new MockRequestStrategy()); // Update is a PUT, and 200 OK is a success. dispatch.set((documentId, request) -> { diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java index 0f840201ca8..d293abf4f3e 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java @@ -1,19 +1,23 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; +import ai.vespa.feed.client.DocumentId; +import ai.vespa.feed.client.FeedClient; import ai.vespa.feed.client.FeedClient.CircuitBreaker; -import org.apache.hc.core5.http.ContentType; +import ai.vespa.feed.client.FeedException; +import ai.vespa.feed.client.HttpResponse; +import ai.vespa.feed.client.OperationStats; import org.junit.jupiter.api.Test; import java.io.IOException; import java.net.URI; import java.time.Duration; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Phaser; import java.util.concurrent.ScheduledExecutorService; @@ -42,7 +46,7 @@ class HttpRequestStrategyTest { ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Cluster cluster = new BenchmarkingCluster((__, vessel) -> executor.schedule(() -> vessel.complete(response), (int) (Math.random() * 2 * 10), TimeUnit.MILLISECONDS)); - HttpRequestStrategy strategy = new HttpRequestStrategy(FeedClientBuilder.create(URI.create("https://dummy.com:123")) + HttpRequestStrategy strategy = new HttpRequestStrategy( new FeedClientBuilderImpl(Collections.singletonList(URI.create("https://dummy.com:123"))) .setConnectionsPerEndpoint(1 << 10) .setMaxStreamPerConnection(1 << 12), cluster); @@ -82,7 +86,7 @@ class HttpRequestStrategyTest { MockCluster cluster = new MockCluster(); AtomicLong now = new AtomicLong(0); CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10)); - HttpRequestStrategy strategy = new HttpRequestStrategy(FeedClientBuilder.create(URI.create("https://dummy.com:123")) + HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(Collections.singletonList(URI.create("https://dummy.com:123"))) .setRetryStrategy(new FeedClient.RetryStrategy() { @Override public boolean retry(FeedClient.OperationType type) { return type == FeedClient.OperationType.PUT; } @Override public int retries() { return 1; } @@ -189,7 +193,7 @@ class HttpRequestStrategyTest { MockCluster cluster = new MockCluster(); AtomicLong now = new AtomicLong(0); CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10)); - HttpRequestStrategy strategy = new HttpRequestStrategy(FeedClientBuilder.create(URI.create("https://dummy.com:123")) + HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(Collections.singletonList(URI.create("https://dummy.com:123"))) .setRetryStrategy(new FeedClient.RetryStrategy() { @Override public int retries() { return 1; } }) diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/SslContextBuilderTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/SslContextBuilderTest.java index a74f63f5cd2..f7c1b4d2b03 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/SslContextBuilderTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/SslContextBuilderTest.java @@ -1,5 +1,5 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter; import org.bouncycastle.cert.jcajce.JcaX509v3CertificateBuilder; @@ -8,6 +8,7 @@ import org.bouncycastle.operator.ContentSigner; import org.bouncycastle.operator.OperatorCreationException; import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder; import org.bouncycastle.util.io.pem.PemObject; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -52,7 +53,7 @@ class SslContextBuilderTest { @Test void successfully_constructs_sslcontext_from_pem_files() { - SSLContext sslContext = assertDoesNotThrow(() -> + SSLContext sslContext = Assertions.assertDoesNotThrow(() -> new SslContextBuilder() .withCaCertificates(certificateFile) .withCertificateAndKey(certificateFile, privateKeyFile) @@ -62,13 +63,13 @@ class SslContextBuilderTest { @Test void successfully_constructs_sslcontext_when_no_builder_parameter_given() { - SSLContext sslContext = assertDoesNotThrow(() -> new SslContextBuilder().build()); + SSLContext sslContext = Assertions.assertDoesNotThrow(() -> new SslContextBuilder().build()); assertEquals("TLS", sslContext.getProtocol()); } @Test void successfully_constructs_sslcontext_with_only_certificate_file() { - SSLContext sslContext = assertDoesNotThrow(() -> + SSLContext sslContext = Assertions.assertDoesNotThrow(() -> new SslContextBuilder() .withCertificateAndKey(certificateFile, privateKeyFile) .build()); @@ -77,7 +78,7 @@ class SslContextBuilderTest { @Test void successfully_constructs_sslcontext_with_only_ca_certificate_file() { - SSLContext sslContext = assertDoesNotThrow(() -> + SSLContext sslContext = Assertions.assertDoesNotThrow(() -> new SslContextBuilder() .withCaCertificates(certificateFile) .build()); |