diff options
author | Morten Tokle <mortent@verizonmedia.com> | 2021-12-07 12:52:42 +0100 |
---|---|---|
committer | Morten Tokle <mortent@verizonmedia.com> | 2021-12-07 12:52:42 +0100 |
commit | 5e956429169d3a733114e5f76f051167f291c786 (patch) | |
tree | fa2b9cc664c8c639482397e9a4566149dac3ae29 /vespa-feed-client/src/main | |
parent | ae09069f544a086af4ae02a092ec66788a3cae9e (diff) |
Extract vespa-feed-client-api module from vespa-feed-client
Diffstat (limited to 'vespa-feed-client/src/main')
28 files changed, 123 insertions, 1179 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DocumentId.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/DocumentId.java deleted file mode 100644 index 5474bcfda01..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DocumentId.java +++ /dev/null @@ -1,112 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -import java.util.Objects; -import java.util.Optional; -import java.util.OptionalLong; - -import static java.util.Objects.requireNonNull; - -/** - * Represents a Vespa document id - * - * @author jonmv - */ -public class DocumentId { - - private final String documentType; - private final String namespace; - private final OptionalLong number; - private final Optional<String> group; - private final String userSpecific; - - private DocumentId(String documentType, String namespace, OptionalLong number, Optional<String> group, String userSpecific) { - this.documentType = requireNonNull(documentType); - this.namespace = requireNonNull(namespace); - this.number = requireNonNull(number); - this.group = requireNonNull(group); - this.userSpecific = requireNonNull(userSpecific); - } - - public static DocumentId of(String namespace, String documentType, String userSpecific) { - return new DocumentId(documentType, namespace, OptionalLong.empty(), Optional.empty(), userSpecific); - } - - public static DocumentId of(String namespace, String documentType, long number, String userSpecific) { - return new DocumentId(documentType, namespace, OptionalLong.of(number), Optional.empty(), userSpecific); - } - - public static DocumentId of(String namespace, String documentType, String group, String userSpecific) { - return new DocumentId(documentType, namespace, OptionalLong.empty(), Optional.of(group), userSpecific); - } - - public static DocumentId of(String serialized) { - DocumentId parsed = parse(serialized); - if (parsed != null) return parsed; - throw new IllegalArgumentException("Document ID must be on the form " + - "'id:<namespace>:<document-type>:[n=<number>|g=<group>]:<user-specific>', " + - "but was '" + serialized + "'"); - } - - private static DocumentId parse(String serialized) { - int i, j = -1; - if ((j = serialized.indexOf(':', i = j + 1)) < i) return null; - if ( ! "id".equals(serialized.substring(i, j))) return null; - if ((j = serialized.indexOf(':', i = j + 1)) <= i) return null; - String namespace = serialized.substring(i, j); - if ((j = serialized.indexOf(':', i = j + 1)) <= i) return null; - String documentType = serialized.substring(i, j); - if ((j = serialized.indexOf(':', i = j + 1)) < i) return null; - String group = serialized.substring(i, j); - if (serialized.length() <= (i = j + 1)) return null; - String userSpecific = serialized.substring(i); - if (group.startsWith("n=") && group.length() > 2) - return DocumentId.of(namespace, documentType, Long.parseLong(group.substring(2)), userSpecific); - if (group.startsWith("g=") && group.length() > 2) - return DocumentId.of(namespace, documentType, group.substring(2), userSpecific); - if (group.isEmpty()) - return DocumentId.of(namespace, documentType, userSpecific); - return null; - } - - public String documentType() { - return documentType; - } - - public String namespace() { - return namespace; - } - - public OptionalLong number() { - return number; - } - - public Optional<String> group() { - return group; - } - - public String userSpecific() { - return userSpecific; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - DocumentId that = (DocumentId) o; - return documentType.equals(that.documentType) && namespace.equals(that.namespace) && number.equals(that.number) && group.equals(that.group) && userSpecific.equals(that.userSpecific); - } - - @Override - public int hashCode() { - return Objects.hash(documentType, namespace, number, group, userSpecific); - } - - @Override - public String toString() { - return "id:" + namespace + ":" + documentType + ":" + - (number.isPresent() ? "n=" + number.getAsLong() : group.map("g="::concat).orElse("")) + - ":" + userSpecific; - } - -} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java deleted file mode 100644 index d463c611d6a..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java +++ /dev/null @@ -1,110 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -import java.io.Closeable; -import java.util.concurrent.CompletableFuture; - -/** - * Asynchronous feed client accepting document operations as JSON. The payload should be - * the same as the HTTP payload required by the /document/v1 HTTP API, i.e., <pre> - * { - * "fields": { - * ... - * } - * } - * </pre> - * - * @author bjorncs - * @author jonmv - */ -public interface FeedClient extends Closeable { - - /** - * Send a document put with the given parameters, returning a future with the result of the operation. - * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes. - */ - CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params); - - /** - * Send a document update with the given parameters, returning a future with the result of the operation. - * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes. - */ - CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params); - - /** - * Send a document remove with the given parameters, returning a future with the result of the operation. - * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes. - */ - CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params); - - /** Returns a snapshot of the stats for this feed client, such as requests made, and responses by status. */ - OperationStats stats(); - - /** Current state of the circuit breaker. */ - CircuitBreaker.State circuitBreakerState(); - - /** Shut down, and reject new operations. Operations in flight are allowed to complete normally if graceful. */ - void close(boolean graceful); - - /** Initiates graceful shutdown. See {@link #close(boolean)}. */ - default void close() { close(true); } - - /** Controls what to retry, and how many times. */ - interface RetryStrategy { - - /** Whether to retry operations of the given type. */ - default boolean retry(OperationType type) { return true; } - - /** Number of retries per operation for assumed transient, non-backpressure problems. */ - default int retries() { return 10; } - - } - - /** Allows slowing down or halting completely operations against the configured endpoint on high failure rates. */ - interface CircuitBreaker { - - /** A circuit breaker which is always closed. */ - CircuitBreaker FUSED = () -> State.CLOSED; - - /** Called by the client whenever a successful response is obtained. */ - default void success() { } - - /** Called by the client whenever an error HTTP response is received. */ - default void failure(HttpResponse response) { } - - /** Called by the client whenever an exception occurs trying to obtain a HTTP response. */ - default void failure(Throwable cause) { } - - /** The current state of the circuit breaker. */ - State state(); - - enum State { - - /** Circuit is closed: business as usual. */ - CLOSED, - - /** Circuit is half-open: something is wrong, perhaps it recovers? */ - HALF_OPEN, - - /** Circuit is open: we have given up. */ - OPEN; - - } - - } - - enum OperationType { - - /** A document put operation. This is idempotent. */ - PUT, - - /** A document update operation. This is idempotent if all its contained updates are. */ - UPDATE, - - /** A document remove operation. This is idempotent. */ - REMOVE; - - } - - -} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java deleted file mode 100644 index 1936eb09418..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -import java.util.Optional; - -/** - * Signals that an error occurred during feeding - * - * @author bjorncs - */ -public class FeedException extends RuntimeException { - - private final DocumentId documentId; - - public FeedException(String message) { - super(message); - this.documentId = null; - } - - public FeedException(DocumentId documentId, String message) { - super(message); - this.documentId = documentId; - } - - public FeedException(String message, Throwable cause) { - super(message, cause); - this.documentId = null; - } - - public FeedException(Throwable cause) { - super(cause); - this.documentId = null; - } - - public FeedException(DocumentId documentId, Throwable cause) { - super(cause); - this.documentId = documentId; - } - - public FeedException(DocumentId documentId, String message, Throwable cause) { - super(message, cause); - this.documentId = documentId; - } - - public Optional<DocumentId> documentId() { return Optional.ofNullable(documentId); } - -} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java deleted file mode 100644 index 07fdb2d7257..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -interface HttpResponse { - - int code(); - byte[] body(); - - static HttpResponse of(int code, byte[] body) { - return new HttpResponse() { - @Override public int code() { return code; } - @Override public byte[] body() { return body; } - }; - } - -} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java deleted file mode 100644 index 2d7caea9f26..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java +++ /dev/null @@ -1,514 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -import ai.vespa.feed.client.FeedClient.OperationType; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonLocation; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; - -import java.io.Closeable; -import java.io.IOException; -import java.io.InputStream; -import java.io.InterruptedIOException; -import java.time.Duration; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import static ai.vespa.feed.client.FeedClient.OperationType.PUT; -import static ai.vespa.feed.client.FeedClient.OperationType.REMOVE; -import static ai.vespa.feed.client.FeedClient.OperationType.UPDATE; -import static com.fasterxml.jackson.core.JsonToken.END_ARRAY; -import static com.fasterxml.jackson.core.JsonToken.START_ARRAY; -import static com.fasterxml.jackson.core.JsonToken.START_OBJECT; -import static com.fasterxml.jackson.core.JsonToken.VALUE_FALSE; -import static com.fasterxml.jackson.core.JsonToken.VALUE_STRING; -import static com.fasterxml.jackson.core.JsonToken.VALUE_TRUE; -import static java.lang.Math.min; -import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.Objects.requireNonNull; - -/** - * @author jonmv - * @author bjorncs - */ -public class JsonFeeder implements Closeable { - - private final ExecutorService resultExecutor = Executors.newSingleThreadExecutor(r -> { - Thread t = new Thread(r, "json-feeder-result-executor"); - t.setDaemon(true); - return t; - }); - private final FeedClient client; - private final OperationParameters protoParameters; - - private JsonFeeder(FeedClient client, OperationParameters protoParameters) { - this.client = client; - this.protoParameters = protoParameters; - } - - public interface ResultCallback { - /** - * Invoked after each operation has either completed successfully or failed - * - * @param result Non-null if operation completed successfully - * @param error Non-null if operation failed - */ - default void onNextResult(Result result, FeedException error) { } - - /** - * Invoked if an unrecoverable error occurred during feed processing, - * after which no other {@link ResultCallback} methods are invoked. - */ - default void onError(FeedException error) { } - - /** - * Invoked when all feed operations are either completed successfully or failed. - */ - default void onComplete() { } - } - - public static Builder builder(FeedClient client) { return new Builder(client); } - - /** Feeds single JSON feed operations on the form - * <pre> - * { - * "id": "id:ns:type::boo", - * "fields": { ... document fields ... } - * } - * </pre> - * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes. - */ - public CompletableFuture<Result> feedSingle(String json) { - CompletableFuture<Result> result = new CompletableFuture<>(); - try { - SingleOperationParserAndExecutor parser = new SingleOperationParserAndExecutor(json.getBytes(UTF_8)); - parser.next().whenCompleteAsync((operationResult, error) -> { - if (error != null) { - result.completeExceptionally(error); - } else { - result.complete(operationResult); - } - }, resultExecutor); - } catch (Exception e) { - resultExecutor.execute(() -> result.completeExceptionally(wrapException(e))); - } - return result; - } - - /** Feeds a stream containing a JSON array of feed operations on the form - * <pre> - * [ - * { - * "id": "id:ns:type::boo", - * "fields": { ... document fields ... } - * }, - * { - * "put": "id:ns:type::foo", - * "fields": { ... document fields ... } - * }, - * { - * "update": "id:ns:type:n=4:bar", - * "create": true, - * "fields": { ... partial update fields ... } - * }, - * { - * "remove": "id:ns:type:g=foo:bar", - * "condition": "type.baz = \"bax\"" - * }, - * ... - * ] - * </pre> - * Note that {@code "id"} is an alias for the document put operation. - * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes. - */ - public CompletableFuture<Void> feedMany(InputStream jsonStream, ResultCallback resultCallback) { - return feedMany(jsonStream, 1 << 26, resultCallback); - } - - /** - * Same as {@link #feedMany(InputStream, ResultCallback)}, but without a provided {@link ResultCallback} instance. - * @see JsonFeeder#feedMany(InputStream, ResultCallback) for details. - */ - public CompletableFuture<Void> feedMany(InputStream jsonStream) { - return feedMany(jsonStream, new ResultCallback() { }); - } - - CompletableFuture<Void> feedMany(InputStream jsonStream, int size, ResultCallback resultCallback) { - CompletableFuture<Void> overallResult = new CompletableFuture<>(); - CompletableFuture<Result> result; - AtomicInteger pending = new AtomicInteger(1); // The below dispatch loop itself is counted as a single pending operation - AtomicBoolean finalCallbackInvoked = new AtomicBoolean(); - try { - RingBufferStream buffer = new RingBufferStream(jsonStream, size); - while ((result = buffer.next()) != null) { - pending.incrementAndGet(); - result.whenCompleteAsync((r, t) -> { - if (!finalCallbackInvoked.get()) { - resultCallback.onNextResult(r, (FeedException) t); - } - if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { - resultCallback.onComplete(); - overallResult.complete(null); - } - }, resultExecutor); - } - if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) { - resultExecutor.execute(() -> { - resultCallback.onComplete(); - overallResult.complete(null); - }); - } - } catch (Exception e) { - if (finalCallbackInvoked.compareAndSet(false, true)) { - resultExecutor.execute(() -> { - FeedException wrapped = wrapException(e); - resultCallback.onError(wrapped); - overallResult.completeExceptionally(wrapped); - }); - } - } - return overallResult; - } - - private static final JsonFactory factory = new JsonFactory(); - - @Override public void close() throws IOException { - client.close(); - resultExecutor.shutdown(); - try { - if (!resultExecutor.awaitTermination(30, TimeUnit.SECONDS)) { - throw new IOException("Failed to close client in time"); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - private FeedException wrapException(Exception e) { - if (e instanceof FeedException) return (FeedException) e; - if (e instanceof IOException) { - return new OperationParseException("Failed to parse document JSON: " + e.getMessage(), e); - } - return new FeedException(e); - } - - private class RingBufferStream extends InputStream { - - private final byte[] b = new byte[1]; - private final InputStream in; - private final Object lock = new Object(); - private byte[] data; - private int size; - private IOException thrown = null; - private long tail = 0; - private long pos = 0; - private long head = 0; - private boolean done = false; - private final OperationParserAndExecutor parserAndExecutor; - - RingBufferStream(InputStream in, int size) throws IOException { - this.in = in; - this.data = new byte[size]; - this.size = size; - - new Thread(this::fill, "feed-reader").start(); - - this.parserAndExecutor = new RingBufferBackedOperationParserAndExecutor(factory.createParser(this)); - } - - @Override - public int read() throws IOException { - return read(b, 0, 1) == -1 ? -1 : b[0]; - } - - @Override - public int read(byte[] buffer, int off, int len) throws IOException { - try { - int ready; - synchronized (lock) { - if (pos - tail == size) // Buffer exhausted, nothing left to read, nowhere left to write. - expand(); - - while ((ready = (int) (head - pos)) == 0 && ! done) - lock.wait(); - } - if (thrown != null) throw thrown; - if (ready == 0) return -1; - - ready = min(ready, len); - int offset = (int) (pos % size); - int length = min(ready, size - offset); - System.arraycopy(data, offset, buffer, off, length); - if (length < ready) - System.arraycopy(data, 0, buffer, off + length, ready - length); - - pos += ready; - return ready; - } - catch (InterruptedException e) { - throw new InterruptedIOException("Interrupted waiting for data: " + e.getMessage()); - } - } - - public CompletableFuture<Result> next() throws IOException { - return parserAndExecutor.next(); - } - - private void expand() { - int newSize = size * 2; - if (newSize <= size) - throw new IllegalStateException("Maximum buffer size exceeded; want to double " + size + ", but that's too much"); - - byte[] newData = new byte[newSize]; - int offset = (int) (tail % size); - int newOffset = (int) (tail % newSize); - int toWrite = size - offset; - System.arraycopy(data, offset, newData, newOffset, toWrite); - if (toWrite < size) - System.arraycopy(data, 0, newData, newOffset + toWrite, size - toWrite); - size = newSize; - data = newData; - lock.notify(); - } - - private final byte[] prefix = "{\"fields\":".getBytes(UTF_8); - private byte[] copy(long start, long end) { - int length = (int) (end - start); - byte[] buffer = new byte[prefix.length + length + 1]; - System.arraycopy(prefix, 0, buffer, 0, prefix.length); - - int offset = (int) (start % size); - int toWrite = min(length, size - offset); - System.arraycopy(data, offset, buffer, prefix.length, toWrite); - if (toWrite < length) - System.arraycopy(data, 0, buffer, prefix.length + toWrite, length - toWrite); - - buffer[buffer.length - 1] = '}'; - return buffer; - } - - - @Override - public void close() throws IOException { - synchronized (lock) { - done = true; - lock.notifyAll(); - } - in.close(); - } - - private void fill() { - try { - while (true) { - int free; - synchronized (lock) { - while ((free = (int) (tail + size - head)) <= 0 && ! done) - lock.wait(); - } - if (done) break; - - int off = (int) (head % size); - int len = min(min(free, size - off), 1 << 13); - int read = in.read(data, off, len); - - synchronized (lock) { - if (read < 0) done = true; - else head += read; - lock.notify(); - } - } - } catch (InterruptedException e) { - synchronized (lock) { - done = true; - thrown = new InterruptedIOException("Interrupted reading data: " + e.getMessage()); - } - } catch (IOException e) { - synchronized (lock) { - done = true; - thrown = e; - } - } - } - - private class RingBufferBackedOperationParserAndExecutor extends OperationParserAndExecutor { - - RingBufferBackedOperationParserAndExecutor(JsonParser parser) { super(parser, true); } - - @Override - String getDocumentJson(long start, long end) { - String payload = new String(copy(start, end), UTF_8); - synchronized (lock) { - tail = end; - lock.notify(); - } - return payload; - } - } - } - - private class SingleOperationParserAndExecutor extends OperationParserAndExecutor { - - private final byte[] json; - - SingleOperationParserAndExecutor(byte[] json) throws IOException { - super(factory.createParser(json), false); - this.json = json; - } - - @Override - String getDocumentJson(long start, long end) { - return "{\"fields\":" + new String(json, (int) start, (int) (end - start), UTF_8) + "}"; - } - } - - private abstract class OperationParserAndExecutor { - - private final JsonParser parser; - private final boolean multipleOperations; - private boolean arrayPrefixParsed; - - protected OperationParserAndExecutor(JsonParser parser, boolean multipleOperations) { - this.parser = parser; - this.multipleOperations = multipleOperations; - } - - abstract String getDocumentJson(long start, long end); - - OperationParseException parseException(String error) { - JsonLocation location = parser.getTokenLocation(); - return new OperationParseException(error + " at offset " + location.getByteOffset() + - " (line " + location.getLineNr() + ", column " + location.getColumnNr() + ")"); - } - - CompletableFuture<Result> next() throws IOException { - JsonToken token = parser.nextToken(); - if (multipleOperations && ! arrayPrefixParsed && token == START_ARRAY) { - arrayPrefixParsed = true; - token = parser.nextToken(); - } - if (token == END_ARRAY && multipleOperations) return null; - else if (token == null && ! arrayPrefixParsed) return null; - else if (token != START_OBJECT) throw parseException("Unexpected token '" + parser.currentToken() + "'"); - long start = 0, end = -1; - OperationType type = null; - DocumentId id = null; - OperationParameters parameters = protoParameters; - loop: while (true) { - switch (parser.nextToken()) { - case FIELD_NAME: - switch (parser.getText()) { - case "id": - case "put": type = PUT; id = readId(); break; - case "update": type = UPDATE; id = readId(); break; - case "remove": type = REMOVE; id = readId(); break; - case "condition": parameters = parameters.testAndSetCondition(readString()); break; - case "create": parameters = parameters.createIfNonExistent(readBoolean()); break; - case "fields": { - expect(START_OBJECT); - start = parser.getTokenLocation().getByteOffset(); - int depth = 1; - while (depth > 0) switch (parser.nextToken()) { - case START_OBJECT: ++depth; break; - case END_OBJECT: --depth; break; - } - end = parser.getTokenLocation().getByteOffset() + 1; - break; - } - default: throw parseException("Unexpected field name '" + parser.getText() + "'"); - } - break; - - case END_OBJECT: - break loop; - - default: - throw parseException("Unexpected token '" + parser.currentToken() + "'"); - } - } - if (id == null) - throw parseException("No document id for document"); - if (type == REMOVE) { - if (end >= start) - throw parseException("Illegal 'fields' object for remove operation"); - else - start = end = parser.getTokenLocation().getByteOffset(); // getDocumentJson advances buffer overwrite head. - } - else if (end < start) - throw parseException("No 'fields' object for document"); - - String payload = getDocumentJson(start, end); - switch (type) { - case PUT: return client.put (id, payload, parameters); - case UPDATE: return client.update(id, payload, parameters); - case REMOVE: return client.remove(id, parameters); - default: throw new OperationParseException("Unexpected operation type '" + type + "'"); - } - } - - private void expect(JsonToken token) throws IOException { - if (parser.nextToken() != token) - throw new OperationParseException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() + - ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); - } - - private String readString() throws IOException { - String value = parser.nextTextValue(); - if (value == null) - throw new OperationParseException("Expected '" + VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() + - ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); - - return value; - } - - private boolean readBoolean() throws IOException { - Boolean value = parser.nextBooleanValue(); - if (value == null) - throw new OperationParseException("Expected '" + VALUE_FALSE + "' or '" + VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() + - ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")"); - - return value; - - } - - private DocumentId readId() throws IOException { - return DocumentId.of(readString()); - } - - } - - public static class Builder { - - final FeedClient client; - OperationParameters parameters = OperationParameters.empty(); - - private Builder(FeedClient client) { - this.client = requireNonNull(client); - } - - public Builder withTimeout(Duration timeout) { - parameters = parameters.timeout(timeout); - return this; - } - - public Builder withRoute(String route) { - parameters = parameters.route(route); - return this; - } - - public Builder withTracelevel(int tracelevel) { - parameters = parameters.tracelevel(tracelevel); - return this; - } - - public JsonFeeder build() { - return new JsonFeeder(client, parameters); - } - - } - -} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParameters.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParameters.java deleted file mode 100644 index 0ec40e114df..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParameters.java +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -import java.time.Duration; -import java.util.Objects; -import java.util.Optional; -import java.util.OptionalInt; - -/** - * Per-operation feed parameters - * - * @author bjorncs - * @author jonmv - */ -public class OperationParameters { - - static final OperationParameters empty = new OperationParameters(false, null, null, null, 0); - - private final boolean create; - private final String condition; - private final Duration timeout; - private final String route; - private final int tracelevel; - - private OperationParameters(boolean create, String condition, Duration timeout, String route, int tracelevel) { - this.create = create; - this.condition = condition; - this.timeout = timeout; - this.route = route; - this.tracelevel = tracelevel; - } - - public static OperationParameters empty() { return empty; } - - public OperationParameters createIfNonExistent(boolean create) { - return new OperationParameters(create, condition, timeout, route, tracelevel); - } - - public OperationParameters testAndSetCondition(String condition) { - if (condition.isEmpty()) - throw new IllegalArgumentException("TestAndSetCondition must be non-empty"); - - return new OperationParameters(create, condition, timeout, route, tracelevel); - } - - public OperationParameters timeout(Duration timeout) { - if (timeout.isNegative() || timeout.isZero()) - throw new IllegalArgumentException("Timeout must be positive, but was " + timeout); - - return new OperationParameters(create, condition, timeout, route, tracelevel); - } - - public OperationParameters route(String route) { - if (route.isEmpty()) - throw new IllegalArgumentException("Route must be non-empty"); - - return new OperationParameters(create, condition, timeout, route, tracelevel); - } - - public OperationParameters tracelevel(int tracelevel) { - if (tracelevel < 1 || tracelevel > 9) - throw new IllegalArgumentException("Tracelevel must be in [1, 9]"); - - return new OperationParameters(create, condition, timeout, route, tracelevel); - } - - public boolean createIfNonExistent() { return create; } - public Optional<String> testAndSetCondition() { return Optional.ofNullable(condition); } - public Optional<Duration> timeout() { return Optional.ofNullable(timeout); } - public Optional<String> route() { return Optional.ofNullable(route); } - public OptionalInt tracelevel() { return tracelevel == 0 ? OptionalInt.empty() : OptionalInt.of(tracelevel); } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - OperationParameters that = (OperationParameters) o; - return create == that.create && tracelevel == that.tracelevel && Objects.equals(condition, that.condition) && Objects.equals(timeout, that.timeout) && Objects.equals(route, that.route); - } - - @Override - public int hashCode() { - return Objects.hash(create, condition, timeout, route, tracelevel); - } - - @Override - public String toString() { - return "OperationParameters{" + - "create=" + create + - ", condition='" + condition + '\'' + - ", timeout=" + timeout + - ", route='" + route + '\'' + - ", tracelevel=" + tracelevel + - '}'; - } - -} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParseException.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParseException.java deleted file mode 100644 index f60368dd67f..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParseException.java +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -/** - * Signals that supplied JSON for a document/operation is invalid - * - * @author bjorncs - */ -public class OperationParseException extends FeedException { - - public OperationParseException(String message) { super(message); } - - public OperationParseException(String message, Throwable cause) { super(message, cause); } - -} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java deleted file mode 100644 index ab2faf245d8..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -import java.util.Map; -import java.util.Objects; -import java.util.stream.Collectors; - -/** - * Statistics for feed operations over HTTP against a Vespa cluster. - * - * @author jonmv - */ -public class OperationStats { - - private final long requests; - private final Map<Integer, Long> responsesByCode; - private final long inflight; - private final long exceptions; - private final long averageLatencyMillis; - private final long minLatencyMillis; - private final long maxLatencyMillis; - private final long bytesSent; - private final long bytesReceived; - - public OperationStats(long requests, Map<Integer, Long> responsesByCode, long exceptions, long inflight, - long averageLatencyMillis, long minLatencyMillis, long maxLatencyMillis, - long bytesSent, long bytesReceived) { - this.requests = requests; - this.responsesByCode = responsesByCode; - this.exceptions = exceptions; - this.inflight = inflight; - this.averageLatencyMillis = averageLatencyMillis; - this.minLatencyMillis = minLatencyMillis; - this.maxLatencyMillis = maxLatencyMillis; - this.bytesSent = bytesSent; - this.bytesReceived = bytesReceived; - } - - /** Returns the difference between this and the initial. Min and max latency are not modified. */ - public OperationStats since(OperationStats initial) { - return new OperationStats(requests - initial.requests, - responsesByCode.entrySet().stream() - .collect(Collectors.toMap(entry -> entry.getKey(), - entry -> entry.getValue() - initial.responsesByCode.getOrDefault(entry.getKey(), 0L))), - exceptions - initial.exceptions, - inflight - initial.inflight, - responsesByCode.size() == initial.responsesByCode.size() ? 0 : - (averageLatencyMillis * responsesByCode.size() - initial.averageLatencyMillis * initial.responsesByCode.size()) - / (responsesByCode.size() - initial.responsesByCode.size()), - minLatencyMillis, - maxLatencyMillis, - bytesSent - initial.bytesSent, - bytesReceived - initial.bytesReceived); - } - - /** Number of HTTP requests attempted. */ - public long requests() { - return requests; - } - - /** Number of HTTP responses received. */ - public long responses() { - return requests - inflight - exceptions; - } - - /** Number of 200 OK HTTP responses received. */ - public long successes() { - return responsesByCode.getOrDefault(200, 0L); - } - - /** Number of HTTP responses by status code. */ - public Map<Integer, Long> responsesByCode() { - return responsesByCode; - } - - /** Number of exceptions (instead of responses). */ - public long exceptions() { - return exceptions; - } - - /** Number of attempted requests which haven't yielded a response or exception yet. */ - public long inflight() { - return inflight; - } - - /** Average request-response latency, or -1. */ - public long averageLatencyMillis() { - return averageLatencyMillis; - } - - /** Minimum request-response latency, or -1. */ - public long minLatencyMillis() { - return minLatencyMillis; - } - - /** Maximum request-response latency, or -1. */ - public long maxLatencyMillis() { - return maxLatencyMillis; - } - - /** Number of bytes sent, for HTTP requests with a response. */ - public long bytesSent() { - return bytesSent; - } - - /** Number of bytes received in HTTP responses. */ - public long bytesReceived() { - return bytesReceived; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - OperationStats that = (OperationStats) o; - return requests == that.requests && inflight == that.inflight && exceptions == that.exceptions && averageLatencyMillis == that.averageLatencyMillis && minLatencyMillis == that.minLatencyMillis && maxLatencyMillis == that.maxLatencyMillis && bytesSent == that.bytesSent && bytesReceived == that.bytesReceived && responsesByCode.equals(that.responsesByCode); - } - - @Override - public int hashCode() { - return Objects.hash(requests, responsesByCode, inflight, exceptions, averageLatencyMillis, minLatencyMillis, maxLatencyMillis, bytesSent, bytesReceived); - } - - @Override - public String toString() { - return "Stats{" + - "requests=" + requests + - ", responsesByCode=" + responsesByCode + - ", exceptions=" + exceptions + - ", inflight=" + inflight + - ", averageLatencyMillis=" + averageLatencyMillis + - ", minLatencyMillis=" + minLatencyMillis + - ", maxLatencyMillis=" + maxLatencyMillis + - ", bytesSent=" + bytesSent + - ", bytesReceived=" + bytesReceived + - '}'; - } - -} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ResultException.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ResultException.java deleted file mode 100644 index d9eaff40d74..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ResultException.java +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -import java.util.Optional; - -/** - * Signals that the document API in the feed container returned a failure result for a feed operation. - * - * @author jonmv - */ -public class ResultException extends FeedException { - - private final String trace; - - public ResultException(DocumentId documentId, String message, String trace) { - super(documentId, message); - this.trace = trace; - } - - /** Holds the trace, if the failed operation had a {@link OperationParameters#tracelevel(int)} higher than 0. */ - public Optional<String> getTrace() { - return Optional.ofNullable(trace); - } - -} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ResultParseException.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ResultParseException.java deleted file mode 100644 index 947ab9f0560..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ResultParseException.java +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - -/** - * Signals that the client was unable to obtain a proper response/result from container - * - * @author bjorncs - */ -public class ResultParseException extends FeedException { - - public ResultParseException(DocumentId documentId, String message) { super(documentId, message); } - - public ResultParseException(DocumentId documentId, Throwable cause) { super(documentId, cause); } -} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java index 52d7af2fb31..6dc9ec4efb1 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; +import ai.vespa.feed.client.HttpResponse; import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; import org.apache.hc.client5.http.config.RequestConfig; @@ -18,7 +19,6 @@ import org.apache.hc.core5.util.Timeout; import javax.net.ssl.SSLContext; import java.io.IOException; import java.net.URI; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -43,7 +43,7 @@ class ApacheCluster implements Cluster { .setResponseTimeout(Timeout.ofMinutes(5)) .build(); - ApacheCluster(FeedClientBuilder builder) throws IOException { + ApacheCluster(FeedClientBuilderImpl builder) throws IOException { for (URI endpoint : builder.endpoints) for (int i = 0; i < builder.connectionsPerEndpoint; i++) endpoints.add(new Endpoint(createHttpClient(builder), endpoint)); @@ -114,7 +114,7 @@ class ApacheCluster implements Cluster { } - private static CloseableHttpAsyncClient createHttpClient(FeedClientBuilder builder) throws IOException { + private static CloseableHttpAsyncClient createHttpClient(FeedClientBuilderImpl builder) throws IOException { SSLContext sslContext = builder.constructSslContext(); String[] allowedCiphers = excludeH2Blacklisted(excludeWeak(sslContext.getSupportedSSLParameters().getCipherSuites())); if (allowedCiphers.length == 0) diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/BenchmarkingCluster.java index 05ff6e99308..40049bad217 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/BenchmarkingCluster.java @@ -1,5 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; + +import ai.vespa.feed.client.HttpResponse; +import ai.vespa.feed.client.OperationStats; import java.util.HashMap; import java.util.Map; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/Cluster.java index 57c028426fe..ee9188fdc2b 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/Cluster.java @@ -1,8 +1,10 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; + +import ai.vespa.feed.client.HttpResponse; +import ai.vespa.feed.client.OperationStats; import java.io.Closeable; -import java.util.Collections; import java.util.concurrent.CompletableFuture; /** diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DryrunCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DryrunCluster.java index 282e4e14285..96cf7998681 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DryrunCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DryrunCluster.java @@ -1,5 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; + +import ai.vespa.feed.client.HttpResponse; import java.nio.charset.StandardCharsets; import java.time.Duration; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java index a379a8b066b..5969fe267c0 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java @@ -1,7 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; + +import ai.vespa.feed.client.HttpResponse; -import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; @@ -25,7 +26,7 @@ public class DynamicThrottler extends StaticThrottler { private long startNanos = System.nanoTime(); private long sent = 0; - public DynamicThrottler(FeedClientBuilder builder) { + public DynamicThrottler(FeedClientBuilderImpl builder) { super(builder); targetInflight = new AtomicLong(8 * minInflight); } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java index 3b79d47b494..7dafeb0b541 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java @@ -1,5 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; + +import ai.vespa.feed.client.FeedClient; +import ai.vespa.feed.client.FeedClientBuilder; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLContext; @@ -16,6 +19,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.function.Supplier; import static java.util.Objects.requireNonNull; @@ -26,11 +30,11 @@ import static java.util.Objects.requireNonNull; * @author bjorncs * @author jonmv */ -public class FeedClientBuilder { +public class FeedClientBuilderImpl implements FeedClientBuilder { static final FeedClient.RetryStrategy defaultRetryStrategy = new FeedClient.RetryStrategy() { }; - final List<URI> endpoints; + List<URI> endpoints; final Map<String, Supplier<String>> requestHeaders = new HashMap<>(); SSLContext sslContext; HostnameVerifier hostnameVerifier; @@ -47,72 +51,65 @@ public class FeedClientBuilder { boolean benchmark = true; boolean dryrun = false; - /** Creates a builder for a single container endpoint **/ - public static FeedClientBuilder create(URI endpoint) { return new FeedClientBuilder(Collections.singletonList(endpoint)); } - /** Creates a builder for multiple container endpoints **/ - public static FeedClientBuilder create(List<URI> endpoints) { return new FeedClientBuilder(endpoints); } - private FeedClientBuilder(List<URI> endpoints) { + public FeedClientBuilderImpl() { + } + + FeedClientBuilderImpl(List<URI> endpoints) { + this(); + setEndpointUris(endpoints); + } + + @Override + public FeedClientBuilder setEndpointUris(List<URI> endpoints) { if (endpoints.isEmpty()) throw new IllegalArgumentException("At least one endpoint must be provided"); for (URI endpoint : endpoints) requireNonNull(endpoint.getHost()); - this.endpoints = new ArrayList<>(endpoints); + return this; } - /** - * Sets the number of connections this client will use per endpoint. - * - * A reasonable value here is a value that lets all feed clients (if more than one) - * collectively have a number of connections which is a small multiple of the numbers - * of containers in the cluster to feed, so load can be balanced across these containers. - * In general, this value should be kept as low as possible, but poor connectivity - * between feeder and cluster may also warrant a higher number of connections. - */ - public FeedClientBuilder setConnectionsPerEndpoint(int max) { + @Override + public FeedClientBuilderImpl setConnectionsPerEndpoint(int max) { if (max < 1) throw new IllegalArgumentException("Max connections must be at least 1, but was " + max); this.connectionsPerEndpoint = max; return this; } - /** - * Sets the maximum number of streams per HTTP/2 connection for this client. - * - * This determines the maximum number of concurrent, inflight requests for this client, - * which is {@code maxConnections * maxStreamsPerConnection}. Prefer more streams over - * more connections, when possible. - * The feed client automatically throttles load to achieve the best throughput, and the - * actual number of streams per connection is usually lower than the maximum. - */ - public FeedClientBuilder setMaxStreamPerConnection(int max) { + @Override + public FeedClientBuilderImpl setMaxStreamPerConnection(int max) { if (max < 1) throw new IllegalArgumentException("Max streams per connection must be at least 1, but was " + max); this.maxStreamsPerConnection = max; return this; } /** Sets {@link SSLContext} instance. */ - public FeedClientBuilder setSslContext(SSLContext context) { + @Override + public FeedClientBuilderImpl setSslContext(SSLContext context) { this.sslContext = requireNonNull(context); return this; } /** Sets {@link HostnameVerifier} instance (e.g for disabling default SSL hostname verification). */ - public FeedClientBuilder setHostnameVerifier(HostnameVerifier verifier) { + @Override + public FeedClientBuilderImpl setHostnameVerifier(HostnameVerifier verifier) { this.hostnameVerifier = requireNonNull(verifier); return this; } /** Turns off benchmarking. Attempting to get {@link FeedClient#stats()} will result in an exception. */ - public FeedClientBuilder noBenchmarking() { + @Override + public FeedClientBuilderImpl noBenchmarking() { this.benchmark = false; return this; } /** Adds HTTP request header to all client requests. */ - public FeedClientBuilder addRequestHeader(String name, String value) { + @Override + public FeedClientBuilderImpl addRequestHeader(String name, String value) { return addRequestHeader(name, () -> requireNonNull(value)); } @@ -120,7 +117,8 @@ public class FeedClientBuilder { * Adds HTTP request header to all client requests. Value {@link Supplier} is invoked for each HTTP request, * i.e. value can be dynamically updated during a feed. */ - public FeedClientBuilder addRequestHeader(String name, Supplier<String> valueSupplier) { + @Override + public FeedClientBuilderImpl addRequestHeader(String name, Supplier<String> valueSupplier) { this.requestHeaders.put(requireNonNull(name), requireNonNull(valueSupplier)); return this; } @@ -129,7 +127,8 @@ public class FeedClientBuilder { * Overrides default retry strategy. * @see FeedClient.RetryStrategy */ - public FeedClientBuilder setRetryStrategy(FeedClient.RetryStrategy strategy) { + @Override + public FeedClientBuilderImpl setRetryStrategy(FeedClient.RetryStrategy strategy) { this.retryStrategy = requireNonNull(strategy); return this; } @@ -138,31 +137,36 @@ public class FeedClientBuilder { * Overrides default circuit breaker. * @see FeedClient.CircuitBreaker */ - public FeedClientBuilder setCircuitBreaker(FeedClient.CircuitBreaker breaker) { + @Override + public FeedClientBuilderImpl setCircuitBreaker(FeedClient.CircuitBreaker breaker) { this.circuitBreaker = requireNonNull(breaker); return this; } /** Sets path to client SSL certificate/key PEM files */ - public FeedClientBuilder setCertificate(Path certificatePemFile, Path privateKeyPemFile) { + @Override + public FeedClientBuilderImpl setCertificate(Path certificatePemFile, Path privateKeyPemFile) { this.certificateFile = certificatePemFile; this.privateKeyFile = privateKeyPemFile; return this; } /** Sets client SSL certificates/key */ - public FeedClientBuilder setCertificate(Collection<X509Certificate> certificate, PrivateKey privateKey) { + @Override + public FeedClientBuilderImpl setCertificate(Collection<X509Certificate> certificate, PrivateKey privateKey) { this.certificate = certificate; this.privateKey = privateKey; return this; } /** Sets client SSL certificate/key */ - public FeedClientBuilder setCertificate(X509Certificate certificate, PrivateKey privateKey) { + @Override + public FeedClientBuilderImpl setCertificate(X509Certificate certificate, PrivateKey privateKey) { return setCertificate(Collections.singletonList(certificate), privateKey); } - public FeedClientBuilder setDryrun(boolean enabled) { + @Override + public FeedClientBuilderImpl setDryrun(boolean enabled) { this.dryrun = enabled; return this; } @@ -171,18 +175,21 @@ public class FeedClientBuilder { * Overrides JVM default SSL truststore * @param caCertificatesFile Path to PEM encoded file containing trusted certificates */ - public FeedClientBuilder setCaCertificatesFile(Path caCertificatesFile) { + @Override + public FeedClientBuilderImpl setCaCertificatesFile(Path caCertificatesFile) { this.caCertificatesFile = caCertificatesFile; return this; } /** Overrides JVM default SSL truststore */ - public FeedClientBuilder setCaCertificates(Collection<X509Certificate> caCertificates) { + @Override + public FeedClientBuilderImpl setCaCertificates(Collection<X509Certificate> caCertificates) { this.caCertificates = caCertificates; return this; } /** Constructs instance of {@link ai.vespa.feed.client.FeedClient} from builder configuration */ + @Override public FeedClient build() { try { validateConfiguration(); @@ -209,6 +216,9 @@ public class FeedClientBuilder { } private void validateConfiguration() { + if (endpoints == null) { + throw new IllegalArgumentException("At least one endpoint must be provided"); + } if (sslContext != null && ( certificateFile != null || caCertificatesFile != null || privateKeyFile != null || certificate != null || caCertificates != null || privateKey != null)) { diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java index cb5e35c79a5..b223fce7cab 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java @@ -1,5 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; + +import ai.vespa.feed.client.FeedClient; +import ai.vespa.feed.client.HttpResponse; import java.time.Duration; import java.util.concurrent.atomic.AtomicBoolean; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java index eb818ba1d48..3fd44596d63 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java @@ -1,6 +1,15 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; - +package ai.vespa.feed.client.impl; + +import ai.vespa.feed.client.DocumentId; +import ai.vespa.feed.client.FeedClient; +import ai.vespa.feed.client.FeedException; +import ai.vespa.feed.client.HttpResponse; +import ai.vespa.feed.client.OperationParameters; +import ai.vespa.feed.client.OperationStats; +import ai.vespa.feed.client.Result; +import ai.vespa.feed.client.ResultException; +import ai.vespa.feed.client.ResultParseException; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; @@ -33,11 +42,11 @@ class HttpFeedClient implements FeedClient { private final RequestStrategy requestStrategy; private final AtomicBoolean closed = new AtomicBoolean(); - HttpFeedClient(FeedClientBuilder builder) throws IOException { + HttpFeedClient(FeedClientBuilderImpl builder) throws IOException { this(builder, new HttpRequestStrategy(builder)); } - HttpFeedClient(FeedClientBuilder builder, RequestStrategy requestStrategy) { + HttpFeedClient(FeedClientBuilderImpl builder, RequestStrategy requestStrategy) { this.requestHeaders = new HashMap<>(builder.requestHeaders); this.requestStrategy = requestStrategy; } @@ -173,7 +182,7 @@ class HttpFeedClient implements FeedClient { if (outcome == Outcome.vespaFailure) throw new ResultException(documentId, message, trace); - return new Result(toResultType(outcome), documentId, message, trace); + return new ResultImpl(toResultType(outcome), documentId, message, trace); } static String getPath(DocumentId documentId) { diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java index 48defd71ea8..08b8ca08c61 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java @@ -1,5 +1,5 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; import java.util.Map; import java.util.function.Supplier; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java index cf65a874f3b..6fec0029bc3 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java @@ -1,8 +1,13 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; +import ai.vespa.feed.client.DocumentId; +import ai.vespa.feed.client.FeedClient; import ai.vespa.feed.client.FeedClient.CircuitBreaker; import ai.vespa.feed.client.FeedClient.RetryStrategy; +import ai.vespa.feed.client.FeedException; +import ai.vespa.feed.client.HttpResponse ; +import ai.vespa.feed.client.OperationStats; import java.io.IOException; import java.nio.channels.CancelledKeyException; @@ -62,11 +67,11 @@ class HttpRequestStrategy implements RequestStrategy { return thread; }); - HttpRequestStrategy(FeedClientBuilder builder) throws IOException { + HttpRequestStrategy(FeedClientBuilderImpl builder) throws IOException { this(builder, builder.dryrun ? new DryrunCluster() : new ApacheCluster(builder)); } - HttpRequestStrategy(FeedClientBuilder builder, Cluster cluster) { + HttpRequestStrategy(FeedClientBuilderImpl builder, Cluster cluster) { this.cluster = builder.benchmark ? new BenchmarkingCluster(cluster) : cluster; this.strategy = builder.retryStrategy; this.breaker = builder.circuitBreaker; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/RequestStrategy.java index 9a97f7daa66..e3b6b594593 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/RequestStrategy.java @@ -1,7 +1,10 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; +import ai.vespa.feed.client.DocumentId; import ai.vespa.feed.client.FeedClient.CircuitBreaker.State; +import ai.vespa.feed.client.HttpResponse; +import ai.vespa.feed.client.OperationStats; import java.util.concurrent.CompletableFuture; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Result.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ResultImpl.java index 5ff3fd0a219..dabf76cba34 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Result.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ResultImpl.java @@ -1,5 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; + +import ai.vespa.feed.client.DocumentId; +import ai.vespa.feed.client.Result; import java.util.Optional; @@ -9,29 +12,24 @@ import java.util.Optional; * @author bjorncs * @author jonmv */ -public class Result { +public class ResultImpl implements Result { private final Type type; private final DocumentId documentId; private final String resultMessage; private final String traceMessage; - Result(Type type, DocumentId documentId, String resultMessage, String traceMessage) { + ResultImpl(Type type, DocumentId documentId, String resultMessage, String traceMessage) { this.type = type; this.documentId = documentId; this.resultMessage = resultMessage; this.traceMessage = traceMessage; } - public enum Type { - success, - conditionNotMet - } - - public Type type() { return type; } - public DocumentId documentId() { return documentId; } - public Optional<String> resultMessage() { return Optional.ofNullable(resultMessage); } - public Optional<String> traceMessage() { return Optional.ofNullable(traceMessage); } + @Override public Type type() { return type; } + @Override public DocumentId documentId() { return documentId; } + @Override public Optional<String> resultMessage() { return Optional.ofNullable(resultMessage); } + @Override public Optional<String> traceMessage() { return Optional.ofNullable(traceMessage); } @Override public String toString() { diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/SslContextBuilder.java index f5e13eccd56..2ca4577abe6 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/SslContextBuilder.java @@ -1,5 +1,5 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; import org.bouncycastle.asn1.ASN1ObjectIdentifier; import org.bouncycastle.asn1.pkcs.PKCSObjectIdentifiers; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java index 5137a18d923..1f9cf8e5155 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java @@ -1,5 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; + +import ai.vespa.feed.client.HttpResponse; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; @@ -18,7 +20,7 @@ public class StaticThrottler implements Throttler { protected final long minInflight; private final AtomicLong targetX10; - public StaticThrottler(FeedClientBuilder builder) { + public StaticThrottler(FeedClientBuilderImpl builder) { minInflight = 16L * builder.connectionsPerEndpoint * builder.endpoints.size(); maxInflight = 256 * minInflight; // 4096 max streams per connection on the server side. targetX10 = new AtomicLong(10 * maxInflight); // 10x the actual value to allow for smaller updates. diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Throttler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/Throttler.java index f2453c27879..700a6f6f805 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Throttler.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/Throttler.java @@ -1,5 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; + +import ai.vespa.feed.client.HttpResponse; import java.util.concurrent.CompletableFuture; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/package-info.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/package-info.java deleted file mode 100644 index daab16a9ff2..00000000000 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/package-info.java +++ /dev/null @@ -1,9 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** - * @author bjorncs - */ - -@PublicApi -package ai.vespa.feed.client; - -import com.yahoo.api.annotations.PublicApi;
\ No newline at end of file diff --git a/vespa-feed-client/src/main/resources/META-INF.services/ai.vespa.feed.client.FeedClientBuilder b/vespa-feed-client/src/main/resources/META-INF.services/ai.vespa.feed.client.FeedClientBuilder new file mode 100644 index 00000000000..b6e28b1806c --- /dev/null +++ b/vespa-feed-client/src/main/resources/META-INF.services/ai.vespa.feed.client.FeedClientBuilder @@ -0,0 +1,2 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +ai.vespa.feed.client.impl.FeedClientBuilderImpl
\ No newline at end of file diff --git a/vespa-feed-client/src/main/sh/vespa-version-generator.sh b/vespa-feed-client/src/main/sh/vespa-version-generator.sh index 5aafb3e2bf7..44fb7d167db 100755 --- a/vespa-feed-client/src/main/sh/vespa-version-generator.sh +++ b/vespa-feed-client/src/main/sh/vespa-version-generator.sh @@ -16,7 +16,7 @@ mkdir -p $destinationDir versionNumber=$(cat $source | grep V_TAG_COMPONENT | awk '{print $2}' ) cat > $destination <<- END -package ai.vespa.feed.client; +package ai.vespa.feed.client.impl; class Vespa { static final String VERSION = "$versionNumber"; |