summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-05-27 12:37:55 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-05-27 12:37:55 +0200
commit71da359825e57313ccc949582590258588bcfe38 (patch)
treea1b26550c815b5d8b11d3fb1c37d4c4496436b13
parente78edb302f800a67e717af0e429866dccbcbedf0 (diff)
Load balance over multiple clients/connections
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java41
1 files changed, 30 insertions, 11 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
index 1450d306733..e79f8dac67e 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import static java.util.Objects.requireNonNull;
@@ -43,16 +44,21 @@ class HttpFeedClient implements FeedClient {
private final URI endpoint;
private final Map<String, Supplier<String>> requestHeaders;
private final HttpRequestStrategy requestStrategy;
- private final CloseableHttpAsyncClient httpClient;
+ private final List<CloseableHttpAsyncClient> httpClients = new ArrayList<>();
+ private final List<AtomicInteger> inflight = new ArrayList<>();
private final AtomicBoolean closed = new AtomicBoolean();
HttpFeedClient(FeedClientBuilder builder) throws IOException {
this.endpoint = builder.endpoint;
this.requestHeaders = new HashMap<>(builder.requestHeaders);
-
this.requestStrategy = new HttpRequestStrategy(builder);
- this.httpClient = createHttpClient(builder, requestStrategy);
- this.httpClient.start();
+
+ for (int i = 0; i < builder.maxConnections; i++) {
+ CloseableHttpAsyncClient hc = createHttpClient(builder, requestStrategy);
+ hc.start();
+ httpClients.add(hc);
+ inflight.add(new AtomicInteger());
+ }
}
private static CloseableHttpAsyncClient createHttpClient(FeedClientBuilder builder, HttpRequestStrategy retryStrategy) throws IOException {
@@ -116,7 +122,8 @@ class HttpFeedClient implements FeedClient {
@Override
public void close() throws IOException {
if ( ! closed.getAndSet(true))
- httpClient.close();
+ for (CloseableHttpAsyncClient hc : httpClients)
+ hc.close();
}
private CompletableFuture<Result> send(String method, DocumentId documentId, String operationJson, OperationParameters params) {
@@ -125,14 +132,26 @@ class HttpFeedClient implements FeedClient {
if (operationJson != null)
request.setBody(operationJson, ContentType.APPLICATION_JSON);
+ int index = 0;
+ int min = Integer.MAX_VALUE;
+ for (int i = 0; i < httpClients.size(); i++)
+ if (inflight.get(i).get() < min) {
+ min = inflight.get(i).get();
+ index = i;
+ }
+
+ CloseableHttpAsyncClient client = httpClients.get(index);
+ AtomicInteger counter = inflight.get(index);
+ counter.incrementAndGet();
return requestStrategy.enqueue(documentId, future -> {
- httpClient.execute(request,
- new FutureCallback<SimpleHttpResponse>() {
- @Override public void completed(SimpleHttpResponse response) { future.complete(response); }
- @Override public void failed(Exception ex) { future.completeExceptionally(ex); }
- @Override public void cancelled() { future.cancel(false); }
- });
+ client.execute(request,
+ new FutureCallback<SimpleHttpResponse>() {
+ @Override public void completed(SimpleHttpResponse response) { future.complete(response); }
+ @Override public void failed(Exception ex) { future.completeExceptionally(ex); }
+ @Override public void cancelled() { future.cancel(false); }
+ });
}).handle((response, thrown) -> {
+ counter.decrementAndGet();
if (thrown != null) {
if (requestStrategy.hasFailed()) {
try { close(); }