aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-05-15 09:08:25 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-05-15 09:08:25 +0200
commitd10bfdd775f227974e5b1ad2f9972a3135404c9a (patch)
tree131042eee8e65925cd8189042c11cbab1ef816bd /vespa-feed-client
parent9dbb2f444a03090b2d7e5dcee1252e343e4ec1dd (diff)
Move dispatch to request strategy, and allow only a single concurrent op per id
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpFeedClient.java27
-rw-r--r--vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpRequestStrategy.java32
-rw-r--r--vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/RequestStrategy.java20
3 files changed, 56 insertions, 23 deletions
diff --git a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpFeedClient.java
index cb5237a9328..236ba984114 100644
--- a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpFeedClient.java
+++ b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpFeedClient.java
@@ -126,24 +126,14 @@ class HttpFeedClient implements FeedClient {
if (operationJson != null)
request.setBody(operationJson, ContentType.APPLICATION_JSON);
- requestStrategy.acquireSlot();
-
- CompletableFuture<SimpleHttpResponse> future = new CompletableFuture<>();
- httpClient.execute(new SimpleHttpRequest(method, endpoint),
- new FutureCallback<SimpleHttpResponse>() {
- @Override public void completed(SimpleHttpResponse response) {
- future.complete(response);
- }
- @Override public void failed(Exception ex) {
- future.completeExceptionally(ex);
- }
- @Override public void cancelled() {
- future.cancel(false);
- }
- });
-
- return future.handle((response, thrown) -> {
- requestStrategy.releaseSlot();
+ return requestStrategy.enqueue(documentId, future -> {
+ httpClient.execute(new SimpleHttpRequest(method, endpoint),
+ new FutureCallback<SimpleHttpResponse>() {
+ @Override public void completed(SimpleHttpResponse response) { future.complete(response); }
+ @Override public void failed(Exception ex) { future.completeExceptionally(ex); }
+ @Override public void cancelled() { future.cancel(false); }
+ });
+ }).handle((response, thrown) -> {
if (thrown != null) {
if (requestStrategy.hasFailed()) {
try { close(); }
@@ -151,7 +141,6 @@ class HttpFeedClient implements FeedClient {
}
return new Result(failure, documentId, thrown.getMessage(), null);
}
- requestStrategy.success();
return toResult(response, documentId);
});
}
diff --git a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpRequestStrategy.java
index 7520f68feed..1fef67dda56 100644
--- a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpRequestStrategy.java
@@ -2,16 +2,21 @@
package com.yahoo.vespa.feed.client;
import org.apache.hc.client5.http.HttpRequestRetryStrategy;
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.util.TimeValue;
import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
import static com.yahoo.vespa.feed.client.FeedClient.OperationType.remove;
@@ -26,8 +31,9 @@ import static com.yahoo.vespa.feed.client.FeedClient.OperationType.remove;
*
* @author jonmv
*/
-class HttpRequestStrategy implements HttpRequestRetryStrategy {
+class HttpRequestStrategy implements RequestStrategy<SimpleHttpResponse>, HttpRequestRetryStrategy {
+ private final Map<DocumentId, CompletableFuture<SimpleHttpResponse>> byId = new ConcurrentHashMap<>();
private final FeedClient.RetryStrategy wrapped;
private final long maxInflight;
private double targetInflight;
@@ -38,8 +44,7 @@ class HttpRequestStrategy implements HttpRequestRetryStrategy {
private final Condition available;
HttpRequestStrategy(FeedClientBuilder builder) {
- this.wrapped = builder.retryStrategy != null ? builder.retryStrategy : new FeedClient.RetryStrategy() {
- };
+ this.wrapped = builder.retryStrategy;
this.maxInflight = builder.maxConnections * (long) builder.maxStreamsPerConnection;
this.targetInflight = maxInflight;
this.inflight = 0;
@@ -121,8 +126,27 @@ class HttpRequestStrategy implements HttpRequestRetryStrategy {
}
}
- boolean hasFailed() {
+ @Override
+ public boolean hasFailed() {
return errorRate.get() > errorThreshold;
}
+ @Override
+ public CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, Consumer<CompletableFuture<SimpleHttpResponse>> dispatch) {
+ acquireSlot();
+
+ CompletableFuture<SimpleHttpResponse> vessel = new CompletableFuture<>();
+ byId.compute(documentId, (id, previous) -> { // TODO; consider merging with above locking.
+ if (previous == null) dispatch.accept(vessel);
+ else previous.whenComplete((__, ___) -> dispatch.accept(vessel)); // TODO: keep a list so we can empty it?
+ return vessel;
+ });
+
+ return vessel.whenComplete((__, thrown) -> {
+ releaseSlot();
+ if (thrown == null)
+ success();
+ });
+ }
+
}
diff --git a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/RequestStrategy.java b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/RequestStrategy.java
new file mode 100644
index 00000000000..194815aadf8
--- /dev/null
+++ b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/RequestStrategy.java
@@ -0,0 +1,20 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.feed.client;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * Controls execution of feed operations.
+ *
+ * @author jonmv
+ */
+public interface RequestStrategy<T> {
+
+ /** Whether this has failed, and we should stop. */
+ boolean hasFailed();
+
+ /** Enqueue the given operation, which is dispatched to a vessel future when ready. */
+ CompletableFuture<T> enqueue(DocumentId documentId, Consumer<CompletableFuture<T>> dispatch);
+
+}