aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@yahooinc.com>2023-07-06 18:44:33 +0200
committerBjørn Christian Seime <bjorncs@yahooinc.com>2023-07-07 13:27:30 +0200
commitf6ae50a419ea4deabbede6e99bb9a5e4e160e4eb (patch)
tree0e8593aae4d1bcc2da23efa9966f4f4e9bb1599c /vespa-feed-client
parentf44c0fe6af06a966a655c1efd50bc2873c7b0e5f (diff)
Create and dispatch request in client's threadpool
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java46
1 files changed, 26 insertions, 20 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java
index 9f5523b062c..4e1258caa18 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java
@@ -67,25 +67,31 @@ class JettyCluster implements Cluster {
@Override
public void dispatch(HttpRequest req, CompletableFuture<HttpResponse> vessel) {
- Endpoint endpoint = findLeastBusyEndpoint(endpoints);
- long reqTimeoutMillis = req.timeout() != null
- ? req.timeout().toMillis() * 11 / 10 + 1000 : IDLE_TIMEOUT.toMillis();
- Request jettyReq = client.newRequest(URI.create(endpoint.uri + req.path()))
- .version(HttpVersion.HTTP_2)
- .method(HttpMethod.fromString(req.method()))
- .headers(hs -> req.headers().forEach((k, v) -> hs.add(k, v.get())))
- .idleTimeout(IDLE_TIMEOUT.toMillis(), MILLISECONDS)
- .timeout(reqTimeoutMillis, MILLISECONDS);
- if (req.body() != null) {
- FeedContent content = new FeedContent(compression, req.body());
- content.contentEncoding().ifPresent(ce -> jettyReq.headers(hs -> hs.add(ce)));
- jettyReq.body(content);
- }
- jettyReq.send(new BufferingResponseListener() {
- @Override
- public void onComplete(Result result) {
- if (result.isFailed()) vessel.completeExceptionally(result.getFailure());
- else vessel.complete(new JettyResponse(result.getResponse(), getContent()));
+ client.getExecutor().execute(() -> {
+ try {
+ Endpoint endpoint = findLeastBusyEndpoint(endpoints);
+ long reqTimeoutMillis = req.timeout() != null
+ ? req.timeout().toMillis() * 11 / 10 + 1000 : IDLE_TIMEOUT.toMillis();
+ Request jettyReq = client.newRequest(URI.create(endpoint.uri + req.path()))
+ .version(HttpVersion.HTTP_2)
+ .method(HttpMethod.fromString(req.method()))
+ .headers(hs -> req.headers().forEach((k, v) -> hs.add(k, v.get())))
+ .idleTimeout(IDLE_TIMEOUT.toMillis(), MILLISECONDS)
+ .timeout(reqTimeoutMillis, MILLISECONDS);
+ if (req.body() != null) {
+ FeedContent content = new FeedContent(compression, req.body());
+ content.contentEncoding().ifPresent(ce -> jettyReq.headers(hs -> hs.add(ce)));
+ jettyReq.body(content);
+ }
+ jettyReq.send(new BufferingResponseListener() {
+ @Override
+ public void onComplete(Result result) {
+ if (result.isFailed()) vessel.completeExceptionally(result.getFailure());
+ else vessel.complete(new JettyResponse(result.getResponse(), getContent()));
+ }
+ });
+ } catch (Exception e) {
+ vessel.completeExceptionally(e);
}
});
}
@@ -106,7 +112,7 @@ class JettyCluster implements Cluster {
clientSslCtxFactory.setEndpointIdentificationAlgorithm(null);
}
ClientConnector connector = new ClientConnector();
- int threads = Math.max(Math.min(Runtime.getRuntime().availableProcessors(), 16), 4);
+ int threads = Math.max(Math.min(Runtime.getRuntime().availableProcessors(), 20), 8);
connector.setExecutor(new QueuedThreadPool(threads));
connector.setSslContextFactory(clientSslCtxFactory);
HTTP2Client h2Client = new HTTP2Client(connector);