aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-06-02 10:07:05 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-06-02 10:07:05 +0200
commitbe686adbb8a024bc912eb36f4be40be0fdfc9364 (patch)
treeecd14435dad09f4d9e872d35f8c79b4bb7c0b159 /vespa-feed-client
parent638f406e5459483cf7cc115e6f0ec6de395a2240 (diff)
Move opperation bblocking up
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java32
1 files changed, 17 insertions, 15 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
index a202aa92126..3d4cfdd211b 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
@@ -190,14 +190,27 @@ class HttpRequestStrategy implements RequestStrategy, AutoCloseable {
else // ... or send when the previous inflight is done.
previous.thenRun(() -> dispatch.accept(request, vessel));
- handleAttempt(vessel, dispatch, blocker, request, result, documentId, 1);
+ handleAttempt(vessel, dispatch, request, result, 1);
+
+ result.thenRun(() -> {
+ CompletableFuture<Void> current;
+ synchronized (monitor) {
+ current = inflightById.get(documentId);
+ if (current == blocker) { // Release slot and clear map if no other operations enqueued for this doc-id ...
+ releaseSlot();
+ inflightById.put(documentId, null);
+ }
+ }
+ if (current != blocker) // ... or trigger sending the next enqueued operation.
+ blocker.complete(null);
+ });
+
return result;
}
/** Handles the result of one attempt at the given operation, retrying if necessary. */
private void handleAttempt(CompletableFuture<SimpleHttpResponse> vessel, BiConsumer<SimpleHttpRequest, CompletableFuture<SimpleHttpResponse>> dispatch,
- CompletableFuture<Void> blocker, SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> result,
- DocumentId documentId, int attempt) {
+ SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> result, int attempt) {
vessel.whenComplete((response, thrown) -> {
// Retry the operation if it failed with a transient error ...
if (thrown != null ? retry(request, thrown, attempt)
@@ -208,22 +221,11 @@ class HttpRequestStrategy implements RequestStrategy, AutoCloseable {
delayed.add(new CompletableFuture<>().thenRun(() -> dispatch.accept(request, retry)));
else
dispatch.accept(request, retry);
- handleAttempt(retry, dispatch, blocker, request, result, documentId, attempt + (hasFailed ? 0 : 1));
+ handleAttempt(retry, dispatch, request, result, attempt + (hasFailed ? 0 : 1));
return;
}
// ... or accept the outcome and mark the operation as complete.
- CompletableFuture<Void> current;
- synchronized (monitor) {
- current = inflightById.get(documentId);
- if (current == blocker) { // Release slot and clear map if no other operations enqueued for this doc-id ...
- releaseSlot();
- inflightById.put(documentId, null);
- }
- }
- if (current != blocker) // ... or trigger sending the next enqueued operation.
- blocker.complete(null);
-
if (thrown == null) result.complete(response);
else result.completeExceptionally(thrown);
});