diff options
author | jonmv <venstad@gmail.com> | 2023-01-06 14:05:37 +0100 |
---|---|---|
committer | jonmv <venstad@gmail.com> | 2023-01-06 14:05:37 +0100 |
commit | eca0f74defc573cd837299d71750d4821f73a466 (patch) | |
tree | 1e15abf362a68a29bc7c29707706a80903a9b806 /vespa-feed-client | |
parent | 7d839355259eca823da9396c1ed15b43f7c98768 (diff) |
Use a thread pool for HTTP dispatch, especially faster with compression
Diffstat (limited to 'vespa-feed-client')
-rw-r--r-- | vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java | 65 |
1 files changed, 35 insertions, 30 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java index 1dda8912046..b2672b4ebf3 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; @@ -51,7 +52,8 @@ class ApacheCluster implements Cluster { private final boolean gzip; private int someNumber = 0; - private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(t -> new Thread(t, "request-timeout-thread")); + private final ExecutorService dispatchExecutor = Executors.newFixedThreadPool(8, t -> new Thread(t, "request-dispatch-thread")); + private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor(t -> new Thread(t, "request-timeout-thread")); ApacheCluster(FeedClientBuilderImpl builder) throws IOException { for (int i = 0; i < builder.connectionsPerEndpoint; i++) @@ -77,36 +79,38 @@ class ApacheCluster implements Cluster { Endpoint endpoint = leastBusy; endpoint.inflight.incrementAndGet(); - try { - SimpleHttpRequest request = new SimpleHttpRequest(wrapped.method(), wrapped.path()); - request.setScheme(endpoint.url.getScheme()); - request.setAuthority(new URIAuthority(endpoint.url.getHost(), portOf(endpoint.url))); - request.setConfig(requestConfig); - defaultHeaders.forEach(request::setHeader); - wrapped.headers().forEach((name, value) -> request.setHeader(name, value.get())); - if (wrapped.body() != null) { - byte[] body = wrapped.body(); - if (gzip) { - request.setHeader(gzipEncodingHeader); - body = gzipped(body); + dispatchExecutor.execute(() -> { + try { + SimpleHttpRequest request = new SimpleHttpRequest(wrapped.method(), wrapped.path()); + request.setScheme(endpoint.url.getScheme()); + request.setAuthority(new URIAuthority(endpoint.url.getHost(), portOf(endpoint.url))); + request.setConfig(requestConfig); + defaultHeaders.forEach(request::setHeader); + wrapped.headers().forEach((name, value) -> request.setHeader(name, value.get())); + if (wrapped.body() != null) { + byte[] body = wrapped.body(); + if (gzip) { + request.setHeader(gzipEncodingHeader); + body = gzipped(body); + } + request.setBody(body, ContentType.APPLICATION_JSON); } - request.setBody(body, ContentType.APPLICATION_JSON); - } - Future<?> future = endpoint.client.execute(request, - new FutureCallback<SimpleHttpResponse>() { - @Override public void completed(SimpleHttpResponse response) { vessel.complete(new ApacheHttpResponse(response)); } - @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); } - @Override public void cancelled() { vessel.cancel(false); } - }); - long timeoutMillis = wrapped.timeout() == null ? 200_000 : wrapped.timeout().toMillis() * 11 / 10 + 1_000; - Future<?> cancellation = executor.schedule(() -> { future.cancel(true); vessel.cancel(true); }, timeoutMillis, TimeUnit.MILLISECONDS); - vessel.whenComplete((__, ___) -> cancellation.cancel(true)); - } - catch (Throwable thrown) { - vessel.completeExceptionally(thrown); - } - vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet()); + Future<?> future = endpoint.client.execute(request, + new FutureCallback<SimpleHttpResponse>() { + @Override public void completed(SimpleHttpResponse response) { vessel.complete(new ApacheHttpResponse(response)); } + @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); } + @Override public void cancelled() { vessel.cancel(false); } + }); + long timeoutMillis = wrapped.timeout() == null ? 200_000 : wrapped.timeout().toMillis() * 11 / 10 + 1_000; + Future<?> cancellation = timeoutExecutor.schedule(() -> { future.cancel(true); vessel.cancel(true); }, timeoutMillis, TimeUnit.MILLISECONDS); + vessel.whenComplete((__, ___) -> cancellation.cancel(true)); + } + catch (Throwable thrown) { + vessel.completeExceptionally(thrown); + } + vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet()); + }); } private byte[] gzipped(byte[] content) throws IOException{ @@ -120,6 +124,7 @@ class ApacheCluster implements Cluster { @Override public void close() { Throwable thrown = null; + dispatchExecutor.shutdownNow().forEach(Runnable::run); for (Endpoint endpoint : endpoints) { try { endpoint.client.close(); @@ -129,7 +134,7 @@ class ApacheCluster implements Cluster { else thrown.addSuppressed(t); } } - executor.shutdownNow().forEach(Runnable::run); + timeoutExecutor.shutdownNow().forEach(Runnable::run); if (thrown != null) throw new RuntimeException(thrown); } |