aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-06-09 14:06:01 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-06-09 14:07:43 +0200
commit0caf262b36d58bf39a0ca518507117cb28eabed7 (patch)
treedd2af6678ff79b3af362de851f39bc4be3d4e8b1 /vespa-feed-client
parent8f3f58da7157d9fca170f9370c9fa936100501c3 (diff)
Use a dedicated thread for result handling for feed client (parsing etc.)
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java41
1 files changed, 25 insertions, 16 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
index a805c7eb195..e11358e4bf4 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
@@ -12,6 +12,8 @@ import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
@@ -51,6 +53,11 @@ class HttpRequestStrategy implements RequestStrategy {
private final AtomicBoolean destroyed = new AtomicBoolean(false);
private final AtomicLong delayedCount = new AtomicLong(0);
private final AtomicLong retries = new AtomicLong(0);
+ private final ExecutorService resultExecutor = Executors.newSingleThreadExecutor(runnable -> {
+ Thread thread = new Thread(runnable, "feed-client-result-executor");
+ thread.setDaemon(true);
+ return thread;
+ });
HttpRequestStrategy(FeedClientBuilder builder) throws IOException {
this(builder, new HttpCluster(builder));
@@ -222,22 +229,23 @@ class HttpRequestStrategy implements RequestStrategy {
/** Handles the result of one attempt at the given operation, retrying if necessary. */
private void handleAttempt(CompletableFuture<SimpleHttpResponse> vessel, SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> result, int attempt) {
- vessel.whenComplete((response, thrown) -> {
- // Retry the operation if it failed with a transient error ...
- if (thrown != null ? retry(request, thrown, attempt)
- : retry(request, response, attempt)) {
- retries.incrementAndGet();
- CircuitBreaker.State state = breaker.state();
- CompletableFuture<SimpleHttpResponse> retry = new CompletableFuture<>();
- offer(() -> cluster.dispatch(request, retry));
- handleAttempt(retry, request, result, attempt + (state == HALF_OPEN ? 0 : 1));
- }
- // ... or accept the outcome and mark the operation as complete.
- else {
- if (thrown == null) result.complete(response);
- else result.completeExceptionally(thrown);
- }
- });
+ vessel.whenCompleteAsync((response, thrown) -> {
+ // Retry the operation if it failed with a transient error ...
+ if (thrown != null ? retry(request, thrown, attempt)
+ : retry(request, response, attempt)) {
+ retries.incrementAndGet();
+ CircuitBreaker.State state = breaker.state();
+ CompletableFuture<SimpleHttpResponse> retry = new CompletableFuture<>();
+ offer(() -> cluster.dispatch(request, retry));
+ handleAttempt(retry, request, result, attempt + (state == HALF_OPEN ? 0 : 1));
+ }
+ // ... or accept the outcome and mark the operation as complete.
+ else {
+ if (thrown == null) result.complete(response);
+ else result.completeExceptionally(thrown);
+ }
+ },
+ resultExecutor);
}
@Override
@@ -246,6 +254,7 @@ class HttpRequestStrategy implements RequestStrategy {
inflightById.values().forEach(result -> result.cancel(true));
cluster.close();
+ resultExecutor.shutdown();
}
}