diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-09-28 13:52:49 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-09-28 13:52:49 +0200 |
commit | 7fac15f41005fd65e44a5699f67c1a576d8eb3e7 (patch) | |
tree | 783d67eabf9539b5cc7efbd2a37746c9efe03efa /vespaclient-container-plugin/src | |
parent | 620b983bbc9a8d2bb1252fbb30e5941f3927563b (diff) |
Test throttling and handle responses arriving before their acks
Diffstat (limited to 'vespaclient-container-plugin/src')
-rw-r--r-- | vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java | 68 |
1 files changed, 47 insertions, 21 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java index 8a0ae33a1f3..36e92c776a1 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java @@ -37,18 +37,15 @@ import java.util.Optional; import java.util.StringJoiner; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Supplier; -import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Stream; @@ -82,7 +79,7 @@ public class DocumentOperationExecutor { private final Clock clock; private final DelayQueue throttled; private final DelayQueue timeouts; - private final Map<Long, OperationContext> outstanding = new ConcurrentHashMap<>(); + private final Map<Long, Completion> outstanding = new ConcurrentHashMap<>(); private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>(); public DocumentOperationExecutor(ClusterListConfig clustersConfig, AllClustersBucketSpacesConfig bucketsConfig, @@ -101,10 +98,10 @@ public class DocumentOperationExecutor { this.visitTimeout = requireNonNull(visitTimeout); this.maxThrottled = maxThrottled; this.access = requireNonNull(access); - this.asyncSession = access.createAsyncSession(new AsyncParameters().setResponseHandler(this::handleResponse)); + this.asyncSession = access.createAsyncSession(new AsyncParameters().setResponseHandler(this::handle)); this.clock = requireNonNull(clock); this.clusters = Map.copyOf(clusters); - this.throttled = new DelayQueue(200, this::send, resendDelay, clock); + this.throttled = new DelayQueue(maxThrottled, this::send, resendDelay, clock); this.timeouts = new DelayQueue(Long.MAX_VALUE, (__, context) -> context.error(TIMEOUT, "Timed out after " + defaultTimeout), defaultTimeout, clock); } @@ -397,7 +394,7 @@ public class DocumentOperationExecutor { Result result = operation.get(); switch (result.type()) { case SUCCESS: - outstanding.put(result.getRequestId(), context); + outstanding.merge(result.getRequestId(), Completion.of(context), Completion::merge); break; case TRANSIENT_ERROR: if ( ! throttled.add(operation, context)) @@ -410,16 +407,8 @@ public class DocumentOperationExecutor { } } - private void handleResponse(Response response){ - OperationContext context = outstanding.remove(response.getRequestId()); - if (context != null) - if (response.isSuccess()) - context.success(response instanceof DocumentResponse ? Optional.ofNullable(((DocumentResponse) response).getDocument()) - : Optional.empty()); - else - context.error(toErrorType(response.outcome()), response.getTextMessage()); - else - log.log(Level.FINE, () -> "Received response for request which has timed out, with id " + response.getRequestId()); + private void handle(Response response) { + outstanding.merge(response.getRequestId(), Completion.of(response), Completion::merge); } private static ErrorType toErrorType(Response.Outcome outcome) { @@ -465,6 +454,43 @@ public class DocumentOperationExecutor { } + private static class Completion { + + private final OperationContext context; + private final Response response; + + private Completion(OperationContext context, Response response) { + this.context = context; + this.response = response; + } + + static Completion of(OperationContext context) { + return new Completion(requireNonNull(context), null); + } + + static Completion of(Response response) { + return new Completion(null, requireNonNull(response)); + } + + Completion merge(Completion other) { + if (context == null) + complete(other.context, response); + else + complete(context, other.response); + return null; + } + + private void complete(OperationContext context, Response response) { + if (response.isSuccess()) + context.success(response instanceof DocumentResponse ? Optional.ofNullable(((DocumentResponse) response).getDocument()) + : Optional.empty()); + else + context.error(toErrorType(response.outcome()), response.getTextMessage()); + } + + } + + /** * Keeps delayed operations (retries or timeouts) until ready, at which point a bulk maintenance operation processes them. * @@ -547,9 +573,9 @@ public class DocumentOperationExecutor { } // Ready for action: remove from queue and run. if (delayed.readyAt().isBefore(clock.instant())) { - action.accept(delayed.operation(), delayed.context()); operations.remove(); size.decrementAndGet(); + action.accept(delayed.operation(), delayed.context()); continue; } // Not yet ready for action: keep time to wake up again. @@ -662,9 +688,9 @@ public class DocumentOperationExecutor { // Visible for testing. AsyncSession asyncSession() { return asyncSession; } - void notifyMaintainers() { - synchronized (throttled) { throttled.notify(); } - synchronized (timeouts) { timeouts.notify(); } + void notifyMaintainers() throws InterruptedException { + synchronized (throttled) { throttled.notify(); throttled.wait(); } + synchronized (timeouts) { timeouts.notify(); timeouts.wait(); } } } |