diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-06-02 10:07:05 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-06-02 10:07:05 +0200 |
commit | be686adbb8a024bc912eb36f4be40be0fdfc9364 (patch) | |
tree | ecd14435dad09f4d9e872d35f8c79b4bb7c0b159 /vespa-feed-client | |
parent | 638f406e5459483cf7cc115e6f0ec6de395a2240 (diff) |
Move opperation bblocking up
Diffstat (limited to 'vespa-feed-client')
-rw-r--r-- | vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java | 32 |
1 files changed, 17 insertions, 15 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java index a202aa92126..3d4cfdd211b 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java @@ -190,14 +190,27 @@ class HttpRequestStrategy implements RequestStrategy, AutoCloseable { else // ... or send when the previous inflight is done. previous.thenRun(() -> dispatch.accept(request, vessel)); - handleAttempt(vessel, dispatch, blocker, request, result, documentId, 1); + handleAttempt(vessel, dispatch, request, result, 1); + + result.thenRun(() -> { + CompletableFuture<Void> current; + synchronized (monitor) { + current = inflightById.get(documentId); + if (current == blocker) { // Release slot and clear map if no other operations enqueued for this doc-id ... + releaseSlot(); + inflightById.put(documentId, null); + } + } + if (current != blocker) // ... or trigger sending the next enqueued operation. + blocker.complete(null); + }); + return result; } /** Handles the result of one attempt at the given operation, retrying if necessary. */ private void handleAttempt(CompletableFuture<SimpleHttpResponse> vessel, BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>> dispatch, - CompletableFuture<Void> blocker, SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> result, - DocumentId documentId, int attempt) { + SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> result, int attempt) { vessel.whenComplete((response, thrown) -> { // Retry the operation if it failed with a transient error ... if (thrown != null ? retry(request, thrown, attempt) @@ -208,22 +221,11 @@ class HttpRequestStrategy implements RequestStrategy, AutoCloseable { delayed.add(new CompletableFuture<>().thenRun(() -> dispatch.accept(request, retry))); else dispatch.accept(request, retry); - handleAttempt(retry, dispatch, blocker, request, result, documentId, attempt + (hasFailed ? 0 : 1)); + handleAttempt(retry, dispatch, request, result, attempt + (hasFailed ? 0 : 1)); return; } // ... or accept the outcome and mark the operation as complete. - CompletableFuture<Void> current; - synchronized (monitor) { - current = inflightById.get(documentId); - if (current == blocker) { // Release slot and clear map if no other operations enqueued for this doc-id ... - releaseSlot(); - inflightById.put(documentId, null); - } - } - if (current != blocker) // ... or trigger sending the next enqueued operation. - blocker.complete(null); - if (thrown == null) result.complete(response); else result.completeExceptionally(thrown); }); |