summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-09-30 13:48:28 +0200
committerJon Marius Venstad <venstad@gmail.com>2020-09-30 13:48:28 +0200
commitb1a0ce6e16ba38dc342e2f5b9544e0f0b6329f4d (patch)
tree63bc446354e12883237bd9d45e5aab243ff56bbc /vespaclient-container-plugin
parent6a7d5bda4b22c179e57de440ff46a3ebe667939b (diff)
Simplify with closures to handle async responses
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java64
1 files changed, 16 insertions, 48 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java
index d8a94451cd5..98b58fc2b7c 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java
@@ -17,6 +17,7 @@ import com.yahoo.documentapi.DocumentResponse;
import com.yahoo.documentapi.DumpVisitorDataHandler;
import com.yahoo.documentapi.ProgressToken;
import com.yahoo.documentapi.Response;
+import com.yahoo.documentapi.ResponseHandler;
import com.yahoo.documentapi.Result;
import com.yahoo.documentapi.VisitorControlHandler;
import com.yahoo.documentapi.VisitorParameters;
@@ -81,7 +82,6 @@ public class DocumentOperationExecutorImpl implements DocumentOperationExecutor
private final Clock clock;
private final DelayQueue throttled;
private final DelayQueue timeouts;
- private final Map<Long, Completion> outstanding = new ConcurrentHashMap<>();
private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>();
public DocumentOperationExecutorImpl(ClusterListConfig clustersConfig, AllClustersBucketSpacesConfig bucketsConfig,
@@ -100,7 +100,7 @@ public class DocumentOperationExecutorImpl implements DocumentOperationExecutor
this.visitTimeout = requireNonNull(visitTimeout);
this.maxThrottled = maxThrottled;
this.access = requireNonNull(access);
- this.asyncSession = access.createAsyncSession(new AsyncParameters().setResponseHandler(this::handle));
+ this.asyncSession = access.createAsyncSession(new AsyncParameters());
this.clock = requireNonNull(clock);
this.clusters = Map.copyOf(clusters);
this.throttled = new DelayQueue(maxThrottled, this::send, resendDelay, clock);
@@ -166,22 +166,22 @@ public class DocumentOperationExecutorImpl implements DocumentOperationExecutor
@Override
public void get(DocumentId id, DocumentOperationParameters parameters, OperationContext context) {
- accept(() -> asyncSession.get(id, parameters), context);
+ accept(() -> asyncSession.get(id, parameters.withResponseHandler(handlerOf(context))), context);
}
@Override
public void put(DocumentPut put, DocumentOperationParameters parameters, OperationContext context) {
- accept(() -> asyncSession.put(put, parameters), context);
+ accept(() -> asyncSession.put(put, parameters.withResponseHandler(handlerOf(context))), context);
}
@Override
public void update(DocumentUpdate update, DocumentOperationParameters parameters, OperationContext context) {
- accept(() -> asyncSession.update(update, parameters), context);
+ accept(() -> asyncSession.update(update, parameters.withResponseHandler(handlerOf(context))), context);
}
@Override
public void remove(DocumentId id, DocumentOperationParameters parameters, OperationContext context) {
- accept(() -> asyncSession.remove(id, parameters), context);
+ accept(() -> asyncSession.remove(id, parameters.withResponseHandler(handlerOf(context))), context);
}
@Override
@@ -230,6 +230,16 @@ public class DocumentOperationExecutorImpl implements DocumentOperationExecutor
return resolveCluster(Optional.of(cluster), clusters).route();
}
+ private ResponseHandler handlerOf(OperationContext context) {
+ return response -> {
+ if (response.isSuccess())
+ context.success(response instanceof DocumentResponse ? Optional.ofNullable(((DocumentResponse) response).getDocument())
+ : Optional.empty());
+ else
+ context.error(toErrorType(response.outcome()), response.getTextMessage());
+ };
+ }
+
/** Rejects operation if retry queue is full; otherwise starts a timer for the given task, and attempts to send it. */
private void accept(Supplier<Result> operation, OperationContext context) {
timeouts.add(operation, context);
@@ -243,7 +253,6 @@ public class DocumentOperationExecutorImpl implements DocumentOperationExecutor
Result result = operation.get();
switch (result.type()) {
case SUCCESS:
- outstanding.merge(result.getRequestId(), Completion.of(context), Completion::merge);
return true;
case TRANSIENT_ERROR:
return false;
@@ -255,10 +264,6 @@ public class DocumentOperationExecutorImpl implements DocumentOperationExecutor
}
}
- private void handle(Response response) {
- outstanding.merge(response.getRequestId(), Completion.of(response), Completion::merge);
- }
-
private static ErrorType toErrorType(Response.Outcome outcome) {
switch (outcome) {
case NOT_FOUND:
@@ -273,43 +278,6 @@ public class DocumentOperationExecutorImpl implements DocumentOperationExecutor
}
- private static class Completion {
-
- private final OperationContext context;
- private final Response response;
-
- private Completion(OperationContext context, Response response) {
- this.context = context;
- this.response = response;
- }
-
- static Completion of(OperationContext context) {
- return new Completion(requireNonNull(context), null);
- }
-
- static Completion of(Response response) {
- return new Completion(null, requireNonNull(response));
- }
-
- Completion merge(Completion other) {
- if (context == null)
- complete(other.context, response);
- else
- complete(context, other.response);
- return null;
- }
-
- private void complete(OperationContext context, Response response) {
- if (response.isSuccess())
- context.success(response instanceof DocumentResponse ? Optional.ofNullable(((DocumentResponse) response).getDocument())
- : Optional.empty());
- else
- context.error(toErrorType(response.outcome()), response.getTextMessage());
- }
-
- }
-
-
/**
* Keeps delayed operations (retries or timeouts) until ready, at which point a bulk maintenance operation processes them.
*