diff options
author | Bjørn Christian Seime <bjorncs@yahooinc.com> | 2023-07-07 14:19:10 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-07-07 14:19:10 +0200 |
commit | dd0854a032a6d196e5d6329a3e444bf060173c4d (patch) | |
tree | 1ce3ba8d7828e4edb49ecc36df6b081b5b605e21 /vespa-feed-client | |
parent | 5396220b2ac64426188c2a9423a0176cc7d56c23 (diff) | |
parent | f6ae50a419ea4deabbede6e99bb9a5e4e160e4eb (diff) |
Merge pull request #27705 from vespa-engine/bjorncs/vespa-feed-client
Create and dispatch request in client's threadpool
Diffstat (limited to 'vespa-feed-client')
-rw-r--r-- | vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java | 46 |
1 files changed, 26 insertions, 20 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java index 9f5523b062c..4e1258caa18 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java @@ -67,25 +67,31 @@ class JettyCluster implements Cluster { @Override public void dispatch(HttpRequest req, CompletableFuture<HttpResponse> vessel) { - Endpoint endpoint = findLeastBusyEndpoint(endpoints); - long reqTimeoutMillis = req.timeout() != null - ? req.timeout().toMillis() * 11 / 10 + 1000 : IDLE_TIMEOUT.toMillis(); - Request jettyReq = client.newRequest(URI.create(endpoint.uri + req.path())) - .version(HttpVersion.HTTP_2) - .method(HttpMethod.fromString(req.method())) - .headers(hs -> req.headers().forEach((k, v) -> hs.add(k, v.get()))) - .idleTimeout(IDLE_TIMEOUT.toMillis(), MILLISECONDS) - .timeout(reqTimeoutMillis, MILLISECONDS); - if (req.body() != null) { - FeedContent content = new FeedContent(compression, req.body()); - content.contentEncoding().ifPresent(ce -> jettyReq.headers(hs -> hs.add(ce))); - jettyReq.body(content); - } - jettyReq.send(new BufferingResponseListener() { - @Override - public void onComplete(Result result) { - if (result.isFailed()) vessel.completeExceptionally(result.getFailure()); - else vessel.complete(new JettyResponse(result.getResponse(), getContent())); + client.getExecutor().execute(() -> { + try { + Endpoint endpoint = findLeastBusyEndpoint(endpoints); + long reqTimeoutMillis = req.timeout() != null + ? req.timeout().toMillis() * 11 / 10 + 1000 : IDLE_TIMEOUT.toMillis(); + Request jettyReq = client.newRequest(URI.create(endpoint.uri + req.path())) + .version(HttpVersion.HTTP_2) + .method(HttpMethod.fromString(req.method())) + .headers(hs -> req.headers().forEach((k, v) -> hs.add(k, v.get()))) + .idleTimeout(IDLE_TIMEOUT.toMillis(), MILLISECONDS) + .timeout(reqTimeoutMillis, MILLISECONDS); + if (req.body() != null) { + FeedContent content = new FeedContent(compression, req.body()); + content.contentEncoding().ifPresent(ce -> jettyReq.headers(hs -> hs.add(ce))); + jettyReq.body(content); + } + jettyReq.send(new BufferingResponseListener() { + @Override + public void onComplete(Result result) { + if (result.isFailed()) vessel.completeExceptionally(result.getFailure()); + else vessel.complete(new JettyResponse(result.getResponse(), getContent())); + } + }); + } catch (Exception e) { + vessel.completeExceptionally(e); } }); } @@ -106,7 +112,7 @@ class JettyCluster implements Cluster { clientSslCtxFactory.setEndpointIdentificationAlgorithm(null); } ClientConnector connector = new ClientConnector(); - int threads = Math.max(Math.min(Runtime.getRuntime().availableProcessors(), 16), 4); + int threads = Math.max(Math.min(Runtime.getRuntime().availableProcessors(), 20), 8); connector.setExecutor(new QueuedThreadPool(threads)); connector.setSslContextFactory(clientSslCtxFactory); HTTP2Client h2Client = new HTTP2Client(connector); |