diff options
author | Jon Marius Venstad <jonmv@users.noreply.github.com> | 2021-05-18 17:41:13 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-05-18 17:41:13 +0200 |
commit | 4a210b8438d20a167ecfb74b0343f3acb8cb06ce (patch) | |
tree | 91aa8b631d31b638bb60c7239c1168bbf5990a89 | |
parent | 417f6ceb7dc7a71a591205399697962569a544c3 (diff) | |
parent | 380b78bb4dd288978ad3b07b19c21ddecddde51e (diff) |
Merge pull request #17884 from vespa-engine/jonmv/http2-feeding
Jonmv/http2 feeding
-rw-r--r-- | vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpFeedClient.java | 3 | ||||
-rw-r--r-- | vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpRequestStrategy.java | 12 |
2 files changed, 11 insertions, 4 deletions
diff --git a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpFeedClient.java index 236ba984114..39fcfd45a6f 100644 --- a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpFeedClient.java @@ -115,7 +115,8 @@ class HttpFeedClient implements FeedClient { return send("DELETE", documentId, null, params); } - @Override public void close() throws IOException { + @Override + public void close() throws IOException { if ( ! closed.getAndSet(true)) httpClient.close(); } diff --git a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpRequestStrategy.java index 1fef67dda56..db6bc8d21fe 100644 --- a/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/com/yahoo/vespa/feed/client/HttpRequestStrategy.java @@ -135,10 +135,14 @@ class HttpRequestStrategy implements RequestStrategy<SimpleHttpResponse>, HttpRe public CompletableFuture<SimpleHttpResponse> enqueue(DocumentId documentId, Consumer<CompletableFuture<SimpleHttpResponse>> dispatch) { acquireSlot(); + Consumer<CompletableFuture<SimpleHttpResponse>> safeDispatch = vessel -> { + try { dispatch.accept(vessel); } + catch (Throwable t) { vessel.completeExceptionally(t); } + }; CompletableFuture<SimpleHttpResponse> vessel = new CompletableFuture<>(); - byId.compute(documentId, (id, previous) -> { // TODO; consider merging with above locking. - if (previous == null) dispatch.accept(vessel); - else previous.whenComplete((__, ___) -> dispatch.accept(vessel)); // TODO: keep a list so we can empty it? + byId.compute(documentId, (id, previous) -> { + if (previous == null) safeDispatch.accept(vessel); + else previous.whenComplete((__, ___) -> safeDispatch.accept(vessel)); return vessel; }); @@ -146,6 +150,8 @@ class HttpRequestStrategy implements RequestStrategy<SimpleHttpResponse>, HttpRe releaseSlot(); if (thrown == null) success(); + + byId.compute(documentId, (id, current) -> current == vessel ? null : current); }); } |