summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java2
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java52
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java40
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java62
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java98
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java240
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java18
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java7
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java3
9 files changed, 340 insertions, 182 deletions
diff --git a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java
index 2f15f468588..e3f726eaf11 100644
--- a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java
+++ b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java
@@ -74,7 +74,7 @@ public class CliClient {
private static FeedClient createFeedClient(CliArguments cliArgs) throws CliArguments.CliArgumentsException {
FeedClientBuilder builder = FeedClientBuilder.create(cliArgs.endpoint());
- cliArgs.connections().ifPresent(builder::setMaxConnections);
+ cliArgs.connections().ifPresent(builder::setConnectionsPerEndpoint);
cliArgs.maxStreamsPerConnection().ifPresent(builder::setMaxStreamPerConnection);
if (cliArgs.sslHostnameVerificationDisabled()) {
builder.setHostnameVerifier(AcceptAllHostnameVerifier.INSTANCE);
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java
index 455a79060ee..2ac75a948d9 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java
@@ -10,10 +10,22 @@ import java.util.concurrent.CompletableFuture;
*/
public interface FeedClient extends Closeable {
+ /** Send a document put with the given parameters, returning a future with the result of the operation. */
CompletableFuture<Result> put(DocumentId documentId, String documentJson, OperationParameters params);
+
+ /** Send a document update with the given parameters, returning a future with the result of the operation. */
CompletableFuture<Result> update(DocumentId documentId, String updateJson, OperationParameters params);
+
+ /** Send a document remove with the given parameters, returning a future with the result of the operation. */
CompletableFuture<Result> remove(DocumentId documentId, OperationParameters params);
+ /** Shut down, and reject new operations. Operations in flight are allowed to complete normally if graceful. */
+ void close(boolean graceful);
+
+ /** Initiates graceful shutdown. See {@link #close(boolean)}. */
+ default void close() { close(true); }
+
+ /** Controls what to retry, and how many times. */
interface RetryStrategy {
/** Whether to retry operations of the given type. */
@@ -24,10 +36,44 @@ public interface FeedClient extends Closeable {
}
+ /** Allows slowing down or halting completely operations against the configured endpoint on high failure rates. */
+ interface CircuitBreaker {
+
+ /** Called by the client whenever a successful response is obtained. */
+ void success();
+
+ /** Called by the client whenever a transient or fatal error occurs. */
+ void failure();
+
+ /** The current state of the circuit breaker. */
+ State state();
+
+ enum State {
+
+ /** Circuit is closed: business as usual. */
+ CLOSED,
+
+ /** Circuit is half-open: something is wrong, perhaps it recovers? */
+ HALF_OPEN,
+
+ /** Circuit is open: we have given up. */
+ OPEN;
+
+ }
+
+ }
+
enum OperationType {
- put,
- update,
- remove;
+
+ /** A document put operation. This is idempotent. */
+ PUT,
+
+ /** A document update operation. This is idempotent if all its contained updates are. */
+ UPDATE,
+
+ /** A document remove operation. This is idempotent. */
+ REMOVE;
+
}
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java
index eaf84c67ac4..da575a7cf6d 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java
@@ -8,7 +8,11 @@ import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.file.Path;
import java.time.Clock;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
@@ -22,38 +26,45 @@ import static java.util.Objects.requireNonNull;
*/
public class FeedClientBuilder {
- FeedClient.RetryStrategy defaultRetryStrategy = new FeedClient.RetryStrategy() { };
+ static final FeedClient.RetryStrategy defaultRetryStrategy = new FeedClient.RetryStrategy() { };
- final URI endpoint;
+ final List<URI> endpoints;
final Map<String, Supplier<String>> requestHeaders = new HashMap<>();
SSLContext sslContext;
HostnameVerifier hostnameVerifier;
- int maxConnections = 4;
- int maxStreamsPerConnection = 1024;
+ int connectionsPerEndpoint = 4;
+ int maxStreamsPerConnection = 128;
FeedClient.RetryStrategy retryStrategy = defaultRetryStrategy;
+ FeedClient.CircuitBreaker circuitBreaker = new GracePeriodCircuitBreaker(Clock.systemUTC(), Duration.ofSeconds(1), Duration.ofMinutes(10));
Path certificate;
Path privateKey;
Path caCertificates;
- Clock clock;
- public static FeedClientBuilder create(URI endpoint) { return new FeedClientBuilder(endpoint); }
+ public static FeedClientBuilder create(URI endpoint) { return new FeedClientBuilder(Collections.singletonList(endpoint)); }
- private FeedClientBuilder(URI endpoint) {
- requireNonNull(endpoint.getHost());
- this.endpoint = endpoint;
+ public static FeedClientBuilder create(List<URI> endpoints) { return new FeedClientBuilder(endpoints); }
+
+ private FeedClientBuilder(List<URI> endpoints) {
+ if (endpoints.isEmpty())
+ throw new IllegalArgumentException("At least one endpoint must be provided");
+
+ for (URI endpoint : endpoints)
+ requireNonNull(endpoint.getHost());
+
+ this.endpoints = new ArrayList<>(endpoints);
}
/**
- * Sets the maximum number of connections this client will use.
+ * Sets the number of connections this client will use per endpoint.
*
* A reasonable value here is a small multiple of the numbers of containers in the
* cluster to feed, so load can be balanced across these.
* In general, this value should be kept as low as possible, but poor connectivity
* between feeder and cluster may also warrant a higher number of connections.
*/
- public FeedClientBuilder setMaxConnections(int max) {
+ public FeedClientBuilder setConnectionsPerEndpoint(int max) {
if (max < 1) throw new IllegalArgumentException("Max connections must be at least 1, but was " + max);
- this.maxConnections = max;
+ this.connectionsPerEndpoint = max;
return this;
}
@@ -97,6 +108,11 @@ public class FeedClientBuilder {
return this;
}
+ public FeedClientBuilder setCircuitBreaker(FeedClient.CircuitBreaker breaker) {
+ this.circuitBreaker = requireNonNull(breaker);
+ return this;
+ }
+
public FeedClientBuilder setCertificate(Path certificatePemFile, Path privateKeyPemFile) {
if (sslContext != null) throw new IllegalArgumentException("Cannot set both SSLContext and certificate");
this.certificate = certificatePemFile;
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java
new file mode 100644
index 00000000000..974d18418ec
--- /dev/null
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java
@@ -0,0 +1,62 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+import static java.util.Objects.requireNonNull;
+import static java.util.logging.Level.INFO;
+import static java.util.logging.Level.WARNING;
+
+/**
+ * Breaks the circuit when no successes have been recorded for a specified time.
+ */
+public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker {
+
+ private static final Logger log = Logger.getLogger(GracePeriodCircuitBreaker.class.getName());
+
+ private final AtomicLong lastSuccessMillis = new AtomicLong(0); // Trigger if first response is a failure.
+ private final AtomicBoolean halfOpen = new AtomicBoolean(false);
+ private final AtomicBoolean open = new AtomicBoolean(false);
+ private final Clock clock;
+ private final long graceMillis;
+ private final long doomMillis;
+
+ GracePeriodCircuitBreaker(Clock clock, Duration grace, Duration doom) {
+ if (grace.isNegative())
+ throw new IllegalArgumentException("Grace delay must be non-negative");
+
+ if (doom.isNegative())
+ throw new IllegalArgumentException("Doom delay must be non-negative");
+
+ this.clock = requireNonNull(clock);
+ this.graceMillis = grace.toMillis();
+ this.doomMillis = doom.toMillis();
+ }
+
+ @Override
+ public void success() {
+ lastSuccessMillis.set(clock.millis());
+ if (halfOpen.compareAndSet(true, false))
+ log.log(INFO, "Circuit breaker is now closed");
+ }
+
+ @Override
+ public void failure() {
+ long nowMillis = clock.millis();
+ if (lastSuccessMillis.get() < nowMillis - doomMillis && open.compareAndSet(false, true))
+ log.log(WARNING, "Circuit breaker is now open");
+
+ if (lastSuccessMillis.get() < nowMillis - graceMillis && halfOpen.compareAndSet(false, true))
+ log.log(INFO, "Circuit breaker is now half-open");
+ }
+
+ @Override
+ public State state() {
+ return open.get() ? State.OPEN : halfOpen.get() ? State.HALF_OPEN : State.CLOSED;
+ }
+
+}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
index 8a38e859ca4..644c387acd1 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
@@ -11,6 +11,7 @@ import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.hc.core5.http2.config.H2Config;
+import org.apache.hc.core5.net.URIAuthority;
import org.apache.hc.core5.net.URIBuilder;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.util.Timeout;
@@ -19,9 +20,9 @@ import javax.net.ssl.SSLContext;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
+import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URISyntaxException;
-import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -44,24 +45,32 @@ import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeWeak;
*/
class HttpFeedClient implements FeedClient {
- private final URI endpoint;
private final Map<String, Supplier<String>> requestHeaders;
private final RequestStrategy requestStrategy;
- private final List<CloseableHttpAsyncClient> httpClients = new ArrayList<>();
- private final List<AtomicInteger> inflight = new ArrayList<>();
+ private final List<Endpoint> endpoints = new ArrayList<>();
private final AtomicBoolean closed = new AtomicBoolean();
HttpFeedClient(FeedClientBuilder builder) throws IOException {
- this.endpoint = builder.endpoint;
this.requestHeaders = new HashMap<>(builder.requestHeaders);
- this.requestStrategy = new HttpRequestStrategy(builder, Clock.systemUTC());
+ this.requestStrategy = new HttpRequestStrategy(builder);
+ for (URI endpoint : builder.endpoints)
+ for (int i = 0; i < builder.connectionsPerEndpoint; i++)
+ endpoints.add(new Endpoint(createHttpClient(builder), endpoint));
+ }
+
+ private static class Endpoint {
+
+ private final CloseableHttpAsyncClient client;
+ private final AtomicInteger inflight = new AtomicInteger(0);
+ private final URI url;
- for (int i = 0; i < builder.maxConnections; i++) {
- CloseableHttpAsyncClient client = createHttpClient(builder);
- client.start();
- httpClients.add(client);
- inflight.add(new AtomicInteger());
+ private Endpoint(CloseableHttpAsyncClient client, URI url) {
+ this.client = client;
+ this.url = url;
+
+ this.client.start();
}
+
}
private static CloseableHttpAsyncClient createHttpClient(FeedClientBuilder builder) throws IOException {
@@ -129,14 +138,37 @@ class HttpFeedClient implements FeedClient {
}
@Override
- public void close() throws IOException {
- if ( ! closed.getAndSet(true))
- for (CloseableHttpAsyncClient hc : httpClients)
- hc.close();
+ public void close(boolean graceful) {
+ closed.set(true);
+ if (graceful)
+ requestStrategy.await();
+
+ requestStrategy.destroy();
+ Throwable thrown = null;
+ for (Endpoint endpoint : endpoints)
+ try {
+ endpoint.client.close();
+ }
+ catch (Throwable t) {
+ if (thrown == null) thrown = t;
+ else thrown.addSuppressed(t);
+ }
+ if (thrown != null) throw new RuntimeException(thrown);
+ }
+
+ private void ensureOpen() {
+ if (requestStrategy.hasFailed())
+ close();
+
+ if (closed.get())
+ throw new IllegalStateException("Client is closed, no further operations may be sent");
}
private CompletableFuture<Result> send(String method, DocumentId documentId, String operationJson, OperationParameters params) {
- SimpleHttpRequest request = new SimpleHttpRequest(method, operationUrl(endpoint, documentId, params));
+ ensureOpen();
+
+ String path = operationPath(documentId, params).toString();
+ SimpleHttpRequest request = new SimpleHttpRequest(method, path);
requestHeaders.forEach((name, value) -> request.setHeader(name, value.get()));
if (operationJson != null)
request.setBody(operationJson, ContentType.APPLICATION_JSON);
@@ -144,10 +176,7 @@ class HttpFeedClient implements FeedClient {
return requestStrategy.enqueue(documentId, request, this::send)
.handle((response, thrown) -> {
if (thrown != null) {
- if (requestStrategy.hasFailed()) {
- try { close(); }
- catch (IOException exception) { thrown.addSuppressed(exception); }
- }
+ // TODO: What to do with exceptions here? Ex on 400, 401, 403, etc, and wrap and throw?
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
thrown.printStackTrace(new PrintStream(buffer));
return new Result(Result.Type.failure, documentId, buffer.toString(), null);
@@ -160,25 +189,28 @@ class HttpFeedClient implements FeedClient {
private void send(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel) {
int index = 0;
int min = Integer.MAX_VALUE;
- for (int i = 0; i < httpClients.size(); i++)
- if (inflight.get(i).get() < min) {
- min = inflight.get(i).get();
+ for (int i = 0; i < endpoints.size(); i++)
+ if (endpoints.get(i).inflight.get() < min) {
index = i;
+ min = endpoints.get(i).inflight.get();
}
- inflight.get(index).incrementAndGet();
+ Endpoint endpoint = endpoints.get(index);
+ endpoint.inflight.incrementAndGet();
try {
- httpClients.get(index).execute(request,
- new FutureCallback<SimpleHttpResponse>() {
- @Override public void completed(SimpleHttpResponse response) { vessel.complete(response); }
- @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); }
- @Override public void cancelled() { vessel.cancel(false); }
- });
+ request.setScheme(endpoint.url.getScheme());
+ request.setAuthority(new URIAuthority(endpoint.url.getHost(), endpoint.url.getPort()));
+ endpoint.client.execute(request,
+ new FutureCallback<SimpleHttpResponse>() {
+ @Override public void completed(SimpleHttpResponse response) { vessel.complete(response); }
+ @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); }
+ @Override public void cancelled() { vessel.cancel(false); }
+ });
}
catch (Throwable thrown) {
vessel.completeExceptionally(thrown);
}
- vessel.thenRun(inflight.get(index)::decrementAndGet);
+ vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet());
}
static Result toResult(SimpleHttpResponse response, DocumentId documentId) {
@@ -214,8 +246,8 @@ class HttpFeedClient implements FeedClient {
return path;
}
- static URI operationUrl(URI endpoint, DocumentId documentId, OperationParameters params) {
- URIBuilder url = new URIBuilder(endpoint);
+ static URI operationPath(DocumentId documentId, OperationParameters params) {
+ URIBuilder url = new URIBuilder();
url.setPathSegments(toPath(documentId));
if (params.createIfNonExistent()) url.addParameter("create", "true");
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
index d0d67d65446..a3a29412254 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
@@ -1,26 +1,29 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.feed.client;
+import ai.vespa.feed.client.FeedClient.CircuitBreaker;
import ai.vespa.feed.client.FeedClient.RetryStrategy;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import java.io.IOException;
-import java.time.Clock;
-import java.time.Instant;
-import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.BlockingQueue;
+import java.util.Queue;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.logging.Logger;
+import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN;
+import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.util.logging.Level.FINE;
-import static java.util.logging.Level.INFO;
+// TODO: update doc
/**
* Controls request execution and retries:
* <ul>
@@ -31,57 +34,73 @@ import static java.util.logging.Level.INFO;
*
* @author jonmv
*/
-class HttpRequestStrategy implements RequestStrategy, AutoCloseable {
+class HttpRequestStrategy implements RequestStrategy {
private static final Logger log = Logger.getLogger(HttpRequestStrategy.class.getName());
- private final Map<DocumentId, CompletableFuture<Void>> inflightById = new HashMap<>();
- private final Object monitor = new Object();
- private final Clock clock;
- private final RetryStrategy wrapped;
- private final Thread delayer = new Thread(this::drainDelayed, "feed-client-retry-delayer");
- private final BlockingQueue<CompletableFuture<Void>> delayed = new LinkedBlockingQueue<>();
+ private final Map<DocumentId, CompletableFuture<?>> inflightById = new ConcurrentHashMap<>();
+ private final RetryStrategy strategy;
+ private final CircuitBreaker breaker;
+ private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
private final long maxInflight;
private final long minInflight;
- private double targetInflight;
- private long inflight = 0;
- private long consecutiveSuccesses = 0;
- private Instant lastSuccess;
- private boolean failed = false;
- private boolean closed = false;
-
- HttpRequestStrategy(FeedClientBuilder builder, Clock clock) {
- this.wrapped = builder.retryStrategy;
- this.maxInflight = builder.maxConnections * (long) builder.maxStreamsPerConnection;
- this.minInflight = builder.maxConnections * (long) min(16, builder.maxStreamsPerConnection);
- this.targetInflight = Math.sqrt(maxInflight) * (Math.sqrt(minInflight));
- this.clock = clock;
- this.lastSuccess = clock.instant();
- this.delayer.start();
- }
-
- private void drainDelayed() {
- try {
- while (true) {
- do delayed.take().complete(null);
- while ( ! hasFailed());
+ private final AtomicLong targetInflightX10; // 10x target, so we can increment one every tenth success.
+ private final AtomicLong inflight = new AtomicLong(0);
+ private final AtomicBoolean destroyed = new AtomicBoolean(false);
+ private final AtomicLong delayedCount = new AtomicLong(0);
+ private final AtomicLong retries = new AtomicLong(0);
+
+ HttpRequestStrategy(FeedClientBuilder builder) {
+ this.strategy = builder.retryStrategy;
+ this.breaker = builder.circuitBreaker;
+ this.maxInflight = builder.connectionsPerEndpoint * (long) builder.maxStreamsPerConnection;
+ this.minInflight = builder.connectionsPerEndpoint * (long) min(16, builder.maxStreamsPerConnection);
+ this.targetInflightX10 = new AtomicLong(10 * (long) (Math.sqrt(minInflight) * Math.sqrt(maxInflight)));
+ new Thread(this::dispatch, "feed-client-dispatcher").start();
+ }
- Thread.sleep(1000);
+ private void dispatch() {
+ try {
+ while ( ! destroyed.get()) {
+ CircuitBreaker.State state = breaker.state();
+ if (state == OPEN) destroy();
+ else while ( ! isInExcess())
+ if ( ! poll() || breaker.state() == HALF_OPEN) break;
+
+ // Sleep when circuit is half-open, nap when queue is empty, or we are throttled.
+ Thread.sleep(breaker.state() == HALF_OPEN ? 1000 : 10);
}
}
catch (InterruptedException e) {
- delayed.forEach(action -> action.cancel(true));
+ destroy();
}
}
+ private void offer(Runnable task) {
+ delayedCount.incrementAndGet();
+ queue.offer(task);
+ }
+
+ private boolean poll() {
+ Runnable task = queue.poll();
+ if (task == null) return false;
+ delayedCount.decrementAndGet();
+ task.run();
+ return true;
+ }
+
+ private boolean isInExcess() {
+ return inflight.get() - delayedCount.get() > targetInflight();
+ }
+
private boolean retry(SimpleHttpRequest request, int attempt) {
- if (attempt >= wrapped.retries())
+ if (attempt >= strategy.retries())
return false;
switch (request.getMethod().toUpperCase()) {
- case "POST": return wrapped.retry(FeedClient.OperationType.put);
- case "PUT": return wrapped.retry(FeedClient.OperationType.update);
- case "DELETE": return wrapped.retry(FeedClient.OperationType.remove);
+ case "POST": return strategy.retry(FeedClient.OperationType.PUT);
+ case "PUT": return strategy.retry(FeedClient.OperationType.UPDATE);
+ case "DELETE": return strategy.retry(FeedClient.OperationType.REMOVE);
default: throw new IllegalStateException("Unexpected HTTP method: " + request.getMethod());
}
}
@@ -91,8 +110,8 @@ class HttpRequestStrategy implements RequestStrategy, AutoCloseable {
* or the user has turned off retries for this type of operation.
*/
private boolean retry(SimpleHttpRequest request, Throwable thrown, int attempt) {
- failure();
- log.log(INFO, thrown, () -> "Failed attempt " + attempt + " at " + request + ", " + consecutiveSuccesses + " successes since last error");
+ breaker.failure();
+ log.log(FINE, thrown, () -> "Failed attempt " + attempt + " at " + request);
if ( ! (thrown instanceof IOException))
return false;
@@ -100,74 +119,69 @@ class HttpRequestStrategy implements RequestStrategy, AutoCloseable {
return retry(request, attempt);
}
- void success() {
- Instant now = clock.instant();
- synchronized (monitor) {
- ++consecutiveSuccesses;
- lastSuccess = now;
- targetInflight = min(targetInflight + 0.1, maxInflight);
- }
+ private void incrementTargetInflight() {
+ targetInflightX10.incrementAndGet();
}
- void failure() {
- Instant threshold = clock.instant().minusSeconds(300);
- synchronized (monitor) {
- consecutiveSuccesses = 0;
- if (lastSuccess.isBefore(threshold))
- failed = true;
- }
+ private void decreaseTargetInflight() {
+ targetInflightX10.set(max((inflight.get() - delayedCount.get()) * 9, minInflight * 10));
+ }
+
+ private long targetInflight() {
+ return min(targetInflightX10.get() / 10, maxInflight);
}
/** Retries throttled requests (429, 503), adjusting the target inflight count, and server errors (500, 502). */
private boolean retry(SimpleHttpRequest request, SimpleHttpResponse response, int attempt) {
if (response.getCode() / 100 == 2) {
- success();
+ breaker.success();
+ incrementTargetInflight();
return false;
}
- if (response.getCode() == 429 || response.getCode() == 503) { // Throttling; reduce target inflight.
- synchronized (monitor) {
- targetInflight = max(inflight * 0.9, minInflight);
- }
- log.log(FINE, () -> "Status code " + response.getCode() + " (" + response.getBodyText() + ") on attempt " + attempt +
- " at " + request + ", " + consecutiveSuccesses + " successes since last error");
+ log.log(FINE, () -> "Status code " + response.getCode() + " (" + response.getBodyText() +
+ ") on attempt " + attempt + " at " + request);
+ if (response.getCode() == 429 || response.getCode() == 503) { // Throttling; reduce target inflight.
+ decreaseTargetInflight();
return true;
}
- log.log(INFO, () -> "Status code " + response.getCode() + " (" + response.getBodyText() + ") on attempt " + attempt +
- " at " + request + ", " + consecutiveSuccesses + " successes since last error");
-
- failure();
- if (response.getCode() != 500 && response.getCode() != 502)
- return false;
+ breaker.failure();
+ if (response.getCode() == 500 || response.getCode() == 502 || response.getCode() == 504) // Hopefully temporary errors.
+ return retry(request, attempt);
- return retry(request, attempt); // Hopefully temporary errors.
+ return false;
}
- // Must hold lock.
private void acquireSlot() {
try {
- while (inflight >= targetInflight)
- monitor.wait();
+ while (inflight.get() >= targetInflight())
+ Thread.sleep(1);
- ++inflight;
+ inflight.incrementAndGet();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
- // Must hold lock.
private void releaseSlot() {
- for (long i = --inflight; i < targetInflight; i++)
- monitor.notify();
+ inflight.decrementAndGet();
}
@Override
public boolean hasFailed() {
- synchronized (monitor) {
- return failed;
+ return breaker.state() == OPEN;
+ }
+
+ public void await() {
+ try {
+ while (inflight.get() > 0)
+ Thread.sleep(10);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
}
@@ -176,33 +190,24 @@ class HttpRequestStrategy implements RequestStrategy, AutoCloseable {
BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>> dispatch) {
CompletableFuture<SimpleHttpResponse> result = new CompletableFuture<>(); // Carries the aggregate result of the operation, including retries.
CompletableFuture<SimpleHttpResponse> vessel = new CompletableFuture<>(); // Holds the computation of a single dispatch to the HTTP client.
- CompletableFuture<Void> blocker = new CompletableFuture<>(); // Blocks the next operation with same doc-id, then triggers it when complete.
-
- // Get the previous inflight operation for this doc-id, or acquire a send slot.
- CompletableFuture<Void> previous;
- synchronized (monitor) {
- previous = inflightById.put(documentId, blocker);
- if (previous == null)
- acquireSlot();
+ CompletableFuture<?> previous = inflightById.put(documentId, result);
+ if (destroyed.get()) {
+ result.cancel(true);
+ return result;
}
- if (previous == null) // Send immediately if none inflight ...
+
+ if (previous == null) {
+ acquireSlot();
dispatch.accept(request, vessel);
- else // ... or send when the previous inflight is done.
- previous.thenRun(() -> dispatch.accept(request, vessel));
+ }
+ else
+ previous.whenComplete((__, ___) -> offer(() -> dispatch.accept(request, vessel)));
handleAttempt(vessel, dispatch, request, result, 1);
- result.thenRun(() -> {
- CompletableFuture<Void> current;
- synchronized (monitor) {
- current = inflightById.get(documentId);
- if (current == blocker) { // Release slot and clear map if no other operations enqueued for this doc-id ...
- releaseSlot();
- inflightById.put(documentId, null);
- }
- }
- if (current != blocker) // ... or trigger sending the next enqueued operation.
- blocker.complete(null);
+ result.whenComplete((__, ___) -> {
+ if (inflightById.compute(documentId, (____, current) -> current == result ? null : current) == null)
+ releaseSlot();
});
return result;
@@ -215,33 +220,24 @@ class HttpRequestStrategy implements RequestStrategy, AutoCloseable {
// Retry the operation if it failed with a transient error ...
if (thrown != null ? retry(request, thrown, attempt)
: retry(request, response, attempt)) {
+ retries.incrementAndGet();
+ CircuitBreaker.State state = breaker.state();
CompletableFuture<SimpleHttpResponse> retry = new CompletableFuture<>();
- boolean hasFailed = hasFailed();
- if (hasFailed)
- delayed.add(new CompletableFuture<>().thenRun(() -> dispatch.accept(request, retry)));
- else
- dispatch.accept(request, retry);
- handleAttempt(retry, dispatch, request, result, attempt + (hasFailed ? 0 : 1));
- return;
+ offer(() -> dispatch.accept(request, retry));
+ handleAttempt(retry, dispatch, request, result, attempt + (state == HALF_OPEN ? 0 : 1));
}
-
// ... or accept the outcome and mark the operation as complete.
- if (thrown == null) result.complete(response);
- else result.completeExceptionally(thrown);
+ else {
+ if (thrown == null) result.complete(response);
+ else result.completeExceptionally(thrown);
+ }
});
}
@Override
- public void close() {
- synchronized (monitor) {
- if (closed)
- return;
-
- closed = true;
- }
- delayer.interrupt();
- try { delayer.join(); }
- catch (InterruptedException e) { Thread.currentThread().interrupt(); }
+ public void destroy() {
+ if ( ! destroyed.getAndSet(true))
+ inflightById.values().forEach(result -> result.cancel(true));
}
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java
index 17162f19d3f..99d05a4bae8 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java
@@ -17,9 +17,9 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import static ai.vespa.feed.client.FeedClient.OperationType.put;
-import static ai.vespa.feed.client.FeedClient.OperationType.remove;
-import static ai.vespa.feed.client.FeedClient.OperationType.update;
+import static ai.vespa.feed.client.FeedClient.OperationType.PUT;
+import static ai.vespa.feed.client.FeedClient.OperationType.REMOVE;
+import static ai.vespa.feed.client.FeedClient.OperationType.UPDATE;
import static com.fasterxml.jackson.core.JsonToken.START_OBJECT;
import static com.fasterxml.jackson.core.JsonToken.VALUE_FALSE;
import static com.fasterxml.jackson.core.JsonToken.VALUE_STRING;
@@ -187,9 +187,9 @@ public class JsonStreamFeeder implements Closeable {
case FIELD_NAME:
switch (parser.getText()) {
case "id":
- case "put": type = put; id = readId(); break;
- case "update": type = update; id = readId(); break;
- case "remove": type = remove; id = readId(); break;
+ case "put": type = PUT; id = readId(); break;
+ case "update": type = UPDATE; id = readId(); break;
+ case "remove": type = REMOVE; id = readId(); break;
case "condition": parameters = parameters.testAndSetCondition(readString()); break;
case "create": parameters = parameters.createIfNonExistent(readBoolean()); break;
case "fields": {
@@ -230,9 +230,9 @@ public class JsonStreamFeeder implements Closeable {
}
switch (type) {
- case put: return client.put (id, payload, parameters);
- case update: return client.update(id, payload, parameters);
- case remove: return client.remove(id, parameters);
+ case PUT: return client.put (id, payload, parameters);
+ case UPDATE: return client.update(id, payload, parameters);
+ case REMOVE: return client.remove(id, parameters);
default: throw new IllegalStateException("Unexpected operation type '" + type + "'");
}
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
index 1787d8d65c6..c3bb4573fd4 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
@@ -4,6 +4,7 @@ package ai.vespa.feed.client;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
@@ -17,6 +18,12 @@ public interface RequestStrategy {
/** Whether this has failed fatally, and we should cease sending further operations. */
boolean hasFailed();
+ /** Forcibly terminates this, causing all inflight operations to complete immediately. */
+ void destroy();
+
+ /** Wait for all inflight requests to complete. */
+ void await();
+
/** Enqueue the given operation, returning its future result. This may block if the client send queue is full. */
CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, SimpleHttpRequest request,
BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>> dispatch);
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java
index 8ef8ae57f5e..8db0b8f2d43 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java
@@ -57,9 +57,8 @@ class JsonStreamFeederTest {
}
@Override
- public void close() throws IOException {
+ public void close(boolean graceful) { }
- }
}).build().feed(in, 1 << 7, false); // TODO: hangs on 1 << 6.
assertEquals(docs + 1, ids.size());
}