summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Marius Venstad <jonmv@users.noreply.github.com>2021-05-18 17:41:13 +0200
committerGitHub <noreply@github.com>2021-05-18 17:41:13 +0200
commit4a210b8438d20a167ecfb74b0343f3acb8cb06ce (patch)
tree91aa8b631d31b638bb60c7239c1168bbf5990a89
parent417f6ceb7dc7a71a591205399697962569a544c3 (diff)
parent380b78bb4dd288978ad3b07b19c21ddecddde51e (diff)
Merge pull request #17884 from vespa-engine/jonmv/http2-feeding
Jonmv/http2 feeding
-rw-r--r--vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpFeedClient.java3
-rw-r--r--vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpRequestStrategy.java12
2 files changed, 11 insertions, 4 deletions
diff --git a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpFeedClient.java
index 236ba984114..39fcfd45a6f 100644
--- a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpFeedClient.java
+++ b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpFeedClient.java
@@ -115,7 +115,8 @@ class HttpFeedClient implements FeedClient {
return send("DELETE", documentId, null, params);
}
- @Override public void close() throws IOException {
+ @Override
+ public void close() throws IOException {
if ( ! closed.getAndSet(true))
httpClient.close();
}
diff --git a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpRequestStrategy.java
index 1fef67dda56..db6bc8d21fe 100644
--- a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpRequestStrategy.java
@@ -135,10 +135,14 @@ class HttpRequestStrategy implements RequestStrategy<SimpleHttpResponse>, HttpRe
public CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, Consumer<CompletableFuture<SimpleHttpResponse>> dispatch) {
acquireSlot();
+ Consumer<CompletableFuture<SimpleHttpResponse>> safeDispatch = vessel -> {
+ try { dispatch.accept(vessel); }
+ catch (Throwable t) { vessel.completeExceptionally(t); }
+ };
CompletableFuture<SimpleHttpResponse> vessel = new CompletableFuture<>();
- byId.compute(documentId, (id, previous) -> { // TODO; consider merging with above locking.
- if (previous == null) dispatch.accept(vessel);
- else previous.whenComplete((__, ___) -> dispatch.accept(vessel)); // TODO: keep a list so we can empty it?
+ byId.compute(documentId, (id, previous) -> {
+ if (previous == null) safeDispatch.accept(vessel);
+ else previous.whenComplete((__, ___) -> safeDispatch.accept(vessel));
return vessel;
});
@@ -146,6 +150,8 @@ class HttpRequestStrategy implements RequestStrategy<SimpleHttpResponse>, HttpRe
releaseSlot();
if (thrown == null)
success();
+
+ byId.compute(documentId, (id, current) -> current == vessel ? null : current);
});
}