summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client/src
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-06-02 09:54:11 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-06-02 09:54:11 +0200
commit638f406e5459483cf7cc115e6f0ec6de395a2240 (patch)
treefc58794906b1dc6608eeab170bcad1f3d2764adb /vespa-feed-client/src
parentea94163aa5af3efcd54f28d46552dde5d0bd36e5 (diff)
One request per second when circuit broken—immediate resends when it mends
Diffstat (limited to 'vespa-feed-client/src')
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java59
1 files changed, 42 insertions, 17 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
index 1dd8736cd2b..a202aa92126 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
@@ -10,10 +10,9 @@ import java.time.Clock;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BiConsumer;
import java.util.logging.Logger;
@@ -40,14 +39,16 @@ class HttpRequestStrategy implements RequestStrategy, AutoCloseable {
private final Object monitor = new Object();
private final Clock clock;
private final RetryStrategy wrapped;
- private final ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1);
+ private final Thread delayer = new Thread(this::drainDelayed);
+ private final BlockingQueue<CompletableFuture<Void>> delayed = new LinkedBlockingQueue<>();
private final long maxInflight;
private final long minInflight;
private double targetInflight;
private long inflight = 0;
private long consecutiveSuccesses = 0;
- private boolean failed = false;
private Instant lastSuccess;
+ private boolean failed = false;
+ private boolean closed = false;
HttpRequestStrategy(FeedClientBuilder builder, Clock clock) {
this.wrapped = builder.retryStrategy;
@@ -55,7 +56,22 @@ class HttpRequestStrategy implements RequestStrategy, AutoCloseable {
this.minInflight = builder.maxConnections * (long) min(16, builder.maxStreamsPerConnection);
this.targetInflight = Math.sqrt(maxInflight) * (Math.sqrt(minInflight));
this.clock = clock;
- lastSuccess = clock.instant();
+ this.lastSuccess = clock.instant();
+ this.delayer.start();
+ }
+
+ private void drainDelayed() {
+ try {
+ while (true) {
+ do delayed.take().complete(null);
+ while ( ! hasFailed());
+
+ Thread.sleep(1000);
+ }
+ }
+ catch (InterruptedException e) {
+ delayed.forEach(action -> action.cancel(true));
+ }
}
private boolean retry(SimpleHttpRequest request, int attempt) {
@@ -184,16 +200,17 @@ class HttpRequestStrategy implements RequestStrategy, AutoCloseable {
DocumentId documentId, int attempt) {
vessel.whenComplete((response, thrown) -> {
// Retry the operation if it failed with a transient error ...
- if ( ! failed && (thrown != null ? retry(request, thrown, attempt)
- : retry(request, response, attempt))) {
- CompletableFuture<SimpleHttpResponse> retry = new CompletableFuture<>();
- if (hasFailed())
- delayer.schedule(() -> dispatch.accept(request, retry), 1, TimeUnit.SECONDS);
- else
- dispatch.accept(request, retry);
- handleAttempt(retry, dispatch, blocker, request, result, documentId, attempt + 1);
- return;
- }
+ if (thrown != null ? retry(request, thrown, attempt)
+ : retry(request, response, attempt)) {
+ CompletableFuture<SimpleHttpResponse> retry = new CompletableFuture<>();
+ boolean hasFailed = hasFailed();
+ if (hasFailed)
+ delayed.add(new CompletableFuture<>().thenRun(() -> dispatch.accept(request, retry)));
+ else
+ dispatch.accept(request, retry);
+ handleAttempt(retry, dispatch, blocker, request, result, documentId, attempt + (hasFailed ? 0 : 1));
+ return;
+ }
// ... or accept the outcome and mark the operation as complete.
CompletableFuture<Void> current;
@@ -214,7 +231,15 @@ class HttpRequestStrategy implements RequestStrategy, AutoCloseable {
@Override
public void close() {
- delayer.shutdown();
+ synchronized (monitor) {
+ if (closed)
+ return;
+
+ closed = true;
+ }
+ delayer.interrupt();
+ try { delayer.join(); }
+ catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
}