aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/BenchmarkingCluster.java8
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java30
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java11
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java41
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java45
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java17
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java106
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java21
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java2
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/DynamicThrottlerTest.java30
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java18
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java51
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java159
13 files changed, 399 insertions, 140 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/BenchmarkingCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/BenchmarkingCluster.java
index 99c891696f5..bd1309d3bca 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/BenchmarkingCluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/BenchmarkingCluster.java
@@ -48,9 +48,7 @@ public class BenchmarkingCluster implements Cluster {
public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) {
requests.incrementAndGet();
long startNanos = System.nanoTime();
- if (timeOfFirstDispatch.get() == 0) {
- timeOfFirstDispatch.set(startNanos);
- }
+ timeOfFirstDispatch.compareAndSet(0, startNanos);
delegate.dispatch(request, vessel);
vessel.whenCompleteAsync((response, thrown) -> {
results++;
@@ -94,8 +92,8 @@ public class BenchmarkingCluster implements Cluster {
if (responsesByCode[code] > 0)
responses.put(code, responsesByCode[code]);
- double duration = (System.nanoTime() - timeOfFirstDispatch.get())*1e-9;
- return new OperationStats(duration, requests, responses, exceptions,
+ double duration = (System.nanoTime() - timeOfFirstDispatch.get()) * 1e-9;
+ return new OperationStats(duration, requests, responses, exceptions,
requests - results, throttler.targetInflight(),
this.responses == 0 ? -1 : totalLatencyMillis / this.responses,
this.responses == 0 ? -1 : minLatencyMillis,
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java
index 951a1776b6f..3344a372734 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java
@@ -28,13 +28,13 @@ public class DynamicThrottler extends StaticThrottler {
public DynamicThrottler(FeedClientBuilderImpl builder) {
super(builder);
- targetInflight = new AtomicLong(8 * minInflight);
+ targetInflight = new AtomicLong(minInflight);
}
@Override
public void sent(long __, CompletableFuture<HttpResponse> ___) {
- double currentInflight = targetInflight.get();
- if (++sent * sent * sent < 1e2 * currentInflight * currentInflight)
+ double currentInflight = targetInflight();
+ if (++sent * sent * sent < 1e3 * currentInflight * currentInflight)
return;
sent = 0;
@@ -43,22 +43,36 @@ public class DynamicThrottler extends StaticThrottler {
// Use buckets for throughput over inflight, along the log-scale, in [minInflight, maxInflight).
int index = (int) (throughputs.length * log(max(1, min(255, currentInflight / minInflight)))
- / log(256)); // 4096 (server max streams per connection) / 16 (our min per connection)
+ / log(256)); // 512 (server max streams per connection) / 2 (our min per connection)
throughputs[index] = currentThroughput;
// Loop over throughput measurements and pick the one which optimises throughput and latency.
- double choice = currentInflight;
+ double best = currentInflight;
double max = -1;
- for (int i = throughputs.length; i-- > 0; ) {
+ int j = -1, k = -1, choice = 0;
+ double s = 0;
+ for (int i = 0; i < throughputs.length; i++) {
if (throughputs[i] == 0) continue; // Skip unknown values.
double inflight = minInflight * pow(256, (i + 0.5) / throughputs.length);
double objective = throughputs[i] * pow(inflight, (weight - 1)); // Optimise throughput (weight), but also latency (1 - weight).
if (objective > max) {
max = objective;
- choice = inflight;
+ best = inflight;
+ choice = i;
}
+ // Additionally, smooth the throughput values, to reduce the impact of noise, and reduce jumpiness.
+ if (j != -1) {
+ double t = throughputs[j];
+ if (k != -1) throughputs[j] = (18 * t + throughputs[i] + s) / 20;
+ s = t;
+ }
+ k = j;
+ j = i;
}
- long target = (long) ((random() * 0.20 + 0.92) * choice); // Random walk, skewed towards increase.
+ long target = (long) ((random() * 0.40 + 0.84) * best + random() * 4 - 1); // Random step, skewed towards increase.
+ // If the best inflight is at the high end of the known, we override the random walk to speed up upwards exploration.
+ if (choice == j && choice + 1 < throughputs.length)
+ target = (long) (1 + minInflight * pow(256, (choice + 1.5) / throughputs.length));
targetInflight.set(max(minInflight, min(maxInflight, target)));
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java
index c271ac356e9..d5eab8e17af 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java
@@ -18,6 +18,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.LongSupplier;
import java.util.function.Supplier;
import static ai.vespa.feed.client.FeedClientBuilder.Compression.auto;
@@ -57,10 +58,9 @@ public class FeedClientBuilderImpl implements FeedClientBuilder {
Compression compression = auto;
URI proxy;
Duration connectionTtl = Duration.ZERO;
+ LongSupplier nanoClock = System::nanoTime;
-
- public FeedClientBuilderImpl() {
- }
+ public FeedClientBuilderImpl() { }
FeedClientBuilderImpl(List<URI> endpoints) {
this();
@@ -252,6 +252,11 @@ public class FeedClientBuilderImpl implements FeedClientBuilder {
return this;
}
+ FeedClientBuilderImpl setNanoClock(LongSupplier nanoClock) {
+ this.nanoClock = requireNonNull(nanoClock);
+ return this;
+ }
+
/** Constructs instance of {@link ai.vespa.feed.client.FeedClient} from builder configuration */
@Override
public FeedClient build() {
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java
index 1ea2089c0eb..cec7106403e 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java
@@ -12,7 +12,6 @@ import java.util.function.LongSupplier;
import java.util.logging.Logger;
import static java.util.Objects.requireNonNull;
-import static java.util.logging.Level.FINE;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
@@ -24,22 +23,22 @@ import static java.util.logging.Level.WARNING;
public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker {
private static final Logger log = Logger.getLogger(GracePeriodCircuitBreaker.class.getName());
- private static final long NEVER = 1L << 60;
- private final AtomicLong failingSinceMillis = new AtomicLong(NEVER);
private final AtomicBoolean halfOpen = new AtomicBoolean(false);
private final AtomicBoolean open = new AtomicBoolean(false);
- private final LongSupplier clock;
+ private final LongSupplier nanoClock;
+ private final long never;
+ private final AtomicLong failingSinceNanos;
private final AtomicReference<String> detail = new AtomicReference<>();
- private final long graceMillis;
- private final long doomMillis;
+ private final long graceNanos;
+ private final long doomNanos;
/**
* Creates a new circuit breaker with the given grace periods.
* @param grace the period of consecutive failures before state changes to half-open.
*/
public GracePeriodCircuitBreaker(Duration grace) {
- this(System::currentTimeMillis, grace, null);
+ this(System::nanoTime, grace, null);
}
/**
@@ -48,23 +47,25 @@ public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker {
* @param doom the period of consecutive failures before shutting down.
*/
public GracePeriodCircuitBreaker(Duration grace, Duration doom) {
- this(System::currentTimeMillis, grace, doom);
+ this(System::nanoTime, grace, doom);
if (doom.isNegative())
throw new IllegalArgumentException("Doom delay must be non-negative");
}
- GracePeriodCircuitBreaker(LongSupplier clock, Duration grace, Duration doom) {
+ GracePeriodCircuitBreaker(LongSupplier nanoClock, Duration grace, Duration doom) {
if (grace.isNegative())
throw new IllegalArgumentException("Grace delay must be non-negative");
- this.clock = requireNonNull(clock);
- this.graceMillis = grace.toMillis();
- this.doomMillis = doom == null ? -1 : doom.toMillis();
+ this.nanoClock = requireNonNull(nanoClock);
+ this.never = nanoClock.getAsLong() + (1L << 60);
+ this.graceNanos = grace.toNanos();
+ this.doomNanos = doom == null ? -1 : doom.toNanos();
+ this.failingSinceNanos = new AtomicLong(never);
}
@Override
public void success() {
- failingSinceMillis.set(NEVER);
+ failingSinceNanos.set(never);
if ( ! open.get() && halfOpen.compareAndSet(true, false))
log.log(INFO, "Circuit breaker is now closed, after a request was successful");
}
@@ -80,21 +81,21 @@ public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker {
}
private void failure(String detail) {
- if (failingSinceMillis.compareAndSet(NEVER, clock.getAsLong()))
+ if (failingSinceNanos.compareAndSet(never, nanoClock.getAsLong()))
this.detail.set(detail);
}
@Override
public State state() {
- long failingMillis = clock.getAsLong() - failingSinceMillis.get();
- if (failingMillis > graceMillis && halfOpen.compareAndSet(false, true))
+ long failingNanos = nanoClock.getAsLong() - failingSinceNanos.get();
+ if (failingNanos > graceNanos && halfOpen.compareAndSet(false, true))
log.log(INFO, "Circuit breaker is now half-open, as no requests have succeeded for the " +
- "last " + failingMillis + "ms. The server will be pinged to see if it recovers" +
- (doomMillis >= 0 ? ", but this client will give up if no successes are observed within " + doomMillis + "ms" : "") +
+ "last " + failingNanos / 1_000_000 + "ms. The server will be pinged to see if it recovers" +
+ (doomNanos >= 0 ? ", but this client will give up if no successes are observed within " + doomNanos / 1_000_000 + "ms" : "") +
". First failure was '" + detail.get() + "'.");
- if (doomMillis >= 0 && failingMillis > doomMillis && open.compareAndSet(false, true))
- log.log(WARNING, "Circuit breaker is now open, after " + doomMillis + "ms of failing request, " +
+ if (doomNanos >= 0 && failingNanos > doomNanos && open.compareAndSet(false, true))
+ log.log(WARNING, "Circuit breaker is now open, after " + doomNanos / 1_000_000 + "ms of failing request, " +
"and this client will give up and abort its remaining feed operations.");
return open.get() ? State.OPEN : halfOpen.get() ? State.HALF_OPEN : State.CLOSED;
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
index a30cfd5ec39..8ee281fb38d 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
@@ -17,13 +17,13 @@ import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.StreamReadConstraints;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
-import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -31,6 +31,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.LongSupplier;
import java.util.function.Supplier;
import static ai.vespa.feed.client.OperationParameters.empty;
@@ -45,6 +46,7 @@ import static java.util.Objects.requireNonNull;
*/
class HttpFeedClient implements FeedClient {
+ private static final Duration maxTimeout = Duration.ofMinutes(15);
private static final JsonFactory jsonParserFactory = new JsonFactoryBuilder()
.streamReadConstraints(StreamReadConstraints.builder().maxStringLength(Integer.MAX_VALUE).build())
.build();
@@ -53,20 +55,23 @@ class HttpFeedClient implements FeedClient {
private final RequestStrategy requestStrategy;
private final AtomicBoolean closed = new AtomicBoolean();
private final boolean speedTest;
+ private final LongSupplier nanoClock;
HttpFeedClient(FeedClientBuilderImpl builder) throws IOException {
- this(builder, builder.dryrun ? new DryrunCluster() : new JettyCluster(builder));
+ this(builder,
+ builder.dryrun ? () -> new DryrunCluster() : () -> new JettyCluster(builder));
}
- HttpFeedClient(FeedClientBuilderImpl builder, Cluster cluster) {
- this(builder, cluster, new HttpRequestStrategy(builder, cluster));
+ HttpFeedClient(FeedClientBuilderImpl builder, ClusterFactory clusterFactory) throws IOException {
+ this(builder, clusterFactory, new HttpRequestStrategy(builder, clusterFactory));
}
- HttpFeedClient(FeedClientBuilderImpl builder, Cluster cluster, RequestStrategy requestStrategy) {
+ HttpFeedClient(FeedClientBuilderImpl builder, ClusterFactory clusterFactory, RequestStrategy requestStrategy) throws IOException {
this.requestHeaders = new HashMap<>(builder.requestHeaders);
this.requestStrategy = requestStrategy;
this.speedTest = builder.speedTest;
- verifyConnection(builder, cluster);
+ this.nanoClock = builder.nanoClock;
+ verifyConnection(builder, clusterFactory);
}
@Override
@@ -108,10 +113,12 @@ class HttpFeedClient implements FeedClient {
throw new IllegalStateException("Client is closed");
HttpRequest request = new HttpRequest(method,
- getPath(documentId) + getQuery(params, speedTest),
+ getPath(documentId),
+ getQuery(params, speedTest),
requestHeaders,
operationJson == null ? null : operationJson.getBytes(UTF_8), // TODO: make it bytes all the way?
- params.timeout().orElse(null));
+ params.timeout().orElse(maxTimeout),
+ nanoClock);
CompletableFuture<Result> promise = new CompletableFuture<>();
requestStrategy.enqueue(documentId, request)
@@ -129,14 +136,16 @@ class HttpFeedClient implements FeedClient {
return promise;
}
- private void verifyConnection(FeedClientBuilderImpl builder, Cluster cluster) {
+ private void verifyConnection(FeedClientBuilderImpl builder, ClusterFactory clusterFactory) throws IOException {
Instant start = Instant.now();
- try {
+ try (Cluster cluster = clusterFactory.create()) {
HttpRequest request = new HttpRequest("POST",
- getPath(DocumentId.of("feeder", "handshake", "dummy")) + getQuery(empty(), true),
+ getPath(DocumentId.of("feeder", "handshake", "dummy")),
+ getQuery(empty(), true),
requestHeaders,
null,
- Duration.ofSeconds(15));
+ Duration.ofSeconds(15),
+ nanoClock);
CompletableFuture<HttpResponse> future = new CompletableFuture<>();
cluster.dispatch(request, future);
HttpResponse response = future.get(20, TimeUnit.SECONDS);
@@ -219,13 +228,13 @@ class HttpFeedClient implements FeedClient {
throw new ResultParseException(documentId,
"Expected 'trace' to be an array, but got '" + parser.currentToken() + "' in: " +
new String(json, UTF_8));
- int start = (int) parser.getTokenLocation().getByteOffset();
+ int start = (int) parser.currentTokenLocation().getByteOffset();
int depth = 1;
while (depth > 0) switch (parser.nextToken()) {
case START_ARRAY: ++depth; break;
case END_ARRAY: --depth; break;
}
- int end = (int) parser.getTokenLocation().getByteOffset() + 1;
+ int end = (int) parser.currentTokenLocation().getByteOffset() + 1;
trace = new String(json, start, end - start, UTF_8);
break;
default:
@@ -307,11 +316,17 @@ class HttpFeedClient implements FeedClient {
StringJoiner query = new StringJoiner("&", "?", "").setEmptyValue("");
if (params.createIfNonExistent()) query.add("create=true");
params.testAndSetCondition().ifPresent(condition -> query.add("condition=" + encode(condition)));
- params.timeout().ifPresent(timeout -> query.add("timeout=" + timeout.toMillis() + "ms"));
params.route().ifPresent(route -> query.add("route=" + encode(route)));
params.tracelevel().ifPresent(tracelevel -> query.add("tracelevel=" + tracelevel));
if (speedTest) query.add("dryRun=true");
return query.toString();
}
+ /** Factory for creating a new {@link Cluster} to dispatch operations to. Used for resetting the active cluster. */
+ interface ClusterFactory {
+
+ Cluster create() throws IOException;
+
+ }
+
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java
index 6de3f034f22..22f6eaa75a4 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java
@@ -3,30 +3,37 @@ package ai.vespa.feed.client.impl;
import java.time.Duration;
import java.util.Map;
+import java.util.function.LongSupplier;
import java.util.function.Supplier;
class HttpRequest {
private final String method;
private final String path;
+ private final String query;
private final Map<String, Supplier<String>> headers;
private final byte[] body;
private final Duration timeout;
+ private final long deadlineNanos;
+ private final LongSupplier nanoClock;
- public HttpRequest(String method, String path, Map<String, Supplier<String>> headers, byte[] body, Duration timeout) {
+ public HttpRequest(String method, String path, String query, Map<String, Supplier<String>> headers, byte[] body, Duration timeout, LongSupplier nanoClock) {
this.method = method;
this.path = path;
+ this.query = query;
this.headers = headers;
this.body = body;
+ this.deadlineNanos = nanoClock.getAsLong() + timeout.toNanos();
this.timeout = timeout;
+ this.nanoClock = nanoClock;
}
public String method() {
return method;
}
- public String path() {
- return path;
+ public String pathAndQuery() {
+ return path + (query.isEmpty() ? "?" : query + "&") + "timeout=" + Math.max(1, timeLeft().toMillis()) + "ms";
}
public Map<String, Supplier<String>> headers() {
@@ -37,6 +44,10 @@ class HttpRequest {
return body;
}
+ public Duration timeLeft() {
+ return Duration.ofNanos(deadlineNanos - nanoClock.getAsLong());
+ }
+
public Duration timeout() {
return timeout;
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java
index 8f0327a1738..dc902297d6d 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java
@@ -8,19 +8,25 @@ import ai.vespa.feed.client.FeedClient.RetryStrategy;
import ai.vespa.feed.client.FeedException;
import ai.vespa.feed.client.HttpResponse;
import ai.vespa.feed.client.OperationStats;
+import ai.vespa.feed.client.impl.HttpFeedClient.ClusterFactory;
import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Deque;
import java.util.Map;
import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -31,6 +37,8 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.FINER;
import static java.util.logging.Level.FINEST;
+import static java.util.logging.Level.INFO;
+import static java.util.logging.Level.SEVERE;
import static java.util.logging.Level.WARNING;
/**
@@ -65,10 +73,14 @@ class HttpRequestStrategy implements RequestStrategy {
thread.setDaemon(true);
return thread;
});
+ // TODO jonmv: remove if this has no effect
+ private final ResettableCluster resettableCluster;
+ private final AtomicBoolean reset = new AtomicBoolean(false);
- HttpRequestStrategy(FeedClientBuilderImpl builder, Cluster cluster) {
+ HttpRequestStrategy(FeedClientBuilderImpl builder, ClusterFactory clusterFactory) throws IOException {
this.throttler = new DynamicThrottler(builder);
- this.cluster = builder.benchmark ? new BenchmarkingCluster(cluster, throttler) : cluster;
+ this.resettableCluster = new ResettableCluster(clusterFactory);
+ this.cluster = builder.benchmark ? new BenchmarkingCluster(resettableCluster, throttler) : resettableCluster;
this.strategy = builder.retryStrategy;
this.breaker = builder.circuitBreaker;
@@ -91,12 +103,18 @@ class HttpRequestStrategy implements RequestStrategy {
try {
while (breaker.state() != OPEN && ! destroyed.get()) {
while ( ! isInExcess() && poll() && breaker.state() == CLOSED);
+
+ if (breaker.state() == HALF_OPEN && reset.compareAndSet(false, true))
+ resettableCluster.reset();
+ else if (breaker.state() == CLOSED)
+ reset.set(false);
+
// Sleep when circuit is half-open, nap when queue is empty, or we are throttled.
Thread.sleep(breaker.state() == HALF_OPEN ? 100 : 1);
}
}
catch (Throwable t) {
- log.log(WARNING, "Dispatch thread threw; shutting down", t);
+ log.log(SEVERE, "Dispatch thread threw; shutting down", t);
}
destroy();
}
@@ -119,7 +137,7 @@ class HttpRequestStrategy implements RequestStrategy {
}
private boolean retry(HttpRequest request, int attempt) {
- if (attempt > strategy.retries())
+ if (attempt > strategy.retries() || request.timeLeft().toMillis() <= 0)
return false;
switch (request.method().toUpperCase()) {
@@ -137,9 +155,8 @@ class HttpRequestStrategy implements RequestStrategy {
private boolean retry(HttpRequest request, Throwable thrown, int attempt) {
breaker.failure(thrown);
if ( (thrown instanceof IOException) // General IO problems.
-
// Thrown by HTTP2Session.StreamsState.reserveSlot, likely on GOAWAY from server
- || (thrown instanceof IllegalStateException && thrown.getMessage().equals("session closed"))
+ || (thrown instanceof IllegalStateException && "session closed".equals(thrown.getMessage()))
) {
log.log(FINER, thrown, () -> "Failed attempt " + attempt + " at " + request);
return retry(request, attempt);
@@ -149,7 +166,7 @@ class HttpRequestStrategy implements RequestStrategy {
return false;
}
- /** Retries throttled requests (429), adjusting the target inflight count, and server errors (500, 502, 503, 504). */
+ /** Retries throttled requests (429), adjusting the target inflight count, and server unavailable (503). */
private boolean retry(HttpRequest request, HttpResponse response, int attempt) {
if (response.code() / 100 == 2 || response.code() == 404 || response.code() == 412) {
logResponse(FINEST, response, request, attempt);
@@ -170,6 +187,10 @@ class HttpRequestStrategy implements RequestStrategy {
return retry(request, attempt);
}
+ if (response.code() >= 500) { // Server errors may indicate something wrong with the server.
+ breaker.failure(response);
+ }
+
return false;
}
@@ -272,7 +293,8 @@ class HttpRequestStrategy implements RequestStrategy {
}
/** Handles the result of one attempt at the given operation, retrying if necessary. */
- private void handleAttempt(CompletableFuture<HttpResponse> vessel, HttpRequest request, RetriableFuture<HttpResponse> result, int attempt) {
+ private void handleAttempt(CompletableFuture<HttpResponse> vessel, HttpRequest request,
+ RetriableFuture<HttpResponse> result, int attempt) {
vessel.whenCompleteAsync((response, thrown) -> {
result.set(response, thrown);
// Retry the operation if it failed with a transient error ...
@@ -305,4 +327,72 @@ class HttpRequestStrategy implements RequestStrategy {
}
}
+ /**
+ * Oof, this is an attempt to see if there's a terminal bug in the Jetty client library that sometimes
+ * renders a client instance permanently unusable. If this is the case, replacing the client altogether
+ * should allow the feeder to start working again, when it wouldn't otherwise be able to.
+ */
+ private static class ResettableCluster implements Cluster {
+
+ private final Object monitor = new Object();
+ private final ClusterFactory clusterFactory;
+ private final ExecutorService executor = Executors.newSingleThreadExecutor();
+ private AtomicLong inflight = new AtomicLong(0);
+ private Cluster delegate;
+
+ ResettableCluster(ClusterFactory clusterFactory) throws IOException {
+ this.clusterFactory = clusterFactory;
+ this.delegate = clusterFactory.create();
+ }
+
+ @Override
+ public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) {
+ synchronized (monitor) {
+ AtomicLong usedCounter = inflight;
+ usedCounter.incrementAndGet();
+ Cluster usedCluster = delegate;
+ usedCluster.dispatch(request, vessel);
+ vessel.whenCompleteAsync((__, ___) -> {
+ synchronized (monitor) {
+ if (usedCounter.decrementAndGet() == 0 && usedCluster != delegate) {
+ log.log(INFO, "Closing old HTTP client");
+ usedCluster.close();
+ }
+ }
+ },
+ executor);
+ }
+ }
+
+ @Override
+ public void close() {
+ synchronized (monitor) {
+ delegate.close();
+ executor.shutdown();
+ try {
+ if ( ! executor.awaitTermination(1, TimeUnit.MINUTES))
+ log.log(WARNING, "Failed shutting down HTTP client within 1 minute");
+ }
+ catch (InterruptedException e) {
+ log.log(WARNING, "Interrupted waiting for HTTP client to shut down");
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ @Override
+ public OperationStats stats() {
+ return delegate.stats();
+ }
+
+ void reset() throws IOException {
+ synchronized (monitor) {
+ log.log(INFO, "Replacing underlying HTTP client to attempt recovery");
+ delegate = clusterFactory.create();
+ inflight = new AtomicLong(0);
+ }
+ }
+
+ }
+
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java
index df010a167f6..28e5b5d0a21 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java
@@ -45,8 +45,11 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.zip.GZIPOutputStream;
@@ -61,6 +64,7 @@ import static org.eclipse.jetty.http.MimeTypes.Type.APPLICATION_JSON;
* @author bjorncs
*/
class JettyCluster implements Cluster {
+ private static final Logger log = Logger.getLogger(JettyCluster.class.getName());
// Socket timeout must be longer than the longest feasible response timeout
private static final Duration IDLE_TIMEOUT = Duration.ofMinutes(15);
@@ -81,9 +85,12 @@ class JettyCluster implements Cluster {
Endpoint endpoint = findLeastBusyEndpoint(endpoints);
try {
endpoint.inflight.incrementAndGet();
- long reqTimeoutMillis = req.timeout() != null
- ? req.timeout().toMillis() * 11 / 10 + 1000 : IDLE_TIMEOUT.toMillis();
- Request jettyReq = client.newRequest(URI.create(endpoint.uri + req.path()))
+ long reqTimeoutMillis = req.timeLeft().toMillis();
+ if (reqTimeoutMillis <= 0) {
+ vessel.completeExceptionally(new TimeoutException("operation timed out after '" + req.timeout() + "'"));
+ return;
+ }
+ Request jettyReq = client.newRequest(URI.create(endpoint.uri + req.pathAndQuery()))
.version(HttpVersion.HTTP_2)
.method(HttpMethod.fromString(req.method()))
.headers(hs -> req.headers().forEach((k, v) -> hs.add(k, v.get())))
@@ -104,15 +111,23 @@ class JettyCluster implements Cluster {
}
jettyReq.body(new BytesRequestContent(APPLICATION_JSON.asString(), bytes));
}
+ log.log(Level.FINER, () ->
+ String.format("Dispatching request %s (%s)", req, System.identityHashCode(vessel)));
jettyReq.send(new BufferingResponseListener() {
@Override
public void onComplete(Result result) {
+ log.log(Level.FINER, () ->
+ String.format("Completed request %s (%s): %s",
+ req, System.identityHashCode(vessel),
+ result.isFailed()
+ ? result.getFailure().toString() : result.getResponse().getStatus()));
endpoint.inflight.decrementAndGet();
if (result.isFailed()) vessel.completeExceptionally(result.getFailure());
else vessel.complete(new JettyResponse(result.getResponse(), getContent()));
}
});
} catch (Exception e) {
+ log.log(Level.FINE, e, () -> "Failed to dispatch request: " + e.getMessage());
endpoint.inflight.decrementAndGet();
vessel.completeExceptionally(e);
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java
index 9010b0a7ad8..f0ee434e87c 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java
@@ -22,7 +22,7 @@ public class StaticThrottler implements Throttler {
public StaticThrottler(FeedClientBuilderImpl builder) {
minInflight = 2L * builder.connectionsPerEndpoint * builder.endpoints.size();
- maxInflight = 256 * minInflight; // 4096 max streams per connection on the server side.
+ maxInflight = 256 * minInflight; // 512 max streams per connection on the server side.
targetX10 = new AtomicLong(10 * maxInflight); // 10x the actual value to allow for smaller updates.
}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/DynamicThrottlerTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/DynamicThrottlerTest.java
new file mode 100644
index 00000000000..cea5d32a55a
--- /dev/null
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/DynamicThrottlerTest.java
@@ -0,0 +1,30 @@
+package ai.vespa.feed.client.impl;
+
+import org.junit.jupiter.api.Test;
+
+import java.net.URI;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * @author jonmv
+ */
+class DynamicThrottlerTest {
+
+ @Test
+ void testThrottler() {
+ DynamicThrottler throttler = new DynamicThrottler(new FeedClientBuilderImpl(List.of(URI.create("http://localhost:8080"))));
+ assertEquals(16, throttler.targetInflight());
+
+ for (int i = 0; i < 65; i++) {
+ throttler.sent(1, null);
+ throttler.success();
+ }
+ assertEquals(18, throttler.targetInflight());
+
+ throttler.throttled(34);
+ assertEquals(17, throttler.targetInflight());
+ }
+
+}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java
index f5ca70fe291..52b8dcc5884 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java
@@ -19,39 +19,39 @@ class GracePeriodCircuitBreakerTest {
@Test
void testCircuitBreaker() {
- AtomicLong now = new AtomicLong(0);
- long SECOND = 1000;
- CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(1));
+ AtomicLong nowNanos = new AtomicLong(0);
+ long SECOND = 1_000_000_000L;
+ CircuitBreaker breaker = new GracePeriodCircuitBreaker(nowNanos::get, Duration.ofSeconds(1), Duration.ofMinutes(1));
Throwable error = new Error();
assertEquals(CLOSED, breaker.state(), "Initial state is closed");
- now.addAndGet(100 * SECOND);
+ nowNanos.addAndGet(100 * SECOND);
assertEquals(CLOSED, breaker.state(), "State is closed after some time without activity");
breaker.success();
assertEquals(CLOSED, breaker.state(), "State is closed after a success");
- now.addAndGet(100 * SECOND);
+ nowNanos.addAndGet(100 * SECOND);
assertEquals(CLOSED, breaker.state(), "State is closed some time after a success");
breaker.failure(error);
assertEquals(CLOSED, breaker.state(), "State is closed right after a failure");
- now.addAndGet(SECOND);
+ nowNanos.addAndGet(SECOND);
assertEquals(CLOSED, breaker.state(), "State is closed until grace period has passed");
- now.addAndGet(1);
+ nowNanos.addAndGet(1);
assertEquals(HALF_OPEN, breaker.state(), "State is half-open when grace period has passed");
breaker.success();
assertEquals(CLOSED, breaker.state(), "State is closed after a new success");
breaker.failure(error);
- now.addAndGet(60 * SECOND);
+ nowNanos.addAndGet(60 * SECOND);
assertEquals(HALF_OPEN, breaker.state(), "State is half-open until doom period has passed");
- now.addAndGet(1);
+ nowNanos.addAndGet(1);
assertEquals(OPEN, breaker.state(), "State is open when doom period has passed");
breaker.success();
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java
index 28bde16f457..9afaeed8062 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java
@@ -11,6 +11,7 @@ import ai.vespa.feed.client.Result;
import ai.vespa.feed.client.ResultException;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.List;
@@ -32,7 +33,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
class HttpFeedClientTest {
@Test
- void testFeeding() throws ExecutionException, InterruptedException {
+ void testFeeding() throws ExecutionException, InterruptedException, IOException {
DocumentId id = DocumentId.of("ns", "type", "0");
AtomicReference<BiFunction<DocumentId, HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>();
class MockRequestStrategy implements RequestStrategy {
@@ -42,16 +43,18 @@ class HttpFeedClientTest {
@Override public void await() { throw new UnsupportedOperationException(); }
@Override public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { return dispatch.get().apply(documentId, request); }
}
- FeedClient client = new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))).setDryrun(true),
- new DryrunCluster(),
+ FeedClient client = new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123")))
+ .setDryrun(true)
+ .setNanoClock(() -> 0),
+ () -> new DryrunCluster(),
new MockRequestStrategy());
// Update is a PUT, and 200 OK is a success.
dispatch.set((documentId, request) -> {
try {
assertEquals(id, documentId);
- assertEquals("/document/v1/ns/type/docid/0",
- request.path());
+ assertEquals("/document/v1/ns/type/docid/0?timeout=900000ms",
+ request.pathAndQuery());
assertEquals("PUT", request.method());
assertEquals("json", new String(request.body(), UTF_8));
@@ -82,8 +85,8 @@ class HttpFeedClientTest {
dispatch.set((documentId, request) -> {
try {
assertEquals(id, documentId);
- assertEquals("/document/v1/ns/type/docid/0?tracelevel=1",
- request.path());
+ assertEquals("/document/v1/ns/type/docid/0?tracelevel=1&timeout=900000ms",
+ request.pathAndQuery());
assertEquals("DELETE", request.method());
assertNull(request.body());
@@ -144,8 +147,8 @@ class HttpFeedClientTest {
dispatch.set((documentId, request) -> {
try {
assertEquals(id, documentId);
- assertEquals("/document/v1/ns/type/docid/0?create=true&condition=false&timeout=5000ms&route=route",
- request.path());
+ assertEquals("/document/v1/ns/type/docid/0?create=true&condition=false&route=route&timeout=5000ms",
+ request.pathAndQuery());
assertEquals("json", new String(request.body(), UTF_8));
HttpResponse response = HttpResponse.of(502,
@@ -174,7 +177,7 @@ class HttpFeedClientTest {
.timeout(Duration.ofSeconds(5)))
.get());
assertTrue(expected.getCause() instanceof ResultException);
- assertEquals("Ooops! ... I did it again.", expected.getCause().getMessage());
+ assertEquals("(id:ns:type::0) Ooops! ... I did it again.", expected.getCause().getMessage());
assertEquals("[ { \"message\": \"I played with your heart. Got lost in the game.\" } ]", ((ResultException) expected.getCause()).getTrace().get());
@@ -182,8 +185,8 @@ class HttpFeedClientTest {
dispatch.set((documentId, request) -> {
try {
assertEquals(id, documentId);
- assertEquals("/document/v1/ns/type/docid/0",
- request.path());
+ assertEquals("/document/v1/ns/type/docid/0?timeout=900000ms",
+ request.pathAndQuery());
assertEquals("json", new String(request.body(), UTF_8));
HttpResponse response = HttpResponse.of(500,
@@ -207,14 +210,14 @@ class HttpFeedClientTest {
"json",
OperationParameters.empty())
.get());
- assertEquals("Status 500 executing 'POST /document/v1/ns/type/docid/0': Alla ska i jorden.", expected.getCause().getMessage());
+ assertEquals("(id:ns:type::0) Status 500 executing 'POST /document/v1/ns/type/docid/0': Alla ska i jorden.", expected.getCause().getMessage());
}
@Test
- void testHandshake() {
+ void testHandshake() throws IOException {
// dummy:123 does not exist, and results in a host-not-found exception.
FeedException exception = assertThrows(FeedException.class,
- () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123")))));
+ () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123")))));
String message = exception.getMessage();
assertTrue(message.startsWith("failed handshake with server after "), message);
assertTrue(message.contains("java.net.UnknownHostException"), message);
@@ -226,7 +229,7 @@ class HttpFeedClientTest {
try {
assertNull(request.body());
assertEquals("POST", request.method());
- assertEquals("/document/v1/feeder/handshake/docid/dummy?dryRun=true", request.path());
+ assertEquals("/document/v1/feeder/handshake/docid/dummy?dryRun=true&timeout=15000ms", request.pathAndQuery());
vessel.complete(response.get());
}
catch (Throwable t) {
@@ -237,20 +240,24 @@ class HttpFeedClientTest {
// Old server, and speed-test.
assertEquals("server does not support speed test; upgrade to a newer version",
assertThrows(FeedException.class,
- () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))).setSpeedTest(true),
- cluster,
+ () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123")))
+ .setNanoClock(() -> 0)
+ .setSpeedTest(true),
+ () -> cluster,
null))
.getMessage());
// Old server.
- new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))),
- cluster,
+ new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123")))
+ .setNanoClock(() -> 0),
+ () -> cluster,
null);
// New server.
response.set(okResponse);
- new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))),
- cluster,
+ new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123")))
+ .setNanoClock(() -> 0),
+ () -> cluster,
null);
}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
index 54fab9b859b..35d6433ffc5 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
@@ -17,11 +17,13 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -33,23 +35,24 @@ import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
class HttpRequestStrategyTest {
@Test
- void testConcurrency() {
+ void testConcurrency() throws IOException {
int documents = 1 << 16;
- HttpRequest request = new HttpRequest("PUT", "/", null, null, null);
+ HttpRequest request = new HttpRequest("PUT", "/", "", null, null, Duration.ofSeconds(1), () -> 0);
HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8));
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
Cluster cluster = (__, vessel) -> executor.schedule(() -> vessel.complete(response), (int) (Math.random() * 2 * 10), TimeUnit.MILLISECONDS);
- HttpRequestStrategy strategy = new HttpRequestStrategy( new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123")))
- .setConnectionsPerEndpoint(1 << 10)
- .setMaxStreamPerConnection(1 << 12),
- cluster);
+ HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123")))
+ .setConnectionsPerEndpoint(1 << 10)
+ .setMaxStreamPerConnection(1 << 12),
+ () -> cluster);
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
try {
@@ -80,37 +83,40 @@ class HttpRequestStrategyTest {
assertEquals(2 * documents, stats.bytesReceived());
}
- @Test
- void testRetries() throws ExecutionException, InterruptedException {
- int minStreams = 16; // Hard limit for minimum number of streams per connection.
+ @Test()
+ void testRetries() throws ExecutionException, InterruptedException, IOException {
+ int minStreams = 2; // Hard limit for minimum number of streams per connection.
MockCluster cluster = new MockCluster();
- AtomicLong now = new AtomicLong(0);
- CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10));
+ AtomicLong nowNanos = new AtomicLong(0);
+ CircuitBreaker breaker = new GracePeriodCircuitBreaker(nowNanos::get, Duration.ofSeconds(1), Duration.ofMinutes(10));
HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123")))
- .setRetryStrategy(new FeedClient.RetryStrategy() {
- @Override public boolean retry(FeedClient.OperationType type) { return type == FeedClient.OperationType.PUT; }
- @Override public int retries() { return 1; }
- })
- .setCircuitBreaker(breaker)
- .setConnectionsPerEndpoint(1)
- .setMaxStreamPerConnection(minStreams),
- cluster);
+ .setRetryStrategy(new FeedClient.RetryStrategy() {
+ @Override public boolean retry(FeedClient.OperationType type) { return type == FeedClient.OperationType.PUT; }
+ @Override public int retries() { return 1; }
+ })
+ .setCircuitBreaker(breaker)
+ .setConnectionsPerEndpoint(1)
+ .setMaxStreamPerConnection(minStreams),
+ () -> cluster);
OperationStats initial = strategy.stats();
DocumentId id1 = DocumentId.of("ns", "type", "1");
DocumentId id2 = DocumentId.of("ns", "type", "2");
- HttpRequest request = new HttpRequest("POST", "/", null, null, null);
+ HttpRequest request = new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(180), nowNanos::get);
// Runtime exception is not retried.
cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom")));
ExecutionException expected = assertThrows(ExecutionException.class,
() -> strategy.enqueue(id1, request).get());
- assertTrue(expected.getCause() instanceof FeedException);
- assertEquals("java.lang.RuntimeException: boom", expected.getCause().getMessage());
+ assertInstanceOf(FeedException.class, expected.getCause());
+ assertEquals("(id:ns:type::1) java.lang.RuntimeException: boom", expected.getCause().getMessage());
assertEquals(1, strategy.stats().requests());
// IOException is retried.
- cluster.expect((__, vessel) -> vessel.completeExceptionally(new IOException("retry me")));
+ cluster.expect((__, vessel) -> {
+ nowNanos.addAndGet(200_000_000L); // Exceed grace period.
+ vessel.completeExceptionally(new IOException("retry me"));
+ });
expected = assertThrows(ExecutionException.class,
() -> strategy.enqueue(id1, request).get());
assertEquals("retry me", expected.getCause().getCause().getMessage());
@@ -123,7 +129,7 @@ class HttpRequestStrategyTest {
assertEquals(4, strategy.stats().requests());
// Throttled requests are retried. Concurrent operations to same ID (only) are serialised.
- now.set(2000);
+ nowNanos.set(2_000_000_000L);
HttpResponse throttled = HttpResponse.of(429, null);
AtomicInteger count = new AtomicInteger(3);
CountDownLatch latch = new CountDownLatch(1);
@@ -140,11 +146,11 @@ class HttpRequestStrategyTest {
else vessel.complete(success);
});
CompletableFuture<HttpResponse> delayed = strategy.enqueue(id1, request);
- CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, null));
- assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null, null)).get());
+ CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get));
+ assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get)).get());
latch.await();
assertEquals(8, strategy.stats().requests()); // 3 attempts at throttled and one at id2.
- now.set(4000);
+ nowNanos.set(4_000_000_000L);
assertEquals(CLOSED, breaker.state()); // Circuit not broken due to throttled requests.
completion.get().complete(success);
assertEquals(success, delayed.get());
@@ -152,14 +158,17 @@ class HttpRequestStrategyTest {
// Some error responses are retried.
HttpResponse serverError = HttpResponse.of(503, null);
- cluster.expect((__, vessel) -> vessel.complete(serverError));
+ cluster.expect((__, vessel) -> {
+ nowNanos.addAndGet(200_000_000L); // Exceed grace period.
+ vessel.complete(serverError);
+ });
assertEquals(serverError, strategy.enqueue(id1, request).get());
assertEquals(11, strategy.stats().requests());
assertEquals(CLOSED, breaker.state()); // Circuit not broken due to throttled requests.
// Error responses are not retried when not of appropriate type.
cluster.expect((__, vessel) -> vessel.complete(serverError));
- assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, null)).get());
+ assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get)).get());
assertEquals(12, strategy.stats().requests());
// Some error responses are not retried.
@@ -168,10 +177,22 @@ class HttpRequestStrategyTest {
assertEquals(badRequest, strategy.enqueue(id1, request).get());
assertEquals(13, strategy.stats().requests());
+
+ // IOException is not retried past timeout.
+ cluster.expect((__, vessel) -> {
+ nowNanos.addAndGet(50_000_000L); // Exceed grace period after 2 attempts.
+ vessel.completeExceptionally(new IOException("retry me"));
+ });
+ expected = assertThrows(ExecutionException.class,
+ () -> strategy.enqueue(id1, new HttpRequest("POST", "/", "", null, null, Duration.ofMillis(100), nowNanos::get)).get());
+ assertEquals("retry me", expected.getCause().getCause().getMessage());
+ assertEquals(15, strategy.stats().requests());
+
+
// Circuit breaker opens some time after starting to fail.
- now.set(6000);
+ nowNanos.set(6_000_000_000L);
assertEquals(HALF_OPEN, breaker.state()); // Circuit broken due to failed requests.
- now.set(605000);
+ nowNanos.set(605_000_000_000L);
assertEquals(OPEN, breaker.state()); // Circuit broken due to failed requests.
strategy.destroy();
@@ -182,7 +203,7 @@ class HttpRequestStrategyTest {
codes.put(429, 2L);
codes.put(503, 3L);
assertEquals(codes, stats.responsesByCode());
- assertEquals(3, stats.exceptions());
+ assertEquals(5, stats.exceptions());
assertEquals(stats, stats.since(initial));
assertEquals(0, stats.since(stats).averageLatencyMillis());
@@ -191,27 +212,73 @@ class HttpRequestStrategyTest {
}
@Test
- void testShutdown() {
- MockCluster cluster = new MockCluster();
+ void testResettingCluster() throws ExecutionException, InterruptedException, IOException {
+ List<MockCluster> clusters = List.of(new MockCluster(), new MockCluster());
AtomicLong now = new AtomicLong(0);
- CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10));
+ CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), null);
+ HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123")))
+ .setCircuitBreaker(breaker)
+ .setConnectionsPerEndpoint(1),
+ clusters.iterator()::next);
+
+ // First operation fails, second remains in flight, and third fails.
+ clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(200, null)));
+ strategy.enqueue(DocumentId.of("ns", "type", "1"), new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), now::get)).get();
+ Exchanger<CompletableFuture<HttpResponse>> exchanger = new Exchanger<>();
+ clusters.get(0).expect((__, vessel) -> {
+ try { exchanger.exchange(vessel); } catch (InterruptedException e) { throw new RuntimeException(e); }
+ });
+ CompletableFuture<HttpResponse> secondResponse = strategy.enqueue(DocumentId.of("ns", "type", "2"), new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), now::get));
+ CompletableFuture<HttpResponse> secondVessel = exchanger.exchange(null);
+ clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null)));
+ strategy.enqueue(DocumentId.of("ns", "type", "3"), new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), now::get)).get();
+
+ // Time advances, and the circuit breaker half-opens.
+ assertEquals(CLOSED, breaker.state());
+ now.addAndGet(2_000_000_000);
+ assertEquals(HALF_OPEN, breaker.state());
+
+ // It's indeterminate which cluster gets the next request, but the second should get the next one after that.
+ clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null)));
+ clusters.get(1).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null)));
+ assertEquals(500, strategy.enqueue(DocumentId.of("ns", "type", "4"), new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), now::get)).get().code());
+
+ clusters.get(0).expect((__, vessel) -> vessel.completeExceptionally(new AssertionError("should not be called")));
+ clusters.get(1).expect((__, vessel) -> vessel.complete(HttpResponse.of(200, null)));
+ assertEquals(200, strategy.enqueue(DocumentId.of("ns", "type", "5"), new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), now::get)).get().code());
+
+ assertFalse(clusters.get(0).closed.get());
+ assertFalse(clusters.get(1).closed.get());
+ secondVessel.complete(HttpResponse.of(504, null));
+ assertEquals(504, secondResponse.get().code());
+ strategy.await();
+ strategy.destroy();
+ assertTrue(clusters.get(0).closed.get());
+ assertTrue(clusters.get(1).closed.get());
+ }
+
+ @Test
+ void testShutdown() throws IOException {
+ MockCluster cluster = new MockCluster();
+ AtomicLong nowNanos = new AtomicLong(0);
+ CircuitBreaker breaker = new GracePeriodCircuitBreaker(nowNanos::get, Duration.ofSeconds(1), Duration.ofMinutes(10));
HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123")))
.setRetryStrategy(new FeedClient.RetryStrategy() {
@Override public int retries() { return 1; }
})
.setCircuitBreaker(breaker)
- .setConnectionsPerEndpoint(1),
- cluster);
+ .setConnectionsPerEndpoint(3), // Must be >= 0.5x text ops.
+ () -> cluster);
DocumentId id1 = DocumentId.of("ns", "type", "1");
DocumentId id2 = DocumentId.of("ns", "type", "2");
DocumentId id3 = DocumentId.of("ns", "type", "3");
DocumentId id4 = DocumentId.of("ns", "type", "4");
DocumentId id5 = DocumentId.of("ns", "type", "5");
- HttpRequest failing = new HttpRequest("POST", "/", null, null, null);
- HttpRequest partial = new HttpRequest("POST", "/", null, null, null);
- HttpRequest request = new HttpRequest("POST", "/", null, null, null);
- HttpRequest blocking = new HttpRequest("POST", "/", null, null, null);
+ HttpRequest failing = new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get);
+ HttpRequest partial = new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get);
+ HttpRequest request = new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get);
+ HttpRequest blocking = new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get);
// Enqueue some operations to the same id, which are serialised, and then shut down while operations are in flight.
Phaser phaser = new Phaser(2);
@@ -242,7 +309,7 @@ class HttpRequestStrategyTest {
CompletableFuture<HttpResponse> delayed = strategy.enqueue(id5, request);
phaser.arriveAndAwaitAdvance(); // retried is allowed to dispatch, and will be retried async.
// failed immediately fails, and lets us assert the above retry is indeed enqueued.
- assertEquals("ai.vespa.feed.client.FeedException: java.lang.RuntimeException: fatal",
+ assertEquals("ai.vespa.feed.client.FeedException: (id:ns:type::3) java.lang.RuntimeException: fatal",
assertThrows(ExecutionException.class, failed::get).getMessage());
phaser.arriveAndAwaitAdvance(); // blocked starts dispatch, and hangs, blocking dispatch thread.
@@ -269,7 +336,7 @@ class HttpRequestStrategyTest {
assertThrows(ExecutionException.class, blocked::get).getMessage());
assertEquals("ai.vespa.feed.client.FeedException: Operation aborted",
assertThrows(ExecutionException.class, delayed::get).getMessage());
- assertEquals("ai.vespa.feed.client.FeedException: java.io.IOException: failed",
+ assertEquals("ai.vespa.feed.client.FeedException: (id:ns:type::2) java.io.IOException: failed",
assertThrows(ExecutionException.class, retried::get).getMessage());
assertEquals("ai.vespa.feed.client.FeedException: Operation aborted",
assertThrows(ExecutionException.class, strategy.enqueue(id1, request)::get).getMessage());
@@ -278,6 +345,7 @@ class HttpRequestStrategyTest {
static class MockCluster implements Cluster {
final AtomicReference<BiConsumer<HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>();
+ final AtomicBoolean closed = new AtomicBoolean(false);
void expect(BiConsumer<HttpRequest, CompletableFuture<HttpResponse>> expected) {
dispatch.set(expected);
@@ -288,6 +356,11 @@ class HttpRequestStrategyTest {
dispatch.get().accept(request, vessel);
}
+ @Override
+ public void close() {
+ closed.set(true);
+ }
+
}
}