diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-09-30 13:48:28 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-09-30 13:48:28 +0200 |
commit | b1a0ce6e16ba38dc342e2f5b9544e0f0b6329f4d (patch) | |
tree | 63bc446354e12883237bd9d45e5aab243ff56bbc /vespaclient-container-plugin | |
parent | 6a7d5bda4b22c179e57de440ff46a3ebe667939b (diff) |
Simplify with closures to handle async responses
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r-- | vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java | 64 |
1 files changed, 16 insertions, 48 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java index d8a94451cd5..98b58fc2b7c 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java @@ -17,6 +17,7 @@ import com.yahoo.documentapi.DocumentResponse; import com.yahoo.documentapi.DumpVisitorDataHandler; import com.yahoo.documentapi.ProgressToken; import com.yahoo.documentapi.Response; +import com.yahoo.documentapi.ResponseHandler; import com.yahoo.documentapi.Result; import com.yahoo.documentapi.VisitorControlHandler; import com.yahoo.documentapi.VisitorParameters; @@ -81,7 +82,6 @@ public class DocumentOperationExecutorImpl implements DocumentOperationExecutor private final Clock clock; private final DelayQueue throttled; private final DelayQueue timeouts; - private final Map<Long, Completion> outstanding = new ConcurrentHashMap<>(); private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>(); public DocumentOperationExecutorImpl(ClusterListConfig clustersConfig, AllClustersBucketSpacesConfig bucketsConfig, @@ -100,7 +100,7 @@ public class DocumentOperationExecutorImpl implements DocumentOperationExecutor this.visitTimeout = requireNonNull(visitTimeout); this.maxThrottled = maxThrottled; this.access = requireNonNull(access); - this.asyncSession = access.createAsyncSession(new AsyncParameters().setResponseHandler(this::handle)); + this.asyncSession = access.createAsyncSession(new AsyncParameters()); this.clock = requireNonNull(clock); this.clusters = Map.copyOf(clusters); this.throttled = new DelayQueue(maxThrottled, this::send, resendDelay, clock); @@ -166,22 +166,22 @@ public class DocumentOperationExecutorImpl implements DocumentOperationExecutor @Override public void get(DocumentId id, DocumentOperationParameters parameters, OperationContext context) { - accept(() -> asyncSession.get(id, parameters), context); + accept(() -> asyncSession.get(id, parameters.withResponseHandler(handlerOf(context))), context); } @Override public void put(DocumentPut put, DocumentOperationParameters parameters, OperationContext context) { - accept(() -> asyncSession.put(put, parameters), context); + accept(() -> asyncSession.put(put, parameters.withResponseHandler(handlerOf(context))), context); } @Override public void update(DocumentUpdate update, DocumentOperationParameters parameters, OperationContext context) { - accept(() -> asyncSession.update(update, parameters), context); + accept(() -> asyncSession.update(update, parameters.withResponseHandler(handlerOf(context))), context); } @Override public void remove(DocumentId id, DocumentOperationParameters parameters, OperationContext context) { - accept(() -> asyncSession.remove(id, parameters), context); + accept(() -> asyncSession.remove(id, parameters.withResponseHandler(handlerOf(context))), context); } @Override @@ -230,6 +230,16 @@ public class DocumentOperationExecutorImpl implements DocumentOperationExecutor return resolveCluster(Optional.of(cluster), clusters).route(); } + private ResponseHandler handlerOf(OperationContext context) { + return response -> { + if (response.isSuccess()) + context.success(response instanceof DocumentResponse ? Optional.ofNullable(((DocumentResponse) response).getDocument()) + : Optional.empty()); + else + context.error(toErrorType(response.outcome()), response.getTextMessage()); + }; + } + /** Rejects operation if retry queue is full; otherwise starts a timer for the given task, and attempts to send it. */ private void accept(Supplier<Result> operation, OperationContext context) { timeouts.add(operation, context); @@ -243,7 +253,6 @@ public class DocumentOperationExecutorImpl implements DocumentOperationExecutor Result result = operation.get(); switch (result.type()) { case SUCCESS: - outstanding.merge(result.getRequestId(), Completion.of(context), Completion::merge); return true; case TRANSIENT_ERROR: return false; @@ -255,10 +264,6 @@ public class DocumentOperationExecutorImpl implements DocumentOperationExecutor } } - private void handle(Response response) { - outstanding.merge(response.getRequestId(), Completion.of(response), Completion::merge); - } - private static ErrorType toErrorType(Response.Outcome outcome) { switch (outcome) { case NOT_FOUND: @@ -273,43 +278,6 @@ public class DocumentOperationExecutorImpl implements DocumentOperationExecutor } - private static class Completion { - - private final OperationContext context; - private final Response response; - - private Completion(OperationContext context, Response response) { - this.context = context; - this.response = response; - } - - static Completion of(OperationContext context) { - return new Completion(requireNonNull(context), null); - } - - static Completion of(Response response) { - return new Completion(null, requireNonNull(response)); - } - - Completion merge(Completion other) { - if (context == null) - complete(other.context, response); - else - complete(context, other.response); - return null; - } - - private void complete(OperationContext context, Response response) { - if (response.isSuccess()) - context.success(response instanceof DocumentResponse ? Optional.ofNullable(((DocumentResponse) response).getDocument()) - : Optional.empty()); - else - context.error(toErrorType(response.outcome()), response.getTextMessage()); - } - - } - - /** * Keeps delayed operations (retries or timeouts) until ready, at which point a bulk maintenance operation processes them. * |