diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-05-15 09:08:25 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-05-15 09:08:25 +0200 |
commit | d10bfdd775f227974e5b1ad2f9972a3135404c9a (patch) | |
tree | 131042eee8e65925cd8189042c11cbab1ef816bd /vespa-feed-client | |
parent | 9dbb2f444a03090b2d7e5dcee1252e343e4ec1dd (diff) |
Move dispatch to request strategy, and allow only a single concurrent op per id
Diffstat (limited to 'vespa-feed-client')
3 files changed, 56 insertions, 23 deletions
diff --git a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpFeedClient.java index cb5237a9328..236ba984114 100644 --- a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpFeedClient.java @@ -126,24 +126,14 @@ class HttpFeedClient implements FeedClient { if (operationJson != null) request.setBody(operationJson, ContentType.APPLICATION_JSON); - requestStrategy.acquireSlot(); - - CompletableFuture<SimpleHttpResponse> future = new CompletableFuture<>(); - httpClient.execute(new SimpleHttpRequest(method, endpoint), - new FutureCallback<SimpleHttpResponse>() { - @Override public void completed(SimpleHttpResponse response) { - future.complete(response); - } - @Override public void failed(Exception ex) { - future.completeExceptionally(ex); - } - @Override public void cancelled() { - future.cancel(false); - } - }); - - return future.handle((response, thrown) -> { - requestStrategy.releaseSlot(); + return requestStrategy.enqueue(documentId, future -> { + httpClient.execute(new SimpleHttpRequest(method, endpoint), + new FutureCallback<SimpleHttpResponse>() { + @Override public void completed(SimpleHttpResponse response) { future.complete(response); } + @Override public void failed(Exception ex) { future.completeExceptionally(ex); } + @Override public void cancelled() { future.cancel(false); } + }); + }).handle((response, thrown) -> { if (thrown != null) { if (requestStrategy.hasFailed()) { try { close(); } @@ -151,7 +141,6 @@ class HttpFeedClient implements FeedClient { } return new Result(failure, documentId, thrown.getMessage(), null); } - requestStrategy.success(); return toResult(response, documentId); }); } diff --git a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpRequestStrategy.java index 7520f68feed..1fef67dda56 100644 --- a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpRequestStrategy.java @@ -2,16 +2,21 @@ package com.yahoo.vespa.feed.client; import org.apache.hc.client5.http.HttpRequestRetryStrategy; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; import org.apache.hc.core5.http.HttpRequest; import org.apache.hc.core5.http.HttpResponse; import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.util.TimeValue; import java.io.IOException; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; import static com.yahoo.vespa.feed.client.FeedClient.OperationType.remove; @@ -26,8 +31,9 @@ import static com.yahoo.vespa.feed.client.FeedClient.OperationType.remove; * * @author jonmv */ -class HttpRequestStrategy implements HttpRequestRetryStrategy { +class HttpRequestStrategy implements RequestStrategy<SimpleHttpResponse>, HttpRequestRetryStrategy { + private final Map<DocumentId, CompletableFuture<SimpleHttpResponse>> byId = new ConcurrentHashMap<>(); private final FeedClient.RetryStrategy wrapped; private final long maxInflight; private double targetInflight; @@ -38,8 +44,7 @@ class HttpRequestStrategy implements HttpRequestRetryStrategy { private final Condition available; HttpRequestStrategy(FeedClientBuilder builder) { - this.wrapped = builder.retryStrategy != null ? builder.retryStrategy : new FeedClient.RetryStrategy() { - }; + this.wrapped = builder.retryStrategy; this.maxInflight = builder.maxConnections * (long) builder.maxStreamsPerConnection; this.targetInflight = maxInflight; this.inflight = 0; @@ -121,8 +126,27 @@ class HttpRequestStrategy implements HttpRequestRetryStrategy { } } - boolean hasFailed() { + @Override + public boolean hasFailed() { return errorRate.get() > errorThreshold; } + @Override + public CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, Consumer<CompletableFuture<SimpleHttpResponse>> dispatch) { + acquireSlot(); + + CompletableFuture<SimpleHttpResponse> vessel = new CompletableFuture<>(); + byId.compute(documentId, (id, previous) -> { // TODO; consider merging with above locking. + if (previous == null) dispatch.accept(vessel); + else previous.whenComplete((__, ___) -> dispatch.accept(vessel)); // TODO: keep a list so we can empty it? + return vessel; + }); + + return vessel.whenComplete((__, thrown) -> { + releaseSlot(); + if (thrown == null) + success(); + }); + } + } diff --git a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/RequestStrategy.java b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/RequestStrategy.java new file mode 100644 index 00000000000..194815aadf8 --- /dev/null +++ b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/RequestStrategy.java @@ -0,0 +1,20 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.feed.client; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +/** + * Controls execution of feed operations. + * + * @author jonmv + */ +public interface RequestStrategy<T> { + + /** Whether this has failed, and we should stop. */ + boolean hasFailed(); + + /** Enqueue the given operation, which is dispatched to a vessel future when ready. */ + CompletableFuture<T> enqueue(DocumentId documentId, Consumer<CompletableFuture<T>> dispatch); + +} |