summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin/src
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-09-28 13:52:49 +0200
committerJon Marius Venstad <venstad@gmail.com>2020-09-28 13:52:49 +0200
commit7fac15f41005fd65e44a5699f67c1a576d8eb3e7 (patch)
tree783d67eabf9539b5cc7efbd2a37746c9efe03efa /vespaclient-container-plugin/src
parent620b983bbc9a8d2bb1252fbb30e5941f3927563b (diff)
Test throttling and handle responses arriving before their acks
Diffstat (limited to 'vespaclient-container-plugin/src')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java68
1 files changed, 47 insertions, 21 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java
index 8a0ae33a1f3..36e92c776a1 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java
@@ -37,18 +37,15 @@ import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
-import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Stream;
@@ -82,7 +79,7 @@ public class DocumentOperationExecutor {
private final Clock clock;
private final DelayQueue throttled;
private final DelayQueue timeouts;
- private final Map<Long, OperationContext> outstanding = new ConcurrentHashMap<>();
+ private final Map<Long, Completion> outstanding = new ConcurrentHashMap<>();
private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>();
public DocumentOperationExecutor(ClusterListConfig clustersConfig, AllClustersBucketSpacesConfig bucketsConfig,
@@ -101,10 +98,10 @@ public class DocumentOperationExecutor {
this.visitTimeout = requireNonNull(visitTimeout);
this.maxThrottled = maxThrottled;
this.access = requireNonNull(access);
- this.asyncSession = access.createAsyncSession(new AsyncParameters().setResponseHandler(this::handleResponse));
+ this.asyncSession = access.createAsyncSession(new AsyncParameters().setResponseHandler(this::handle));
this.clock = requireNonNull(clock);
this.clusters = Map.copyOf(clusters);
- this.throttled = new DelayQueue(200, this::send, resendDelay, clock);
+ this.throttled = new DelayQueue(maxThrottled, this::send, resendDelay, clock);
this.timeouts = new DelayQueue(Long.MAX_VALUE, (__, context) -> context.error(TIMEOUT, "Timed out after " + defaultTimeout), defaultTimeout, clock);
}
@@ -397,7 +394,7 @@ public class DocumentOperationExecutor {
Result result = operation.get();
switch (result.type()) {
case SUCCESS:
- outstanding.put(result.getRequestId(), context);
+ outstanding.merge(result.getRequestId(), Completion.of(context), Completion::merge);
break;
case TRANSIENT_ERROR:
if ( ! throttled.add(operation, context))
@@ -410,16 +407,8 @@ public class DocumentOperationExecutor {
}
}
- private void handleResponse(Response response){
- OperationContext context = outstanding.remove(response.getRequestId());
- if (context != null)
- if (response.isSuccess())
- context.success(response instanceof DocumentResponse ? Optional.ofNullable(((DocumentResponse) response).getDocument())
- : Optional.empty());
- else
- context.error(toErrorType(response.outcome()), response.getTextMessage());
- else
- log.log(Level.FINE, () -> "Received response for request which has timed out, with id " + response.getRequestId());
+ private void handle(Response response) {
+ outstanding.merge(response.getRequestId(), Completion.of(response), Completion::merge);
}
private static ErrorType toErrorType(Response.Outcome outcome) {
@@ -465,6 +454,43 @@ public class DocumentOperationExecutor {
}
+ private static class Completion {
+
+ private final OperationContext context;
+ private final Response response;
+
+ private Completion(OperationContext context, Response response) {
+ this.context = context;
+ this.response = response;
+ }
+
+ static Completion of(OperationContext context) {
+ return new Completion(requireNonNull(context), null);
+ }
+
+ static Completion of(Response response) {
+ return new Completion(null, requireNonNull(response));
+ }
+
+ Completion merge(Completion other) {
+ if (context == null)
+ complete(other.context, response);
+ else
+ complete(context, other.response);
+ return null;
+ }
+
+ private void complete(OperationContext context, Response response) {
+ if (response.isSuccess())
+ context.success(response instanceof DocumentResponse ? Optional.ofNullable(((DocumentResponse) response).getDocument())
+ : Optional.empty());
+ else
+ context.error(toErrorType(response.outcome()), response.getTextMessage());
+ }
+
+ }
+
+
/**
* Keeps delayed operations (retries or timeouts) until ready, at which point a bulk maintenance operation processes them.
*
@@ -547,9 +573,9 @@ public class DocumentOperationExecutor {
}
// Ready for action: remove from queue and run.
if (delayed.readyAt().isBefore(clock.instant())) {
- action.accept(delayed.operation(), delayed.context());
operations.remove();
size.decrementAndGet();
+ action.accept(delayed.operation(), delayed.context());
continue;
}
// Not yet ready for action: keep time to wake up again.
@@ -662,9 +688,9 @@ public class DocumentOperationExecutor {
// Visible for testing.
AsyncSession asyncSession() { return asyncSession; }
- void notifyMaintainers() {
- synchronized (throttled) { throttled.notify(); }
- synchronized (timeouts) { timeouts.notify(); }
+ void notifyMaintainers() throws InterruptedException {
+ synchronized (throttled) { throttled.notify(); throttled.wait(); }
+ synchronized (timeouts) { timeouts.notify(); timeouts.wait(); }
}
}