diff options
-rw-r--r-- | vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java | 28 |
1 files changed, 20 insertions, 8 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java index 42079718115..dc902297d6d 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java @@ -19,6 +19,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -335,6 +336,7 @@ class HttpRequestStrategy implements RequestStrategy { private final Object monitor = new Object(); private final ClusterFactory clusterFactory; + private final ExecutorService executor = Executors.newSingleThreadExecutor(); private AtomicLong inflight = new AtomicLong(0); private Cluster delegate; @@ -350,14 +352,15 @@ class HttpRequestStrategy implements RequestStrategy { usedCounter.incrementAndGet(); Cluster usedCluster = delegate; usedCluster.dispatch(request, vessel); - vessel.whenComplete((__, ___) -> { - synchronized (monitor) { - if (usedCounter.decrementAndGet() == 0 && usedCluster != delegate) { - log.log(INFO, "Closing old HTTP client"); - usedCluster.close(); - } - } - }); + vessel.whenCompleteAsync((__, ___) -> { + synchronized (monitor) { + if (usedCounter.decrementAndGet() == 0 && usedCluster != delegate) { + log.log(INFO, "Closing old HTTP client"); + usedCluster.close(); + } + } + }, + executor); } } @@ -365,6 +368,15 @@ class HttpRequestStrategy implements RequestStrategy { public void close() { synchronized (monitor) { delegate.close(); + executor.shutdown(); + try { + if ( ! executor.awaitTermination(1, TimeUnit.MINUTES)) + log.log(WARNING, "Failed shutting down HTTP client within 1 minute"); + } + catch (InterruptedException e) { + log.log(WARNING, "Interrupted waiting for HTTP client to shut down"); + Thread.currentThread().interrupt(); + } } } |