aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-feed-client/src/main/java
diff options
context:
space:
mode:
authorMorten Tokle <mortent@verizonmedia.com>2021-12-07 12:52:42 +0100
committerMorten Tokle <mortent@verizonmedia.com>2021-12-07 12:52:42 +0100
commit5e956429169d3a733114e5f76f051167f291c786 (patch)
treefa2b9cc664c8c639482397e9a4566149dac3ae29 /vespa-feed-client/src/main/java
parentae09069f544a086af4ae02a092ec66788a3cae9e (diff)
Extract vespa-feed-client-api module from vespa-feed-client
Diffstat (limited to 'vespa-feed-client/src/main/java')
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/DocumentId.java112
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java110
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java47
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java16
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java514
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParameters.java97
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParseException.java15
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java139
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/ResultException.java25
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/ResultParseException.java14
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java (renamed from vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java)8
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/BenchmarkingCluster.java (renamed from vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java)5
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/Cluster.java (renamed from vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java)6
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DryrunCluster.java (renamed from vespa-feed-client/src/main/java/ai/vespa/feed/client/DryrunCluster.java)4
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java (renamed from vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java)7
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java (renamed from vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java)94
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java (renamed from vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java)5
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java (renamed from vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java)19
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java (renamed from vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java)2
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java (renamed from vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java)11
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/RequestStrategy.java (renamed from vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java)5
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ResultImpl.java (renamed from vespa-feed-client/src/main/java/ai/vespa/feed/client/Result.java)22
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/SslContextBuilder.java (renamed from vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java)2
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java (renamed from vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java)6
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/Throttler.java (renamed from vespa-feed-client/src/main/java/ai/vespa/feed/client/Throttler.java)4
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/package-info.java9
26 files changed, 120 insertions, 1178 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DocumentId.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/DocumentId.java
deleted file mode 100644
index 5474bcfda01..00000000000
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DocumentId.java
+++ /dev/null
@@ -1,112 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
-
-import java.util.Objects;
-import java.util.Optional;
-import java.util.OptionalLong;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Represents a Vespa document id
- *
- * @author jonmv
- */
-public class DocumentId {
-
- private final String documentType;
- private final String namespace;
- private final OptionalLong number;
- private final Optional<String> group;
- private final String userSpecific;
-
- private DocumentId(String documentType, String namespace, OptionalLong number, Optional<String> group, String userSpecific) {
- this.documentType = requireNonNull(documentType);
- this.namespace = requireNonNull(namespace);
- this.number = requireNonNull(number);
- this.group = requireNonNull(group);
- this.userSpecific = requireNonNull(userSpecific);
- }
-
- public static DocumentId of(String namespace, String documentType, String userSpecific) {
- return new DocumentId(documentType, namespace, OptionalLong.empty(), Optional.empty(), userSpecific);
- }
-
- public static DocumentId of(String namespace, String documentType, long number, String userSpecific) {
- return new DocumentId(documentType, namespace, OptionalLong.of(number), Optional.empty(), userSpecific);
- }
-
- public static DocumentId of(String namespace, String documentType, String group, String userSpecific) {
- return new DocumentId(documentType, namespace, OptionalLong.empty(), Optional.of(group), userSpecific);
- }
-
- public static DocumentId of(String serialized) {
- DocumentId parsed = parse(serialized);
- if (parsed != null) return parsed;
- throw new IllegalArgumentException("Document ID must be on the form " +
- "'id:<namespace>:<document-type>:[n=<number>|g=<group>]:<user-specific>', " +
- "but was '" + serialized + "'");
- }
-
- private static DocumentId parse(String serialized) {
- int i, j = -1;
- if ((j = serialized.indexOf(':', i = j + 1)) < i) return null;
- if ( ! "id".equals(serialized.substring(i, j))) return null;
- if ((j = serialized.indexOf(':', i = j + 1)) <= i) return null;
- String namespace = serialized.substring(i, j);
- if ((j = serialized.indexOf(':', i = j + 1)) <= i) return null;
- String documentType = serialized.substring(i, j);
- if ((j = serialized.indexOf(':', i = j + 1)) < i) return null;
- String group = serialized.substring(i, j);
- if (serialized.length() <= (i = j + 1)) return null;
- String userSpecific = serialized.substring(i);
- if (group.startsWith("n=") && group.length() > 2)
- return DocumentId.of(namespace, documentType, Long.parseLong(group.substring(2)), userSpecific);
- if (group.startsWith("g=") && group.length() > 2)
- return DocumentId.of(namespace, documentType, group.substring(2), userSpecific);
- if (group.isEmpty())
- return DocumentId.of(namespace, documentType, userSpecific);
- return null;
- }
-
- public String documentType() {
- return documentType;
- }
-
- public String namespace() {
- return namespace;
- }
-
- public OptionalLong number() {
- return number;
- }
-
- public Optional<String> group() {
- return group;
- }
-
- public String userSpecific() {
- return userSpecific;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- DocumentId that = (DocumentId) o;
- return documentType.equals(that.documentType) && namespace.equals(that.namespace) && number.equals(that.number) && group.equals(that.group) && userSpecific.equals(that.userSpecific);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(documentType, namespace, number, group, userSpecific);
- }
-
- @Override
- public String toString() {
- return "id:" + namespace + ":" + documentType + ":" +
- (number.isPresent() ? "n=" + number.getAsLong() : group.map("g="::concat).orElse("")) +
- ":" + userSpecific;
- }
-
-}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java
deleted file mode 100644
index d463c611d6a..00000000000
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java
+++ /dev/null
@@ -1,110 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
-
-import java.io.Closeable;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * Asynchronous feed client accepting document operations as JSON. The payload should be
- * the same as the HTTP payload required by the /document/v1 HTTP API, i.e., <pre>
- * {
- * "fields": {
- * ...
- * }
- * }
- * </pre>
- *
- * @author bjorncs
- * @author jonmv
- */
-public interface FeedClient extends Closeable {
-
- /**
- * Send a document put with the given parameters, returning a future with the result of the operation.
- * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes.
- */
- CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params);
-
- /**
- * Send a document update with the given parameters, returning a future with the result of the operation.
- * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes.
- */
- CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params);
-
- /**
- * Send a document remove with the given parameters, returning a future with the result of the operation.
- * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes.
- */
- CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params);
-
- /** Returns a snapshot of the stats for this feed client, such as requests made, and responses by status. */
- OperationStats stats();
-
- /** Current state of the circuit breaker. */
- CircuitBreaker.State circuitBreakerState();
-
- /** Shut down, and reject new operations. Operations in flight are allowed to complete normally if graceful. */
- void close(boolean graceful);
-
- /** Initiates graceful shutdown. See {@link #close(boolean)}. */
- default void close() { close(true); }
-
- /** Controls what to retry, and how many times. */
- interface RetryStrategy {
-
- /** Whether to retry operations of the given type. */
- default boolean retry(OperationType type) { return true; }
-
- /** Number of retries per operation for assumed transient, non-backpressure problems. */
- default int retries() { return 10; }
-
- }
-
- /** Allows slowing down or halting completely operations against the configured endpoint on high failure rates. */
- interface CircuitBreaker {
-
- /** A circuit breaker which is always closed. */
- CircuitBreaker FUSED = () -> State.CLOSED;
-
- /** Called by the client whenever a successful response is obtained. */
- default void success() { }
-
- /** Called by the client whenever an error HTTP response is received. */
- default void failure(HttpResponse response) { }
-
- /** Called by the client whenever an exception occurs trying to obtain a HTTP response. */
- default void failure(Throwable cause) { }
-
- /** The current state of the circuit breaker. */
- State state();
-
- enum State {
-
- /** Circuit is closed: business as usual. */
- CLOSED,
-
- /** Circuit is half-open: something is wrong, perhaps it recovers? */
- HALF_OPEN,
-
- /** Circuit is open: we have given up. */
- OPEN;
-
- }
-
- }
-
- enum OperationType {
-
- /** A document put operation. This is idempotent. */
- PUT,
-
- /** A document update operation. This is idempotent if all its contained updates are. */
- UPDATE,
-
- /** A document remove operation. This is idempotent. */
- REMOVE;
-
- }
-
-
-}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java
deleted file mode 100644
index 1936eb09418..00000000000
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedException.java
+++ /dev/null
@@ -1,47 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
-
-import java.util.Optional;
-
-/**
- * Signals that an error occurred during feeding
- *
- * @author bjorncs
- */
-public class FeedException extends RuntimeException {
-
- private final DocumentId documentId;
-
- public FeedException(String message) {
- super(message);
- this.documentId = null;
- }
-
- public FeedException(DocumentId documentId, String message) {
- super(message);
- this.documentId = documentId;
- }
-
- public FeedException(String message, Throwable cause) {
- super(message, cause);
- this.documentId = null;
- }
-
- public FeedException(Throwable cause) {
- super(cause);
- this.documentId = null;
- }
-
- public FeedException(DocumentId documentId, Throwable cause) {
- super(cause);
- this.documentId = documentId;
- }
-
- public FeedException(DocumentId documentId, String message, Throwable cause) {
- super(message, cause);
- this.documentId = documentId;
- }
-
- public Optional<DocumentId> documentId() { return Optional.ofNullable(documentId); }
-
-}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java
deleted file mode 100644
index 07fdb2d7257..00000000000
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpResponse.java
+++ /dev/null
@@ -1,16 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
-
-interface HttpResponse {
-
- int code();
- byte[] body();
-
- static HttpResponse of(int code, byte[] body) {
- return new HttpResponse() {
- @Override public int code() { return code; }
- @Override public byte[] body() { return body; }
- };
- }
-
-}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java
deleted file mode 100644
index 2d7caea9f26..00000000000
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java
+++ /dev/null
@@ -1,514 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
-
-import ai.vespa.feed.client.FeedClient.OperationType;
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonLocation;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InterruptedIOException;
-import java.time.Duration;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static ai.vespa.feed.client.FeedClient.OperationType.PUT;
-import static ai.vespa.feed.client.FeedClient.OperationType.REMOVE;
-import static ai.vespa.feed.client.FeedClient.OperationType.UPDATE;
-import static com.fasterxml.jackson.core.JsonToken.END_ARRAY;
-import static com.fasterxml.jackson.core.JsonToken.START_ARRAY;
-import static com.fasterxml.jackson.core.JsonToken.START_OBJECT;
-import static com.fasterxml.jackson.core.JsonToken.VALUE_FALSE;
-import static com.fasterxml.jackson.core.JsonToken.VALUE_STRING;
-import static com.fasterxml.jackson.core.JsonToken.VALUE_TRUE;
-import static java.lang.Math.min;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.Objects.requireNonNull;
-
-/**
- * @author jonmv
- * @author bjorncs
- */
-public class JsonFeeder implements Closeable {
-
- private final ExecutorService resultExecutor = Executors.newSingleThreadExecutor(r -> {
- Thread t = new Thread(r, "json-feeder-result-executor");
- t.setDaemon(true);
- return t;
- });
- private final FeedClient client;
- private final OperationParameters protoParameters;
-
- private JsonFeeder(FeedClient client, OperationParameters protoParameters) {
- this.client = client;
- this.protoParameters = protoParameters;
- }
-
- public interface ResultCallback {
- /**
- * Invoked after each operation has either completed successfully or failed
- *
- * @param result Non-null if operation completed successfully
- * @param error Non-null if operation failed
- */
- default void onNextResult(Result result, FeedException error) { }
-
- /**
- * Invoked if an unrecoverable error occurred during feed processing,
- * after which no other {@link ResultCallback} methods are invoked.
- */
- default void onError(FeedException error) { }
-
- /**
- * Invoked when all feed operations are either completed successfully or failed.
- */
- default void onComplete() { }
- }
-
- public static Builder builder(FeedClient client) { return new Builder(client); }
-
- /** Feeds single JSON feed operations on the form
- * <pre>
- * {
- * "id": "id:ns:type::boo",
- * "fields": { ... document fields ... }
- * }
- * </pre>
- * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes.
- */
- public CompletableFuture<Result> feedSingle(String json) {
- CompletableFuture<Result> result = new CompletableFuture<>();
- try {
- SingleOperationParserAndExecutor parser = new SingleOperationParserAndExecutor(json.getBytes(UTF_8));
- parser.next().whenCompleteAsync((operationResult, error) -> {
- if (error != null) {
- result.completeExceptionally(error);
- } else {
- result.complete(operationResult);
- }
- }, resultExecutor);
- } catch (Exception e) {
- resultExecutor.execute(() -> result.completeExceptionally(wrapException(e)));
- }
- return result;
- }
-
- /** Feeds a stream containing a JSON array of feed operations on the form
- * <pre>
- * [
- * {
- * "id": "id:ns:type::boo",
- * "fields": { ... document fields ... }
- * },
- * {
- * "put": "id:ns:type::foo",
- * "fields": { ... document fields ... }
- * },
- * {
- * "update": "id:ns:type:n=4:bar",
- * "create": true,
- * "fields": { ... partial update fields ... }
- * },
- * {
- * "remove": "id:ns:type:g=foo:bar",
- * "condition": "type.baz = \"bax\""
- * },
- * ...
- * ]
- * </pre>
- * Note that {@code "id"} is an alias for the document put operation.
- * Exceptional completion will use be an instance of {@link FeedException} or one of its sub-classes.
- */
- public CompletableFuture<Void> feedMany(InputStream jsonStream, ResultCallback resultCallback) {
- return feedMany(jsonStream, 1 << 26, resultCallback);
- }
-
- /**
- * Same as {@link #feedMany(InputStream, ResultCallback)}, but without a provided {@link ResultCallback} instance.
- * @see JsonFeeder#feedMany(InputStream, ResultCallback) for details.
- */
- public CompletableFuture<Void> feedMany(InputStream jsonStream) {
- return feedMany(jsonStream, new ResultCallback() { });
- }
-
- CompletableFuture<Void> feedMany(InputStream jsonStream, int size, ResultCallback resultCallback) {
- CompletableFuture<Void> overallResult = new CompletableFuture<>();
- CompletableFuture<Result> result;
- AtomicInteger pending = new AtomicInteger(1); // The below dispatch loop itself is counted as a single pending operation
- AtomicBoolean finalCallbackInvoked = new AtomicBoolean();
- try {
- RingBufferStream buffer = new RingBufferStream(jsonStream, size);
- while ((result = buffer.next()) != null) {
- pending.incrementAndGet();
- result.whenCompleteAsync((r, t) -> {
- if (!finalCallbackInvoked.get()) {
- resultCallback.onNextResult(r, (FeedException) t);
- }
- if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) {
- resultCallback.onComplete();
- overallResult.complete(null);
- }
- }, resultExecutor);
- }
- if (pending.decrementAndGet() == 0 && finalCallbackInvoked.compareAndSet(false, true)) {
- resultExecutor.execute(() -> {
- resultCallback.onComplete();
- overallResult.complete(null);
- });
- }
- } catch (Exception e) {
- if (finalCallbackInvoked.compareAndSet(false, true)) {
- resultExecutor.execute(() -> {
- FeedException wrapped = wrapException(e);
- resultCallback.onError(wrapped);
- overallResult.completeExceptionally(wrapped);
- });
- }
- }
- return overallResult;
- }
-
- private static final JsonFactory factory = new JsonFactory();
-
- @Override public void close() throws IOException {
- client.close();
- resultExecutor.shutdown();
- try {
- if (!resultExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
- throw new IOException("Failed to close client in time");
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- private FeedException wrapException(Exception e) {
- if (e instanceof FeedException) return (FeedException) e;
- if (e instanceof IOException) {
- return new OperationParseException("Failed to parse document JSON: " + e.getMessage(), e);
- }
- return new FeedException(e);
- }
-
- private class RingBufferStream extends InputStream {
-
- private final byte[] b = new byte[1];
- private final InputStream in;
- private final Object lock = new Object();
- private byte[] data;
- private int size;
- private IOException thrown = null;
- private long tail = 0;
- private long pos = 0;
- private long head = 0;
- private boolean done = false;
- private final OperationParserAndExecutor parserAndExecutor;
-
- RingBufferStream(InputStream in, int size) throws IOException {
- this.in = in;
- this.data = new byte[size];
- this.size = size;
-
- new Thread(this::fill, "feed-reader").start();
-
- this.parserAndExecutor = new RingBufferBackedOperationParserAndExecutor(factory.createParser(this));
- }
-
- @Override
- public int read() throws IOException {
- return read(b, 0, 1) == -1 ? -1 : b[0];
- }
-
- @Override
- public int read(byte[] buffer, int off, int len) throws IOException {
- try {
- int ready;
- synchronized (lock) {
- if (pos - tail == size) // Buffer exhausted, nothing left to read, nowhere left to write.
- expand();
-
- while ((ready = (int) (head - pos)) == 0 && ! done)
- lock.wait();
- }
- if (thrown != null) throw thrown;
- if (ready == 0) return -1;
-
- ready = min(ready, len);
- int offset = (int) (pos % size);
- int length = min(ready, size - offset);
- System.arraycopy(data, offset, buffer, off, length);
- if (length < ready)
- System.arraycopy(data, 0, buffer, off + length, ready - length);
-
- pos += ready;
- return ready;
- }
- catch (InterruptedException e) {
- throw new InterruptedIOException("Interrupted waiting for data: " + e.getMessage());
- }
- }
-
- public CompletableFuture<Result> next() throws IOException {
- return parserAndExecutor.next();
- }
-
- private void expand() {
- int newSize = size * 2;
- if (newSize <= size)
- throw new IllegalStateException("Maximum buffer size exceeded; want to double " + size + ", but that's too much");
-
- byte[] newData = new byte[newSize];
- int offset = (int) (tail % size);
- int newOffset = (int) (tail % newSize);
- int toWrite = size - offset;
- System.arraycopy(data, offset, newData, newOffset, toWrite);
- if (toWrite < size)
- System.arraycopy(data, 0, newData, newOffset + toWrite, size - toWrite);
- size = newSize;
- data = newData;
- lock.notify();
- }
-
- private final byte[] prefix = "{\"fields\":".getBytes(UTF_8);
- private byte[] copy(long start, long end) {
- int length = (int) (end - start);
- byte[] buffer = new byte[prefix.length + length + 1];
- System.arraycopy(prefix, 0, buffer, 0, prefix.length);
-
- int offset = (int) (start % size);
- int toWrite = min(length, size - offset);
- System.arraycopy(data, offset, buffer, prefix.length, toWrite);
- if (toWrite < length)
- System.arraycopy(data, 0, buffer, prefix.length + toWrite, length - toWrite);
-
- buffer[buffer.length - 1] = '}';
- return buffer;
- }
-
-
- @Override
- public void close() throws IOException {
- synchronized (lock) {
- done = true;
- lock.notifyAll();
- }
- in.close();
- }
-
- private void fill() {
- try {
- while (true) {
- int free;
- synchronized (lock) {
- while ((free = (int) (tail + size - head)) <= 0 && ! done)
- lock.wait();
- }
- if (done) break;
-
- int off = (int) (head % size);
- int len = min(min(free, size - off), 1 << 13);
- int read = in.read(data, off, len);
-
- synchronized (lock) {
- if (read < 0) done = true;
- else head += read;
- lock.notify();
- }
- }
- } catch (InterruptedException e) {
- synchronized (lock) {
- done = true;
- thrown = new InterruptedIOException("Interrupted reading data: " + e.getMessage());
- }
- } catch (IOException e) {
- synchronized (lock) {
- done = true;
- thrown = e;
- }
- }
- }
-
- private class RingBufferBackedOperationParserAndExecutor extends OperationParserAndExecutor {
-
- RingBufferBackedOperationParserAndExecutor(JsonParser parser) { super(parser, true); }
-
- @Override
- String getDocumentJson(long start, long end) {
- String payload = new String(copy(start, end), UTF_8);
- synchronized (lock) {
- tail = end;
- lock.notify();
- }
- return payload;
- }
- }
- }
-
- private class SingleOperationParserAndExecutor extends OperationParserAndExecutor {
-
- private final byte[] json;
-
- SingleOperationParserAndExecutor(byte[] json) throws IOException {
- super(factory.createParser(json), false);
- this.json = json;
- }
-
- @Override
- String getDocumentJson(long start, long end) {
- return "{\"fields\":" + new String(json, (int) start, (int) (end - start), UTF_8) + "}";
- }
- }
-
- private abstract class OperationParserAndExecutor {
-
- private final JsonParser parser;
- private final boolean multipleOperations;
- private boolean arrayPrefixParsed;
-
- protected OperationParserAndExecutor(JsonParser parser, boolean multipleOperations) {
- this.parser = parser;
- this.multipleOperations = multipleOperations;
- }
-
- abstract String getDocumentJson(long start, long end);
-
- OperationParseException parseException(String error) {
- JsonLocation location = parser.getTokenLocation();
- return new OperationParseException(error + " at offset " + location.getByteOffset() +
- " (line " + location.getLineNr() + ", column " + location.getColumnNr() + ")");
- }
-
- CompletableFuture<Result> next() throws IOException {
- JsonToken token = parser.nextToken();
- if (multipleOperations && ! arrayPrefixParsed && token == START_ARRAY) {
- arrayPrefixParsed = true;
- token = parser.nextToken();
- }
- if (token == END_ARRAY && multipleOperations) return null;
- else if (token == null && ! arrayPrefixParsed) return null;
- else if (token != START_OBJECT) throw parseException("Unexpected token '" + parser.currentToken() + "'");
- long start = 0, end = -1;
- OperationType type = null;
- DocumentId id = null;
- OperationParameters parameters = protoParameters;
- loop: while (true) {
- switch (parser.nextToken()) {
- case FIELD_NAME:
- switch (parser.getText()) {
- case "id":
- case "put": type = PUT; id = readId(); break;
- case "update": type = UPDATE; id = readId(); break;
- case "remove": type = REMOVE; id = readId(); break;
- case "condition": parameters = parameters.testAndSetCondition(readString()); break;
- case "create": parameters = parameters.createIfNonExistent(readBoolean()); break;
- case "fields": {
- expect(START_OBJECT);
- start = parser.getTokenLocation().getByteOffset();
- int depth = 1;
- while (depth > 0) switch (parser.nextToken()) {
- case START_OBJECT: ++depth; break;
- case END_OBJECT: --depth; break;
- }
- end = parser.getTokenLocation().getByteOffset() + 1;
- break;
- }
- default: throw parseException("Unexpected field name '" + parser.getText() + "'");
- }
- break;
-
- case END_OBJECT:
- break loop;
-
- default:
- throw parseException("Unexpected token '" + parser.currentToken() + "'");
- }
- }
- if (id == null)
- throw parseException("No document id for document");
- if (type == REMOVE) {
- if (end >= start)
- throw parseException("Illegal 'fields' object for remove operation");
- else
- start = end = parser.getTokenLocation().getByteOffset(); // getDocumentJson advances buffer overwrite head.
- }
- else if (end < start)
- throw parseException("No 'fields' object for document");
-
- String payload = getDocumentJson(start, end);
- switch (type) {
- case PUT: return client.put (id, payload, parameters);
- case UPDATE: return client.update(id, payload, parameters);
- case REMOVE: return client.remove(id, parameters);
- default: throw new OperationParseException("Unexpected operation type '" + type + "'");
- }
- }
-
- private void expect(JsonToken token) throws IOException {
- if (parser.nextToken() != token)
- throw new OperationParseException("Expected '" + token + "' at offset " + parser.getTokenLocation().getByteOffset() +
- ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
- }
-
- private String readString() throws IOException {
- String value = parser.nextTextValue();
- if (value == null)
- throw new OperationParseException("Expected '" + VALUE_STRING + "' at offset " + parser.getTokenLocation().getByteOffset() +
- ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
-
- return value;
- }
-
- private boolean readBoolean() throws IOException {
- Boolean value = parser.nextBooleanValue();
- if (value == null)
- throw new OperationParseException("Expected '" + VALUE_FALSE + "' or '" + VALUE_TRUE + "' at offset " + parser.getTokenLocation().getByteOffset() +
- ", but found '" + parser.currentToken() + "' (" + parser.getText() + ")");
-
- return value;
-
- }
-
- private DocumentId readId() throws IOException {
- return DocumentId.of(readString());
- }
-
- }
-
- public static class Builder {
-
- final FeedClient client;
- OperationParameters parameters = OperationParameters.empty();
-
- private Builder(FeedClient client) {
- this.client = requireNonNull(client);
- }
-
- public Builder withTimeout(Duration timeout) {
- parameters = parameters.timeout(timeout);
- return this;
- }
-
- public Builder withRoute(String route) {
- parameters = parameters.route(route);
- return this;
- }
-
- public Builder withTracelevel(int tracelevel) {
- parameters = parameters.tracelevel(tracelevel);
- return this;
- }
-
- public JsonFeeder build() {
- return new JsonFeeder(client, parameters);
- }
-
- }
-
-}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParameters.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParameters.java
deleted file mode 100644
index 0ec40e114df..00000000000
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParameters.java
+++ /dev/null
@@ -1,97 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
-
-import java.time.Duration;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.OptionalInt;
-
-/**
- * Per-operation feed parameters
- *
- * @author bjorncs
- * @author jonmv
- */
-public class OperationParameters {
-
- static final OperationParameters empty = new OperationParameters(false, null, null, null, 0);
-
- private final boolean create;
- private final String condition;
- private final Duration timeout;
- private final String route;
- private final int tracelevel;
-
- private OperationParameters(boolean create, String condition, Duration timeout, String route, int tracelevel) {
- this.create = create;
- this.condition = condition;
- this.timeout = timeout;
- this.route = route;
- this.tracelevel = tracelevel;
- }
-
- public static OperationParameters empty() { return empty; }
-
- public OperationParameters createIfNonExistent(boolean create) {
- return new OperationParameters(create, condition, timeout, route, tracelevel);
- }
-
- public OperationParameters testAndSetCondition(String condition) {
- if (condition.isEmpty())
- throw new IllegalArgumentException("TestAndSetCondition must be non-empty");
-
- return new OperationParameters(create, condition, timeout, route, tracelevel);
- }
-
- public OperationParameters timeout(Duration timeout) {
- if (timeout.isNegative() || timeout.isZero())
- throw new IllegalArgumentException("Timeout must be positive, but was " + timeout);
-
- return new OperationParameters(create, condition, timeout, route, tracelevel);
- }
-
- public OperationParameters route(String route) {
- if (route.isEmpty())
- throw new IllegalArgumentException("Route must be non-empty");
-
- return new OperationParameters(create, condition, timeout, route, tracelevel);
- }
-
- public OperationParameters tracelevel(int tracelevel) {
- if (tracelevel < 1 || tracelevel > 9)
- throw new IllegalArgumentException("Tracelevel must be in [1, 9]");
-
- return new OperationParameters(create, condition, timeout, route, tracelevel);
- }
-
- public boolean createIfNonExistent() { return create; }
- public Optional<String> testAndSetCondition() { return Optional.ofNullable(condition); }
- public Optional<Duration> timeout() { return Optional.ofNullable(timeout); }
- public Optional<String> route() { return Optional.ofNullable(route); }
- public OptionalInt tracelevel() { return tracelevel == 0 ? OptionalInt.empty() : OptionalInt.of(tracelevel); }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- OperationParameters that = (OperationParameters) o;
- return create == that.create && tracelevel == that.tracelevel && Objects.equals(condition, that.condition) && Objects.equals(timeout, that.timeout) && Objects.equals(route, that.route);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(create, condition, timeout, route, tracelevel);
- }
-
- @Override
- public String toString() {
- return "OperationParameters{" +
- "create=" + create +
- ", condition='" + condition + '\'' +
- ", timeout=" + timeout +
- ", route='" + route + '\'' +
- ", tracelevel=" + tracelevel +
- '}';
- }
-
-}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParseException.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParseException.java
deleted file mode 100644
index f60368dd67f..00000000000
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationParseException.java
+++ /dev/null
@@ -1,15 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
-
-/**
- * Signals that supplied JSON for a document/operation is invalid
- *
- * @author bjorncs
- */
-public class OperationParseException extends FeedException {
-
- public OperationParseException(String message) { super(message); }
-
- public OperationParseException(String message, Throwable cause) { super(message, cause); }
-
-}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java
deleted file mode 100644
index ab2faf245d8..00000000000
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java
+++ /dev/null
@@ -1,139 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
-
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-/**
- * Statistics for feed operations over HTTP against a Vespa cluster.
- *
- * @author jonmv
- */
-public class OperationStats {
-
- private final long requests;
- private final Map<Integer, Long> responsesByCode;
- private final long inflight;
- private final long exceptions;
- private final long averageLatencyMillis;
- private final long minLatencyMillis;
- private final long maxLatencyMillis;
- private final long bytesSent;
- private final long bytesReceived;
-
- public OperationStats(long requests, Map<Integer, Long> responsesByCode, long exceptions, long inflight,
- long averageLatencyMillis, long minLatencyMillis, long maxLatencyMillis,
- long bytesSent, long bytesReceived) {
- this.requests = requests;
- this.responsesByCode = responsesByCode;
- this.exceptions = exceptions;
- this.inflight = inflight;
- this.averageLatencyMillis = averageLatencyMillis;
- this.minLatencyMillis = minLatencyMillis;
- this.maxLatencyMillis = maxLatencyMillis;
- this.bytesSent = bytesSent;
- this.bytesReceived = bytesReceived;
- }
-
- /** Returns the difference between this and the initial. Min and max latency are not modified. */
- public OperationStats since(OperationStats initial) {
- return new OperationStats(requests - initial.requests,
- responsesByCode.entrySet().stream()
- .collect(Collectors.toMap(entry -> entry.getKey(),
- entry -> entry.getValue() - initial.responsesByCode.getOrDefault(entry.getKey(), 0L))),
- exceptions - initial.exceptions,
- inflight - initial.inflight,
- responsesByCode.size() == initial.responsesByCode.size() ? 0 :
- (averageLatencyMillis * responsesByCode.size() - initial.averageLatencyMillis * initial.responsesByCode.size())
- / (responsesByCode.size() - initial.responsesByCode.size()),
- minLatencyMillis,
- maxLatencyMillis,
- bytesSent - initial.bytesSent,
- bytesReceived - initial.bytesReceived);
- }
-
- /** Number of HTTP requests attempted. */
- public long requests() {
- return requests;
- }
-
- /** Number of HTTP responses received. */
- public long responses() {
- return requests - inflight - exceptions;
- }
-
- /** Number of 200 OK HTTP responses received. */
- public long successes() {
- return responsesByCode.getOrDefault(200, 0L);
- }
-
- /** Number of HTTP responses by status code. */
- public Map<Integer, Long> responsesByCode() {
- return responsesByCode;
- }
-
- /** Number of exceptions (instead of responses). */
- public long exceptions() {
- return exceptions;
- }
-
- /** Number of attempted requests which haven't yielded a response or exception yet. */
- public long inflight() {
- return inflight;
- }
-
- /** Average request-response latency, or -1. */
- public long averageLatencyMillis() {
- return averageLatencyMillis;
- }
-
- /** Minimum request-response latency, or -1. */
- public long minLatencyMillis() {
- return minLatencyMillis;
- }
-
- /** Maximum request-response latency, or -1. */
- public long maxLatencyMillis() {
- return maxLatencyMillis;
- }
-
- /** Number of bytes sent, for HTTP requests with a response. */
- public long bytesSent() {
- return bytesSent;
- }
-
- /** Number of bytes received in HTTP responses. */
- public long bytesReceived() {
- return bytesReceived;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- OperationStats that = (OperationStats) o;
- return requests == that.requests && inflight == that.inflight && exceptions == that.exceptions && averageLatencyMillis == that.averageLatencyMillis && minLatencyMillis == that.minLatencyMillis && maxLatencyMillis == that.maxLatencyMillis && bytesSent == that.bytesSent && bytesReceived == that.bytesReceived && responsesByCode.equals(that.responsesByCode);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(requests, responsesByCode, inflight, exceptions, averageLatencyMillis, minLatencyMillis, maxLatencyMillis, bytesSent, bytesReceived);
- }
-
- @Override
- public String toString() {
- return "Stats{" +
- "requests=" + requests +
- ", responsesByCode=" + responsesByCode +
- ", exceptions=" + exceptions +
- ", inflight=" + inflight +
- ", averageLatencyMillis=" + averageLatencyMillis +
- ", minLatencyMillis=" + minLatencyMillis +
- ", maxLatencyMillis=" + maxLatencyMillis +
- ", bytesSent=" + bytesSent +
- ", bytesReceived=" + bytesReceived +
- '}';
- }
-
-}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ResultException.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ResultException.java
deleted file mode 100644
index d9eaff40d74..00000000000
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ResultException.java
+++ /dev/null
@@ -1,25 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
-
-import java.util.Optional;
-
-/**
- * Signals that the document API in the feed container returned a failure result for a feed operation.
- *
- * @author jonmv
- */
-public class ResultException extends FeedException {
-
- private final String trace;
-
- public ResultException(DocumentId documentId, String message, String trace) {
- super(documentId, message);
- this.trace = trace;
- }
-
- /** Holds the trace, if the failed operation had a {@link OperationParameters#tracelevel(int)} higher than 0. */
- public Optional<String> getTrace() {
- return Optional.ofNullable(trace);
- }
-
-}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ResultParseException.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/ResultParseException.java
deleted file mode 100644
index 947ab9f0560..00000000000
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ResultParseException.java
+++ /dev/null
@@ -1,14 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
-
-/**
- * Signals that the client was unable to obtain a proper response/result from container
- *
- * @author bjorncs
- */
-public class ResultParseException extends FeedException {
-
- public ResultParseException(DocumentId documentId, String message) { super(documentId, message); }
-
- public ResultParseException(DocumentId documentId, Throwable cause) { super(documentId, cause); }
-}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java
index 52d7af2fb31..6dc9ec4efb1 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/ApacheCluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
+package ai.vespa.feed.client.impl;
+import ai.vespa.feed.client.HttpResponse;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.config.RequestConfig;
@@ -18,7 +19,6 @@ import org.apache.hc.core5.util.Timeout;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.net.URI;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -43,7 +43,7 @@ class ApacheCluster implements Cluster {
.setResponseTimeout(Timeout.ofMinutes(5))
.build();
- ApacheCluster(FeedClientBuilder builder) throws IOException {
+ ApacheCluster(FeedClientBuilderImpl builder) throws IOException {
for (URI endpoint : builder.endpoints)
for (int i = 0; i < builder.connectionsPerEndpoint; i++)
endpoints.add(new Endpoint(createHttpClient(builder), endpoint));
@@ -114,7 +114,7 @@ class ApacheCluster implements Cluster {
}
- private static CloseableHttpAsyncClient createHttpClient(FeedClientBuilder builder) throws IOException {
+ private static CloseableHttpAsyncClient createHttpClient(FeedClientBuilderImpl builder) throws IOException {
SSLContext sslContext = builder.constructSslContext();
String[] allowedCiphers = excludeH2Blacklisted(excludeWeak(sslContext.getSupportedSSLParameters().getCipherSuites()));
if (allowedCiphers.length == 0)
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/BenchmarkingCluster.java
index 05ff6e99308..40049bad217 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/BenchmarkingCluster.java
@@ -1,5 +1,8 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
+package ai.vespa.feed.client.impl;
+
+import ai.vespa.feed.client.HttpResponse;
+import ai.vespa.feed.client.OperationStats;
import java.util.HashMap;
import java.util.Map;
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/Cluster.java
index 57c028426fe..ee9188fdc2b 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/Cluster.java
@@ -1,8 +1,10 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
+package ai.vespa.feed.client.impl;
+
+import ai.vespa.feed.client.HttpResponse;
+import ai.vespa.feed.client.OperationStats;
import java.io.Closeable;
-import java.util.Collections;
import java.util.concurrent.CompletableFuture;
/**
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DryrunCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DryrunCluster.java
index 282e4e14285..96cf7998681 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DryrunCluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DryrunCluster.java
@@ -1,5 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
+package ai.vespa.feed.client.impl;
+
+import ai.vespa.feed.client.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java
index a379a8b066b..5969fe267c0 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java
@@ -1,7 +1,8 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
+package ai.vespa.feed.client.impl;
+
+import ai.vespa.feed.client.HttpResponse;
-import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
@@ -25,7 +26,7 @@ public class DynamicThrottler extends StaticThrottler {
private long startNanos = System.nanoTime();
private long sent = 0;
- public DynamicThrottler(FeedClientBuilder builder) {
+ public DynamicThrottler(FeedClientBuilderImpl builder) {
super(builder);
targetInflight = new AtomicLong(8 * minInflight);
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java
index 3b79d47b494..7dafeb0b541 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java
@@ -1,5 +1,8 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
+package ai.vespa.feed.client.impl;
+
+import ai.vespa.feed.client.FeedClient;
+import ai.vespa.feed.client.FeedClientBuilder;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
@@ -16,6 +19,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.function.Supplier;
import static java.util.Objects.requireNonNull;
@@ -26,11 +30,11 @@ import static java.util.Objects.requireNonNull;
* @author bjorncs
* @author jonmv
*/
-public class FeedClientBuilder {
+public class FeedClientBuilderImpl implements FeedClientBuilder {
static final FeedClient.RetryStrategy defaultRetryStrategy = new FeedClient.RetryStrategy() { };
- final List<URI> endpoints;
+ List<URI> endpoints;
final Map<String, Supplier<String>> requestHeaders = new HashMap<>();
SSLContext sslContext;
HostnameVerifier hostnameVerifier;
@@ -47,72 +51,65 @@ public class FeedClientBuilder {
boolean benchmark = true;
boolean dryrun = false;
- /** Creates a builder for a single container endpoint **/
- public static FeedClientBuilder create(URI endpoint) { return new FeedClientBuilder(Collections.singletonList(endpoint)); }
- /** Creates a builder for multiple container endpoints **/
- public static FeedClientBuilder create(List<URI> endpoints) { return new FeedClientBuilder(endpoints); }
- private FeedClientBuilder(List<URI> endpoints) {
+ public FeedClientBuilderImpl() {
+ }
+
+ FeedClientBuilderImpl(List<URI> endpoints) {
+ this();
+ setEndpointUris(endpoints);
+ }
+
+ @Override
+ public FeedClientBuilder setEndpointUris(List<URI> endpoints) {
if (endpoints.isEmpty())
throw new IllegalArgumentException("At least one endpoint must be provided");
for (URI endpoint : endpoints)
requireNonNull(endpoint.getHost());
-
this.endpoints = new ArrayList<>(endpoints);
+ return this;
}
- /**
- * Sets the number of connections this client will use per endpoint.
- *
- * A reasonable value here is a value that lets all feed clients (if more than one)
- * collectively have a number of connections which is a small multiple of the numbers
- * of containers in the cluster to feed, so load can be balanced across these containers.
- * In general, this value should be kept as low as possible, but poor connectivity
- * between feeder and cluster may also warrant a higher number of connections.
- */
- public FeedClientBuilder setConnectionsPerEndpoint(int max) {
+ @Override
+ public FeedClientBuilderImpl setConnectionsPerEndpoint(int max) {
if (max < 1) throw new IllegalArgumentException("Max connections must be at least 1, but was " + max);
this.connectionsPerEndpoint = max;
return this;
}
- /**
- * Sets the maximum number of streams per HTTP/2 connection for this client.
- *
- * This determines the maximum number of concurrent, inflight requests for this client,
- * which is {@code maxConnections * maxStreamsPerConnection}. Prefer more streams over
- * more connections, when possible.
- * The feed client automatically throttles load to achieve the best throughput, and the
- * actual number of streams per connection is usually lower than the maximum.
- */
- public FeedClientBuilder setMaxStreamPerConnection(int max) {
+ @Override
+ public FeedClientBuilderImpl setMaxStreamPerConnection(int max) {
if (max < 1) throw new IllegalArgumentException("Max streams per connection must be at least 1, but was " + max);
this.maxStreamsPerConnection = max;
return this;
}
/** Sets {@link SSLContext} instance. */
- public FeedClientBuilder setSslContext(SSLContext context) {
+ @Override
+ public FeedClientBuilderImpl setSslContext(SSLContext context) {
this.sslContext = requireNonNull(context);
return this;
}
/** Sets {@link HostnameVerifier} instance (e.g for disabling default SSL hostname verification). */
- public FeedClientBuilder setHostnameVerifier(HostnameVerifier verifier) {
+ @Override
+ public FeedClientBuilderImpl setHostnameVerifier(HostnameVerifier verifier) {
this.hostnameVerifier = requireNonNull(verifier);
return this;
}
/** Turns off benchmarking. Attempting to get {@link FeedClient#stats()} will result in an exception. */
- public FeedClientBuilder noBenchmarking() {
+ @Override
+ public FeedClientBuilderImpl noBenchmarking() {
this.benchmark = false;
return this;
}
/** Adds HTTP request header to all client requests. */
- public FeedClientBuilder addRequestHeader(String name, String value) {
+ @Override
+ public FeedClientBuilderImpl addRequestHeader(String name, String value) {
return addRequestHeader(name, () -> requireNonNull(value));
}
@@ -120,7 +117,8 @@ public class FeedClientBuilder {
* Adds HTTP request header to all client requests. Value {@link Supplier} is invoked for each HTTP request,
* i.e. value can be dynamically updated during a feed.
*/
- public FeedClientBuilder addRequestHeader(String name, Supplier<String> valueSupplier) {
+ @Override
+ public FeedClientBuilderImpl addRequestHeader(String name, Supplier<String> valueSupplier) {
this.requestHeaders.put(requireNonNull(name), requireNonNull(valueSupplier));
return this;
}
@@ -129,7 +127,8 @@ public class FeedClientBuilder {
* Overrides default retry strategy.
* @see FeedClient.RetryStrategy
*/
- public FeedClientBuilder setRetryStrategy(FeedClient.RetryStrategy strategy) {
+ @Override
+ public FeedClientBuilderImpl setRetryStrategy(FeedClient.RetryStrategy strategy) {
this.retryStrategy = requireNonNull(strategy);
return this;
}
@@ -138,31 +137,36 @@ public class FeedClientBuilder {
* Overrides default circuit breaker.
* @see FeedClient.CircuitBreaker
*/
- public FeedClientBuilder setCircuitBreaker(FeedClient.CircuitBreaker breaker) {
+ @Override
+ public FeedClientBuilderImpl setCircuitBreaker(FeedClient.CircuitBreaker breaker) {
this.circuitBreaker = requireNonNull(breaker);
return this;
}
/** Sets path to client SSL certificate/key PEM files */
- public FeedClientBuilder setCertificate(Path certificatePemFile, Path privateKeyPemFile) {
+ @Override
+ public FeedClientBuilderImpl setCertificate(Path certificatePemFile, Path privateKeyPemFile) {
this.certificateFile = certificatePemFile;
this.privateKeyFile = privateKeyPemFile;
return this;
}
/** Sets client SSL certificates/key */
- public FeedClientBuilder setCertificate(Collection<X509Certificate> certificate, PrivateKey privateKey) {
+ @Override
+ public FeedClientBuilderImpl setCertificate(Collection<X509Certificate> certificate, PrivateKey privateKey) {
this.certificate = certificate;
this.privateKey = privateKey;
return this;
}
/** Sets client SSL certificate/key */
- public FeedClientBuilder setCertificate(X509Certificate certificate, PrivateKey privateKey) {
+ @Override
+ public FeedClientBuilderImpl setCertificate(X509Certificate certificate, PrivateKey privateKey) {
return setCertificate(Collections.singletonList(certificate), privateKey);
}
- public FeedClientBuilder setDryrun(boolean enabled) {
+ @Override
+ public FeedClientBuilderImpl setDryrun(boolean enabled) {
this.dryrun = enabled;
return this;
}
@@ -171,18 +175,21 @@ public class FeedClientBuilder {
* Overrides JVM default SSL truststore
* @param caCertificatesFile Path to PEM encoded file containing trusted certificates
*/
- public FeedClientBuilder setCaCertificatesFile(Path caCertificatesFile) {
+ @Override
+ public FeedClientBuilderImpl setCaCertificatesFile(Path caCertificatesFile) {
this.caCertificatesFile = caCertificatesFile;
return this;
}
/** Overrides JVM default SSL truststore */
- public FeedClientBuilder setCaCertificates(Collection<X509Certificate> caCertificates) {
+ @Override
+ public FeedClientBuilderImpl setCaCertificates(Collection<X509Certificate> caCertificates) {
this.caCertificates = caCertificates;
return this;
}
/** Constructs instance of {@link ai.vespa.feed.client.FeedClient} from builder configuration */
+ @Override
public FeedClient build() {
try {
validateConfiguration();
@@ -209,6 +216,9 @@ public class FeedClientBuilder {
}
private void validateConfiguration() {
+ if (endpoints == null) {
+ throw new IllegalArgumentException("At least one endpoint must be provided");
+ }
if (sslContext != null && (
certificateFile != null || caCertificatesFile != null || privateKeyFile != null ||
certificate != null || caCertificates != null || privateKey != null)) {
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java
index cb5e35c79a5..b223fce7cab 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java
@@ -1,5 +1,8 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
+package ai.vespa.feed.client.impl;
+
+import ai.vespa.feed.client.FeedClient;
+import ai.vespa.feed.client.HttpResponse;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
index eb818ba1d48..3fd44596d63 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
@@ -1,6 +1,15 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
-
+package ai.vespa.feed.client.impl;
+
+import ai.vespa.feed.client.DocumentId;
+import ai.vespa.feed.client.FeedClient;
+import ai.vespa.feed.client.FeedException;
+import ai.vespa.feed.client.HttpResponse;
+import ai.vespa.feed.client.OperationParameters;
+import ai.vespa.feed.client.OperationStats;
+import ai.vespa.feed.client.Result;
+import ai.vespa.feed.client.ResultException;
+import ai.vespa.feed.client.ResultParseException;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
@@ -33,11 +42,11 @@ class HttpFeedClient implements FeedClient {
private final RequestStrategy requestStrategy;
private final AtomicBoolean closed = new AtomicBoolean();
- HttpFeedClient(FeedClientBuilder builder) throws IOException {
+ HttpFeedClient(FeedClientBuilderImpl builder) throws IOException {
this(builder, new HttpRequestStrategy(builder));
}
- HttpFeedClient(FeedClientBuilder builder, RequestStrategy requestStrategy) {
+ HttpFeedClient(FeedClientBuilderImpl builder, RequestStrategy requestStrategy) {
this.requestHeaders = new HashMap<>(builder.requestHeaders);
this.requestStrategy = requestStrategy;
}
@@ -173,7 +182,7 @@ class HttpFeedClient implements FeedClient {
if (outcome == Outcome.vespaFailure)
throw new ResultException(documentId, message, trace);
- return new Result(toResultType(outcome), documentId, message, trace);
+ return new ResultImpl(toResultType(outcome), documentId, message, trace);
}
static String getPath(DocumentId documentId) {
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java
index 48defd71ea8..08b8ca08c61 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequest.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java
@@ -1,5 +1,5 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
+package ai.vespa.feed.client.impl;
import java.util.Map;
import java.util.function.Supplier;
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java
index cf65a874f3b..6fec0029bc3 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java
@@ -1,8 +1,13 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
+package ai.vespa.feed.client.impl;
+import ai.vespa.feed.client.DocumentId;
+import ai.vespa.feed.client.FeedClient;
import ai.vespa.feed.client.FeedClient.CircuitBreaker;
import ai.vespa.feed.client.FeedClient.RetryStrategy;
+import ai.vespa.feed.client.FeedException;
+import ai.vespa.feed.client.HttpResponse ;
+import ai.vespa.feed.client.OperationStats;
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
@@ -62,11 +67,11 @@ class HttpRequestStrategy implements RequestStrategy {
return thread;
});
- HttpRequestStrategy(FeedClientBuilder builder) throws IOException {
+ HttpRequestStrategy(FeedClientBuilderImpl builder) throws IOException {
this(builder, builder.dryrun ? new DryrunCluster() : new ApacheCluster(builder));
}
- HttpRequestStrategy(FeedClientBuilder builder, Cluster cluster) {
+ HttpRequestStrategy(FeedClientBuilderImpl builder, Cluster cluster) {
this.cluster = builder.benchmark ? new BenchmarkingCluster(cluster) : cluster;
this.strategy = builder.retryStrategy;
this.breaker = builder.circuitBreaker;
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/RequestStrategy.java
index 9a97f7daa66..e3b6b594593 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/RequestStrategy.java
@@ -1,7 +1,10 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
+package ai.vespa.feed.client.impl;
+import ai.vespa.feed.client.DocumentId;
import ai.vespa.feed.client.FeedClient.CircuitBreaker.State;
+import ai.vespa.feed.client.HttpResponse;
+import ai.vespa.feed.client.OperationStats;
import java.util.concurrent.CompletableFuture;
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Result.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ResultImpl.java
index 5ff3fd0a219..dabf76cba34 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Result.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ResultImpl.java
@@ -1,5 +1,8 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
+package ai.vespa.feed.client.impl;
+
+import ai.vespa.feed.client.DocumentId;
+import ai.vespa.feed.client.Result;
import java.util.Optional;
@@ -9,29 +12,24 @@ import java.util.Optional;
* @author bjorncs
* @author jonmv
*/
-public class Result {
+public class ResultImpl implements Result {
private final Type type;
private final DocumentId documentId;
private final String resultMessage;
private final String traceMessage;
- Result(Type type, DocumentId documentId, String resultMessage, String traceMessage) {
+ ResultImpl(Type type, DocumentId documentId, String resultMessage, String traceMessage) {
this.type = type;
this.documentId = documentId;
this.resultMessage = resultMessage;
this.traceMessage = traceMessage;
}
- public enum Type {
- success,
- conditionNotMet
- }
-
- public Type type() { return type; }
- public DocumentId documentId() { return documentId; }
- public Optional<String> resultMessage() { return Optional.ofNullable(resultMessage); }
- public Optional<String> traceMessage() { return Optional.ofNullable(traceMessage); }
+ @Override public Type type() { return type; }
+ @Override public DocumentId documentId() { return documentId; }
+ @Override public Optional<String> resultMessage() { return Optional.ofNullable(resultMessage); }
+ @Override public Optional<String> traceMessage() { return Optional.ofNullable(traceMessage); }
@Override
public String toString() {
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/SslContextBuilder.java
index f5e13eccd56..2ca4577abe6 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/SslContextBuilder.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/SslContextBuilder.java
@@ -1,5 +1,5 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
+package ai.vespa.feed.client.impl;
import org.bouncycastle.asn1.ASN1ObjectIdentifier;
import org.bouncycastle.asn1.pkcs.PKCSObjectIdentifiers;
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java
index 5137a18d923..1f9cf8e5155 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java
@@ -1,5 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
+package ai.vespa.feed.client.impl;
+
+import ai.vespa.feed.client.HttpResponse;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
@@ -18,7 +20,7 @@ public class StaticThrottler implements Throttler {
protected final long minInflight;
private final AtomicLong targetX10;
- public StaticThrottler(FeedClientBuilder builder) {
+ public StaticThrottler(FeedClientBuilderImpl builder) {
minInflight = 16L * builder.connectionsPerEndpoint * builder.endpoints.size();
maxInflight = 256 * minInflight; // 4096 max streams per connection on the server side.
targetX10 = new AtomicLong(10 * maxInflight); // 10x the actual value to allow for smaller updates.
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Throttler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/Throttler.java
index f2453c27879..700a6f6f805 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Throttler.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/Throttler.java
@@ -1,5 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package ai.vespa.feed.client;
+package ai.vespa.feed.client.impl;
+
+import ai.vespa.feed.client.HttpResponse;
import java.util.concurrent.CompletableFuture;
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/package-info.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/package-info.java
deleted file mode 100644
index daab16a9ff2..00000000000
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/package-info.java
+++ /dev/null
@@ -1,9 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-/**
- * @author bjorncs
- */
-
-@PublicApi
-package ai.vespa.feed.client;
-
-import com.yahoo.api.annotations.PublicApi; \ No newline at end of file