diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-06-02 09:54:11 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-06-02 09:54:11 +0200 |
commit | 638f406e5459483cf7cc115e6f0ec6de395a2240 (patch) | |
tree | fc58794906b1dc6608eeab170bcad1f3d2764adb /vespa-feed-client/src | |
parent | ea94163aa5af3efcd54f28d46552dde5d0bd36e5 (diff) |
One request per second when circuit broken—immediate resends when it mends
Diffstat (limited to 'vespa-feed-client/src')
-rw-r--r-- | vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java | 59 |
1 files changed, 42 insertions, 17 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java index 1dd8736cd2b..a202aa92126 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java @@ -10,10 +10,9 @@ import java.time.Clock; import java.time.Instant; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.LinkedBlockingQueue; import java.util.function.BiConsumer; import java.util.logging.Logger; @@ -40,14 +39,16 @@ class HttpRequestStrategy implements RequestStrategy, AutoCloseable { private final Object monitor = new Object(); private final Clock clock; private final RetryStrategy wrapped; - private final ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1); + private final Thread delayer = new Thread(this::drainDelayed); + private final BlockingQueue<CompletableFuture<Void>> delayed = new LinkedBlockingQueue<>(); private final long maxInflight; private final long minInflight; private double targetInflight; private long inflight = 0; private long consecutiveSuccesses = 0; - private boolean failed = false; private Instant lastSuccess; + private boolean failed = false; + private boolean closed = false; HttpRequestStrategy(FeedClientBuilder builder, Clock clock) { this.wrapped = builder.retryStrategy; @@ -55,7 +56,22 @@ class HttpRequestStrategy implements RequestStrategy, AutoCloseable { this.minInflight = builder.maxConnections * (long) min(16, builder.maxStreamsPerConnection); this.targetInflight = Math.sqrt(maxInflight) * (Math.sqrt(minInflight)); this.clock = clock; - lastSuccess = clock.instant(); + this.lastSuccess = clock.instant(); + this.delayer.start(); + } + + private void drainDelayed() { + try { + while (true) { + do delayed.take().complete(null); + while ( ! hasFailed()); + + Thread.sleep(1000); + } + } + catch (InterruptedException e) { + delayed.forEach(action -> action.cancel(true)); + } } private boolean retry(SimpleHttpRequest request, int attempt) { @@ -184,16 +200,17 @@ class HttpRequestStrategy implements RequestStrategy, AutoCloseable { DocumentId documentId, int attempt) { vessel.whenComplete((response, thrown) -> { // Retry the operation if it failed with a transient error ... - if ( ! failed && (thrown != null ? retry(request, thrown, attempt) - : retry(request, response, attempt))) { - CompletableFuture<SimpleHttpResponse> retry = new CompletableFuture<>(); - if (hasFailed()) - delayer.schedule(() -> dispatch.accept(request, retry), 1, TimeUnit.SECONDS); - else - dispatch.accept(request, retry); - handleAttempt(retry, dispatch, blocker, request, result, documentId, attempt + 1); - return; - } + if (thrown != null ? retry(request, thrown, attempt) + : retry(request, response, attempt)) { + CompletableFuture<SimpleHttpResponse> retry = new CompletableFuture<>(); + boolean hasFailed = hasFailed(); + if (hasFailed) + delayed.add(new CompletableFuture<>().thenRun(() -> dispatch.accept(request, retry))); + else + dispatch.accept(request, retry); + handleAttempt(retry, dispatch, blocker, request, result, documentId, attempt + (hasFailed ? 0 : 1)); + return; + } // ... or accept the outcome and mark the operation as complete. CompletableFuture<Void> current; @@ -214,7 +231,15 @@ class HttpRequestStrategy implements RequestStrategy, AutoCloseable { @Override public void close() { - delayer.shutdown(); + synchronized (monitor) { + if (closed) + return; + + closed = true; + } + delayer.interrupt(); + try { delayer.join(); } + catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } |