From 71da359825e57313ccc949582590258588bcfe38 Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Thu, 27 May 2021 12:37:55 +0200 Subject: Load balance over multiple clients/connections --- .../java/ai/vespa/feed/client/HttpFeedClient.java | 41 ++++++++++++++++------ 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java index 1450d306733..e79f8dac67e 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import static java.util.Objects.requireNonNull; @@ -43,16 +44,21 @@ class HttpFeedClient implements FeedClient { private final URI endpoint; private final Map> requestHeaders; private final HttpRequestStrategy requestStrategy; - private final CloseableHttpAsyncClient httpClient; + private final List httpClients = new ArrayList<>(); + private final List inflight = new ArrayList<>(); private final AtomicBoolean closed = new AtomicBoolean(); HttpFeedClient(FeedClientBuilder builder) throws IOException { this.endpoint = builder.endpoint; this.requestHeaders = new HashMap<>(builder.requestHeaders); - this.requestStrategy = new HttpRequestStrategy(builder); - this.httpClient = createHttpClient(builder, requestStrategy); - this.httpClient.start(); + + for (int i = 0; i < builder.maxConnections; i++) { + CloseableHttpAsyncClient hc = createHttpClient(builder, requestStrategy); + hc.start(); + httpClients.add(hc); + inflight.add(new AtomicInteger()); + } } private static CloseableHttpAsyncClient createHttpClient(FeedClientBuilder builder, HttpRequestStrategy retryStrategy) throws IOException { @@ -116,7 +122,8 @@ class HttpFeedClient implements FeedClient { @Override public void close() throws IOException { if ( ! closed.getAndSet(true)) - httpClient.close(); + for (CloseableHttpAsyncClient hc : httpClients) + hc.close(); } private CompletableFuture send(String method, DocumentId documentId, String operationJson, OperationParameters params) { @@ -125,14 +132,26 @@ class HttpFeedClient implements FeedClient { if (operationJson != null) request.setBody(operationJson, ContentType.APPLICATION_JSON); + int index = 0; + int min = Integer.MAX_VALUE; + for (int i = 0; i < httpClients.size(); i++) + if (inflight.get(i).get() < min) { + min = inflight.get(i).get(); + index = i; + } + + CloseableHttpAsyncClient client = httpClients.get(index); + AtomicInteger counter = inflight.get(index); + counter.incrementAndGet(); return requestStrategy.enqueue(documentId, future -> { - httpClient.execute(request, - new FutureCallback() { - @Override public void completed(SimpleHttpResponse response) { future.complete(response); } - @Override public void failed(Exception ex) { future.completeExceptionally(ex); } - @Override public void cancelled() { future.cancel(false); } - }); + client.execute(request, + new FutureCallback() { + @Override public void completed(SimpleHttpResponse response) { future.complete(response); } + @Override public void failed(Exception ex) { future.completeExceptionally(ex); } + @Override public void cancelled() { future.cancel(false); } + }); }).handle((response, thrown) -> { + counter.decrementAndGet(); if (thrown != null) { if (requestStrategy.hasFailed()) { try { close(); } -- cgit v1.2.3