aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@verizonmedia.com>2023-01-06 15:39:58 +0100
committerGitHub <noreply@github.com>2023-01-06 15:39:58 +0100
commit58889b4e6d3f220c1c52907f37a57fc5c4e53060 (patch)
treec8bd5472b95d572696c0281dfb8810d66b7a4166 /vespa-feed-client
parentb15d7cfa3132b2be5967510f6951ed04fb587361 (diff)
parenteca0f74defc573cd837299d71750d4821f73a466 (diff)
Merge pull request #25432 from vespa-engine/jonmv/thread-pool-for-http-dispatch
Use a thread pool for HTTP dispatch, especially faster with compression
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java65
1 files changed, 35 insertions, 30 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java
index 1dda8912046..b2672b4ebf3 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/ApacheCluster.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
@@ -51,7 +52,8 @@ class ApacheCluster implements Cluster {
private final boolean gzip;
private int someNumber = 0;
- private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(t -> new Thread(t, "request-timeout-thread"));
+ private final ExecutorService dispatchExecutor = Executors.newFixedThreadPool(8, t -> new Thread(t, "request-dispatch-thread"));
+ private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor(t -> new Thread(t, "request-timeout-thread"));
ApacheCluster(FeedClientBuilderImpl builder) throws IOException {
for (int i = 0; i < builder.connectionsPerEndpoint; i++)
@@ -77,36 +79,38 @@ class ApacheCluster implements Cluster {
Endpoint endpoint = leastBusy;
endpoint.inflight.incrementAndGet();
- try {
- SimpleHttpRequest request = new SimpleHttpRequest(wrapped.method(), wrapped.path());
- request.setScheme(endpoint.url.getScheme());
- request.setAuthority(new URIAuthority(endpoint.url.getHost(), portOf(endpoint.url)));
- request.setConfig(requestConfig);
- defaultHeaders.forEach(request::setHeader);
- wrapped.headers().forEach((name, value) -> request.setHeader(name, value.get()));
- if (wrapped.body() != null) {
- byte[] body = wrapped.body();
- if (gzip) {
- request.setHeader(gzipEncodingHeader);
- body = gzipped(body);
+ dispatchExecutor.execute(() -> {
+ try {
+ SimpleHttpRequest request = new SimpleHttpRequest(wrapped.method(), wrapped.path());
+ request.setScheme(endpoint.url.getScheme());
+ request.setAuthority(new URIAuthority(endpoint.url.getHost(), portOf(endpoint.url)));
+ request.setConfig(requestConfig);
+ defaultHeaders.forEach(request::setHeader);
+ wrapped.headers().forEach((name, value) -> request.setHeader(name, value.get()));
+ if (wrapped.body() != null) {
+ byte[] body = wrapped.body();
+ if (gzip) {
+ request.setHeader(gzipEncodingHeader);
+ body = gzipped(body);
+ }
+ request.setBody(body, ContentType.APPLICATION_JSON);
}
- request.setBody(body, ContentType.APPLICATION_JSON);
- }
- Future<?> future = endpoint.client.execute(request,
- new FutureCallback<SimpleHttpResponse>() {
- @Override public void completed(SimpleHttpResponse response) { vessel.complete(new ApacheHttpResponse(response)); }
- @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); }
- @Override public void cancelled() { vessel.cancel(false); }
- });
- long timeoutMillis = wrapped.timeout() == null ? 200_000 : wrapped.timeout().toMillis() * 11 / 10 + 1_000;
- Future<?> cancellation = executor.schedule(() -> { future.cancel(true); vessel.cancel(true); }, timeoutMillis, TimeUnit.MILLISECONDS);
- vessel.whenComplete((__, ___) -> cancellation.cancel(true));
- }
- catch (Throwable thrown) {
- vessel.completeExceptionally(thrown);
- }
- vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet());
+ Future<?> future = endpoint.client.execute(request,
+ new FutureCallback<SimpleHttpResponse>() {
+ @Override public void completed(SimpleHttpResponse response) { vessel.complete(new ApacheHttpResponse(response)); }
+ @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); }
+ @Override public void cancelled() { vessel.cancel(false); }
+ });
+ long timeoutMillis = wrapped.timeout() == null ? 200_000 : wrapped.timeout().toMillis() * 11 / 10 + 1_000;
+ Future<?> cancellation = timeoutExecutor.schedule(() -> { future.cancel(true); vessel.cancel(true); }, timeoutMillis, TimeUnit.MILLISECONDS);
+ vessel.whenComplete((__, ___) -> cancellation.cancel(true));
+ }
+ catch (Throwable thrown) {
+ vessel.completeExceptionally(thrown);
+ }
+ vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet());
+ });
}
private byte[] gzipped(byte[] content) throws IOException{
@@ -120,6 +124,7 @@ class ApacheCluster implements Cluster {
@Override
public void close() {
Throwable thrown = null;
+ dispatchExecutor.shutdownNow().forEach(Runnable::run);
for (Endpoint endpoint : endpoints) {
try {
endpoint.client.close();
@@ -129,7 +134,7 @@ class ApacheCluster implements Cluster {
else thrown.addSuppressed(t);
}
}
- executor.shutdownNow().forEach(Runnable::run);
+ timeoutExecutor.shutdownNow().forEach(Runnable::run);
if (thrown != null) throw new RuntimeException(thrown);
}