diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-06-09 14:06:01 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-06-09 14:07:43 +0200 |
commit | 0caf262b36d58bf39a0ca518507117cb28eabed7 (patch) | |
tree | dd2af6678ff79b3af362de851f39bc4be3d4e8b1 /vespa-feed-client | |
parent | 8f3f58da7157d9fca170f9370c9fa936100501c3 (diff) |
Use a dedicated thread for result handling for feed client (parsing etc.)
Diffstat (limited to 'vespa-feed-client')
-rw-r--r-- | vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java | 41 |
1 files changed, 25 insertions, 16 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java index a805c7eb195..e11358e4bf4 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java @@ -12,6 +12,8 @@ import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; @@ -51,6 +53,11 @@ class HttpRequestStrategy implements RequestStrategy { private final AtomicBoolean destroyed = new AtomicBoolean(false); private final AtomicLong delayedCount = new AtomicLong(0); private final AtomicLong retries = new AtomicLong(0); + private final ExecutorService resultExecutor = Executors.newSingleThreadExecutor(runnable -> { + Thread thread = new Thread(runnable, "feed-client-result-executor"); + thread.setDaemon(true); + return thread; + }); HttpRequestStrategy(FeedClientBuilder builder) throws IOException { this(builder, new HttpCluster(builder)); @@ -222,22 +229,23 @@ class HttpRequestStrategy implements RequestStrategy { /** Handles the result of one attempt at the given operation, retrying if necessary. */ private void handleAttempt(CompletableFuture<SimpleHttpResponse> vessel, SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> result, int attempt) { - vessel.whenComplete((response, thrown) -> { - // Retry the operation if it failed with a transient error ... - if (thrown != null ? retry(request, thrown, attempt) - : retry(request, response, attempt)) { - retries.incrementAndGet(); - CircuitBreaker.State state = breaker.state(); - CompletableFuture<SimpleHttpResponse> retry = new CompletableFuture<>(); - offer(() -> cluster.dispatch(request, retry)); - handleAttempt(retry, request, result, attempt + (state == HALF_OPEN ? 0 : 1)); - } - // ... or accept the outcome and mark the operation as complete. - else { - if (thrown == null) result.complete(response); - else result.completeExceptionally(thrown); - } - }); + vessel.whenCompleteAsync((response, thrown) -> { + // Retry the operation if it failed with a transient error ... + if (thrown != null ? retry(request, thrown, attempt) + : retry(request, response, attempt)) { + retries.incrementAndGet(); + CircuitBreaker.State state = breaker.state(); + CompletableFuture<SimpleHttpResponse> retry = new CompletableFuture<>(); + offer(() -> cluster.dispatch(request, retry)); + handleAttempt(retry, request, result, attempt + (state == HALF_OPEN ? 0 : 1)); + } + // ... or accept the outcome and mark the operation as complete. + else { + if (thrown == null) result.complete(response); + else result.completeExceptionally(thrown); + } + }, + resultExecutor); } @Override @@ -246,6 +254,7 @@ class HttpRequestStrategy implements RequestStrategy { inflightById.values().forEach(result -> result.cancel(true)); cluster.close(); + resultExecutor.shutdown(); } } |