summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-06-01 21:53:32 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-06-01 21:53:32 +0200
commitf681e9933cc8d9c3e0470e0eede56eb9ffdee296 (patch)
treefe5a52ffc6de7ce7f6c7b51fb60a143fc8bda132 /vespa-feed-client
parent15d754c4e19ed171d3989904095fd9554849eb43 (diff)
Fail based on time since last success instead
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java6
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java59
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java4
3 files changed, 32 insertions, 37 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
index 370cd326f10..b3c711f7cc5 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
@@ -45,7 +45,7 @@ class HttpFeedClient implements FeedClient {
private final URI endpoint;
private final Map<String, Supplier<String>> requestHeaders;
- private final HttpRequestStrategy requestStrategy;
+ private final RequestStrategy requestStrategy;
private final List<CloseableHttpAsyncClient> httpClients = new ArrayList<>();
private final List<AtomicInteger> inflight = new ArrayList<>();
private final AtomicBoolean closed = new AtomicBoolean();
@@ -56,14 +56,14 @@ class HttpFeedClient implements FeedClient {
this.requestStrategy = new HttpRequestStrategy(builder);
for (int i = 0; i < builder.maxConnections; i++) {
- CloseableHttpAsyncClient client = createHttpClient(builder, requestStrategy);
+ CloseableHttpAsyncClient client = createHttpClient(builder);
client.start();
httpClients.add(client);
inflight.add(new AtomicInteger());
}
}
- private static CloseableHttpAsyncClient createHttpClient(FeedClientBuilder builder, HttpRequestStrategy retryStrategy) throws IOException {
+ private static CloseableHttpAsyncClient createHttpClient(FeedClientBuilder builder) throws IOException {
H2AsyncClientBuilder httpClientBuilder = H2AsyncClientBuilder.create()
.setUserAgent(String.format("vespa-feed-client/%s", Vespa.VERSION))
.setDefaultHeaders(Collections.singletonList(new BasicHeader("Vespa-Client-Version", Vespa.VERSION)))
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
index 23569af3cdc..17edfeafba6 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
@@ -6,15 +6,14 @@ import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import java.io.IOException;
+import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
-import java.util.function.Consumer;
import java.util.logging.Logger;
import static java.lang.Math.max;
-import static java.lang.Math.min;
import static java.util.logging.Level.INFO;
/**
@@ -30,8 +29,7 @@ import static java.util.logging.Level.INFO;
*/
class HttpRequestStrategy implements RequestStrategy {
- private static Logger log = Logger.getLogger(HttpRequestStrategy.class.getName());
- private static final double errorThreshold = 0.1;
+ private static final Logger log = Logger.getLogger(HttpRequestStrategy.class.getName());
private final Map<DocumentId, CompletableFuture<Void>> inflightById = new HashMap<>();
private final Object lock = new Object();
@@ -40,9 +38,9 @@ class HttpRequestStrategy implements RequestStrategy {
private final long minInflight;
private double targetInflight;
private long inflight = 0;
- private double errorRate = 0;
private long consecutiveSuccesses = 0;
private boolean failed = false;
+ private Instant lastSuccess = Instant.MAX;
HttpRequestStrategy(FeedClientBuilder builder) {
this.wrapped = builder.retryStrategy;
@@ -51,53 +49,47 @@ class HttpRequestStrategy implements RequestStrategy {
this.targetInflight = Math.sqrt(maxInflight) * (Math.sqrt(minInflight));
}
+ private boolean retry(SimpleHttpRequest request, int attempt) {
+ if (attempt >= wrapped.retries())
+ return false;
+
+ switch (request.getMethod().toUpperCase()) {
+
+ case "POST": return wrapped.retry(FeedClient.OperationType.put);
+ case "PUT": return wrapped.retry(FeedClient.OperationType.update);
+ case "DELETE": return wrapped.retry(FeedClient.OperationType.remove);
+ default: throw new IllegalStateException("Unexpected HTTP method: " + request.getMethod());
+ }
+ }
+
/**
* Retries all IOExceptions, unless error rate has converged to a value higher than the threshold,
* or the user has turned off retries for this type of operation.
*/
private boolean retry(SimpleHttpRequest request, Throwable thrown, int attempt) {
failure();
- log.log(INFO, thrown, () -> "Failed attempt " + attempt + " at " + request +
- ", error rate is " + errorRate + ", " + consecutiveSuccesses + " successes since last error");
+ log.log(INFO, thrown, () -> "Failed attempt " + attempt + " at " + request + ", " + consecutiveSuccesses + " successes since last error");
if ( ! (thrown instanceof IOException))
return false;
- if (attempt > wrapped.retries())
- return false;
-
-
- switch (request.getMethod().toUpperCase()) {
- case "POST": return wrapped.retry(FeedClient.OperationType.put);
- case "PUT": return wrapped.retry(FeedClient.OperationType.update);
- case "DELETE": return wrapped.retry(FeedClient.OperationType.remove);
- default: throw new IllegalStateException("Unexpected HTTP method: " + request.getMethod());
- }
+ return retry(request, attempt);
}
- /**
- * Called when a response is successfully obtained. In conjunction with IOException reports, this makes the
- * error rate converge towards the true error rate, at a speed inversely proportional to the target number
- * of inflight requests, per reported success/error, i.e., hopefully at a rate independent of transport width.
- */
void success() {
+ Instant now = Instant.now();
synchronized (lock) {
- errorRate -= errorRate / targetInflight; // Converges towards true error rate, in conjunction with failure updates.
- targetInflight = min(targetInflight + 0.1, maxInflight);
++consecutiveSuccesses;
+ lastSuccess = now;
}
}
- /**
- * Called whenever a failure to get a successful response is recorded.
- */
void failure() {
+ Instant threshold = Instant.now().minusSeconds(300);
synchronized (lock) {
- errorRate += (1 - errorRate) / targetInflight; // Converges towards true error rate, in conjunction with success updates.
- if (errorRate > errorThreshold)
- failed = true;
-
consecutiveSuccesses = 0;
+ if (lastSuccess.isBefore(threshold))
+ failed = true;
}
}
@@ -119,7 +111,10 @@ class HttpRequestStrategy implements RequestStrategy {
}
failure();
- return attempt <= wrapped.retries() && (response.getCode() == 500 || response.getCode() == 502); // Hopefully temporary errors.
+ if (response.getCode() != 500 && response.getCode() != 502)
+ return false;
+
+ return retry(request, attempt); // Hopefully temporary errors.
}
// Must hold lock.
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
index 466c4f9a0ab..1787d8d65c6 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java
@@ -14,10 +14,10 @@ import java.util.function.BiConsumer;
*/
public interface RequestStrategy {
- /** Whether this has failed, and we should stop. */
+ /** Whether this has failed fatally, and we should cease sending further operations. */
boolean hasFailed();
- /** Enqueue the given operation, which is dispatched to a vessel future when ready. */
+ /** Enqueue the given operation, returning its future result. This may block if the client send queue is full. */
CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, SimpleHttpRequest request,
BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>> dispatch);