summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Marius Venstad <jonmv@users.noreply.github.com>2021-06-01 22:03:26 +0200
committerGitHub <noreply@github.com>2021-06-01 22:03:26 +0200
commit471e0c66a6649175883352bf27f833a7686527d8 (patch)
tree589dbc7c126641e2e9f8a9b95aa0d55c78c98e93
parent355f65ebe19082a14778ffd726fc654199894106 (diff)
parent951b86bbe849fdf81aa25a917d4157c9cf52d173 (diff)
Merge pull request #18079 from vespa-engine/jonmv/vespa-feed-client
Fail based on time since last success instead
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java2
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java6
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java60
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java4
4 files changed, 34 insertions, 38 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java
index 1b616a70da9..455a79060ee 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java
@@ -20,7 +20,7 @@ public interface FeedClient extends Closeable {
default boolean retry(OperationType type) { return true; }
/** Number of retries per operation for non-backpressure problems. */
- default int retries() { return 5; }
+ default int retries() { return 32; }
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
index 370cd326f10..b3c711f7cc5 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
@@ -45,7 +45,7 @@ class HttpFeedClient implements FeedClient {
private final URI endpoint;
private final Map<String, Supplier<String>> requestHeaders;
- private final HttpRequestStrategy requestStrategy;
+ private final RequestStrategy requestStrategy;
private final List<CloseableHttpAsyncClient> httpClients = new ArrayList<>();
private final List<AtomicInteger> inflight = new ArrayList<>();
private final AtomicBoolean closed = new AtomicBoolean();
@@ -56,14 +56,14 @@ class HttpFeedClient implements FeedClient {
this.requestStrategy = new HttpRequestStrategy(builder);
for (int i = 0; i < builder.maxConnections; i++) {
- CloseableHttpAsyncClient client = createHttpClient(builder, requestStrategy);
+ CloseableHttpAsyncClient client = createHttpClient(builder);
client.start();
httpClients.add(client);
inflight.add(new AtomicInteger());
}
}
- private static CloseableHttpAsyncClient createHttpClient(FeedClientBuilder builder, HttpRequestStrategy retryStrategy) throws IOException {
+ private static CloseableHttpAsyncClient createHttpClient(FeedClientBuilder builder) throws IOException {
H2AsyncClientBuilder httpClientBuilder = H2AsyncClientBuilder.create()
.setUserAgent(String.format("vespa-feed-client/%s", Vespa.VERSION))
.setDefaultHeaders(Collections.singletonList(new BasicHeader("Vespa-Client-Version", Vespa.VERSION)))
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
index 23569af3cdc..d43edc9656b 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
@@ -6,11 +6,11 @@ import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import java.io.IOException;
+import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
-import java.util.function.Consumer;
import java.util.logging.Logger;
import static java.lang.Math.max;
@@ -30,8 +30,7 @@ import static java.util.logging.Level.INFO;
*/
class HttpRequestStrategy implements RequestStrategy {
- private static Logger log = Logger.getLogger(HttpRequestStrategy.class.getName());
- private static final double errorThreshold = 0.1;
+ private static final Logger log = Logger.getLogger(HttpRequestStrategy.class.getName());
private final Map<DocumentId, CompletableFuture<Void>> inflightById = new HashMap<>();
private final Object lock = new Object();
@@ -40,33 +39,21 @@ class HttpRequestStrategy implements RequestStrategy {
private final long minInflight;
private double targetInflight;
private long inflight = 0;
- private double errorRate = 0;
private long consecutiveSuccesses = 0;
private boolean failed = false;
+ private Instant lastSuccess = Instant.MAX;
HttpRequestStrategy(FeedClientBuilder builder) {
this.wrapped = builder.retryStrategy;
this.maxInflight = builder.maxConnections * (long) builder.maxStreamsPerConnection;
- this.minInflight = builder.maxConnections * (long) Math.min(16, builder.maxStreamsPerConnection);
+ this.minInflight = builder.maxConnections * (long) min(16, builder.maxStreamsPerConnection);
this.targetInflight = Math.sqrt(maxInflight) * (Math.sqrt(minInflight));
}
- /**
- * Retries all IOExceptions, unless error rate has converged to a value higher than the threshold,
- * or the user has turned off retries for this type of operation.
- */
- private boolean retry(SimpleHttpRequest request, Throwable thrown, int attempt) {
- failure();
- log.log(INFO, thrown, () -> "Failed attempt " + attempt + " at " + request +
- ", error rate is " + errorRate + ", " + consecutiveSuccesses + " successes since last error");
-
- if ( ! (thrown instanceof IOException))
+ private boolean retry(SimpleHttpRequest request, int attempt) {
+ if (attempt >= wrapped.retries())
return false;
- if (attempt > wrapped.retries())
- return false;
-
-
switch (request.getMethod().toUpperCase()) {
case "POST": return wrapped.retry(FeedClient.OperationType.put);
case "PUT": return wrapped.retry(FeedClient.OperationType.update);
@@ -76,28 +63,34 @@ class HttpRequestStrategy implements RequestStrategy {
}
/**
- * Called when a response is successfully obtained. In conjunction with IOException reports, this makes the
- * error rate converge towards the true error rate, at a speed inversely proportional to the target number
- * of inflight requests, per reported success/error, i.e., hopefully at a rate independent of transport width.
+ * Retries all IOExceptions, unless error rate has converged to a value higher than the threshold,
+ * or the user has turned off retries for this type of operation.
*/
+ private boolean retry(SimpleHttpRequest request, Throwable thrown, int attempt) {
+ failure();
+ log.log(INFO, thrown, () -> "Failed attempt " + attempt + " at " + request + ", " + consecutiveSuccesses + " successes since last error");
+
+ if ( ! (thrown instanceof IOException))
+ return false;
+
+ return retry(request, attempt);
+ }
+
void success() {
+ Instant now = Instant.now();
synchronized (lock) {
- errorRate -= errorRate / targetInflight; // Converges towards true error rate, in conjunction with failure updates.
- targetInflight = min(targetInflight + 0.1, maxInflight);
++consecutiveSuccesses;
+ lastSuccess = now;
+ targetInflight = min(targetInflight + 0.1, maxInflight);
}
}
- /**
- * Called whenever a failure to get a successful response is recorded.
- */
void failure() {
+ Instant threshold = Instant.now().minusSeconds(300);
synchronized (lock) {
- errorRate += (1 - errorRate) / targetInflight; // Converges towards true error rate, in conjunction with success updates.
- if (errorRate > errorThreshold)
- failed = true;
-
consecutiveSuccesses = 0;
+ if (lastSuccess.isBefore(threshold))
+ failed = true;
}
}
@@ -119,7 +112,10 @@ class HttpRequestStrategy implements RequestStrategy {
}
failure();
- return attempt <= wrapped.retries() && (response.getCode() == 500 || response.getCode() == 502); // Hopefully temporary errors.
+ if (response.getCode() != 500 && response.getCode() != 502)
+ return false;
+
+ return retry(request, attempt); // Hopefully temporary errors.
}
// Must hold lock.
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
index 466c4f9a0ab..1787d8d65c6 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
@@ -14,10 +14,10 @@ import java.util.function.BiConsumer;
*/
public interface RequestStrategy {
- /** Whether this has failed, and we should stop. */
+ /** Whether this has failed fatally, and we should cease sending further operations. */
boolean hasFailed();
- /** Enqueue the given operation, which is dispatched to a vessel future when ready. */
+ /** Enqueue the given operation, returning its future result. This may block if the client send queue is full. */
CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, SimpleHttpRequest request,
BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>> dispatch);