From 638f406e5459483cf7cc115e6f0ec6de395a2240 Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Wed, 2 Jun 2021 09:54:11 +0200 Subject: One request per second when circuit broken—immediate resends when it mends MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ai/vespa/feed/client/HttpRequestStrategy.java | 59 +++++++++++++++------- 1 file changed, 42 insertions(+), 17 deletions(-) (limited to 'vespa-feed-client/src') diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java index 1dd8736cd2b..a202aa92126 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java @@ -10,10 +10,9 @@ import java.time.Clock; import java.time.Instant; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.LinkedBlockingQueue; import java.util.function.BiConsumer; import java.util.logging.Logger; @@ -40,14 +39,16 @@ class HttpRequestStrategy implements RequestStrategy, AutoCloseable { private final Object monitor = new Object(); private final Clock clock; private final RetryStrategy wrapped; - private final ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1); + private final Thread delayer = new Thread(this::drainDelayed); + private final BlockingQueue> delayed = new LinkedBlockingQueue<>(); private final long maxInflight; private final long minInflight; private double targetInflight; private long inflight = 0; private long consecutiveSuccesses = 0; - private boolean failed = false; private Instant lastSuccess; + private boolean failed = false; + private boolean closed = false; HttpRequestStrategy(FeedClientBuilder builder, Clock clock) { this.wrapped = builder.retryStrategy; @@ -55,7 +56,22 @@ class HttpRequestStrategy implements RequestStrategy, AutoCloseable { this.minInflight = builder.maxConnections * (long) min(16, builder.maxStreamsPerConnection); this.targetInflight = Math.sqrt(maxInflight) * (Math.sqrt(minInflight)); this.clock = clock; - lastSuccess = clock.instant(); + this.lastSuccess = clock.instant(); + this.delayer.start(); + } + + private void drainDelayed() { + try { + while (true) { + do delayed.take().complete(null); + while ( ! hasFailed()); + + Thread.sleep(1000); + } + } + catch (InterruptedException e) { + delayed.forEach(action -> action.cancel(true)); + } } private boolean retry(SimpleHttpRequest request, int attempt) { @@ -184,16 +200,17 @@ class HttpRequestStrategy implements RequestStrategy, AutoCloseable { DocumentId documentId, int attempt) { vessel.whenComplete((response, thrown) -> { // Retry the operation if it failed with a transient error ... - if ( ! failed && (thrown != null ? retry(request, thrown, attempt) - : retry(request, response, attempt))) { - CompletableFuture retry = new CompletableFuture<>(); - if (hasFailed()) - delayer.schedule(() -> dispatch.accept(request, retry), 1, TimeUnit.SECONDS); - else - dispatch.accept(request, retry); - handleAttempt(retry, dispatch, blocker, request, result, documentId, attempt + 1); - return; - } + if (thrown != null ? retry(request, thrown, attempt) + : retry(request, response, attempt)) { + CompletableFuture retry = new CompletableFuture<>(); + boolean hasFailed = hasFailed(); + if (hasFailed) + delayed.add(new CompletableFuture<>().thenRun(() -> dispatch.accept(request, retry))); + else + dispatch.accept(request, retry); + handleAttempt(retry, dispatch, blocker, request, result, documentId, attempt + (hasFailed ? 0 : 1)); + return; + } // ... or accept the outcome and mark the operation as complete. CompletableFuture current; @@ -214,7 +231,15 @@ class HttpRequestStrategy implements RequestStrategy, AutoCloseable { @Override public void close() { - delayer.shutdown(); + synchronized (monitor) { + if (closed) + return; + + closed = true; + } + delayer.interrupt(); + try { delayer.join(); } + catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } -- cgit v1.2.3