From 0caf262b36d58bf39a0ca518507117cb28eabed7 Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Wed, 9 Jun 2021 14:06:01 +0200 Subject: Use a dedicated thread for result handling for feed client (parsing etc.) --- .../ai/vespa/feed/client/HttpRequestStrategy.java | 41 +++++++++++++--------- 1 file changed, 25 insertions(+), 16 deletions(-) (limited to 'vespa-feed-client') diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java index a805c7eb195..e11358e4bf4 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java @@ -12,6 +12,8 @@ import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; @@ -51,6 +53,11 @@ class HttpRequestStrategy implements RequestStrategy { private final AtomicBoolean destroyed = new AtomicBoolean(false); private final AtomicLong delayedCount = new AtomicLong(0); private final AtomicLong retries = new AtomicLong(0); + private final ExecutorService resultExecutor = Executors.newSingleThreadExecutor(runnable -> { + Thread thread = new Thread(runnable, "feed-client-result-executor"); + thread.setDaemon(true); + return thread; + }); HttpRequestStrategy(FeedClientBuilder builder) throws IOException { this(builder, new HttpCluster(builder)); @@ -222,22 +229,23 @@ class HttpRequestStrategy implements RequestStrategy { /** Handles the result of one attempt at the given operation, retrying if necessary. */ private void handleAttempt(CompletableFuture vessel, SimpleHttpRequest request, CompletableFuture result, int attempt) { - vessel.whenComplete((response, thrown) -> { - // Retry the operation if it failed with a transient error ... - if (thrown != null ? retry(request, thrown, attempt) - : retry(request, response, attempt)) { - retries.incrementAndGet(); - CircuitBreaker.State state = breaker.state(); - CompletableFuture retry = new CompletableFuture<>(); - offer(() -> cluster.dispatch(request, retry)); - handleAttempt(retry, request, result, attempt + (state == HALF_OPEN ? 0 : 1)); - } - // ... or accept the outcome and mark the operation as complete. - else { - if (thrown == null) result.complete(response); - else result.completeExceptionally(thrown); - } - }); + vessel.whenCompleteAsync((response, thrown) -> { + // Retry the operation if it failed with a transient error ... + if (thrown != null ? retry(request, thrown, attempt) + : retry(request, response, attempt)) { + retries.incrementAndGet(); + CircuitBreaker.State state = breaker.state(); + CompletableFuture retry = new CompletableFuture<>(); + offer(() -> cluster.dispatch(request, retry)); + handleAttempt(retry, request, result, attempt + (state == HALF_OPEN ? 0 : 1)); + } + // ... or accept the outcome and mark the operation as complete. + else { + if (thrown == null) result.complete(response); + else result.completeExceptionally(thrown); + } + }, + resultExecutor); } @Override @@ -246,6 +254,7 @@ class HttpRequestStrategy implements RequestStrategy { inflightById.values().forEach(result -> result.cancel(true)); cluster.close(); + resultExecutor.shutdown(); } } -- cgit v1.2.3