diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-09-30 13:38:28 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-09-30 13:38:28 +0200 |
commit | 6a7d5bda4b22c179e57de440ff46a3ebe667939b (patch) | |
tree | 29e3cb3049a32bc8409a244cfa4df262beafa781 /documentapi | |
parent | 27632e92ff8182e1df0763fe6a9382e88c76dbf9 (diff) |
Allow closures for handling document api Responses
Diffstat (limited to 'documentapi')
4 files changed, 87 insertions, 69 deletions
diff --git a/documentapi/abi-spec.json b/documentapi/abi-spec.json index 6b44153faca..a204da107f0 100644 --- a/documentapi/abi-spec.json +++ b/documentapi/abi-spec.json @@ -167,10 +167,12 @@ "public com.yahoo.documentapi.DocumentOperationParameters withFieldSet(java.lang.String)", "public com.yahoo.documentapi.DocumentOperationParameters withRoute(java.lang.String)", "public com.yahoo.documentapi.DocumentOperationParameters withTraceLevel(int)", + "public com.yahoo.documentapi.DocumentOperationParameters withResponseHandler(com.yahoo.documentapi.ResponseHandler)", "public java.util.Optional priority()", "public java.util.Optional fieldSet()", "public java.util.Optional route()", "public java.util.OptionalInt traceLevel()", + "public java.util.Optional responseHandler()", "public boolean equals(java.lang.Object)", "public int hashCode()", "public java.lang.String toString()" @@ -972,14 +974,13 @@ "public void <init>(com.yahoo.documentapi.AsyncParameters, com.yahoo.documentapi.local.LocalDocumentAccess)", "public double getCurrentWindowSize()", "public com.yahoo.documentapi.Result put(com.yahoo.document.Document)", - "public com.yahoo.documentapi.Result put(com.yahoo.document.DocumentPut, com.yahoo.documentapi.messagebus.protocol.DocumentProtocol$Priority)", + "public com.yahoo.documentapi.Result put(com.yahoo.document.DocumentPut, com.yahoo.documentapi.DocumentOperationParameters)", "public com.yahoo.documentapi.Result get(com.yahoo.document.DocumentId)", - "public com.yahoo.documentapi.Result get(com.yahoo.document.DocumentId, boolean, com.yahoo.documentapi.messagebus.protocol.DocumentProtocol$Priority)", - "public com.yahoo.documentapi.Result get(com.yahoo.document.DocumentId, com.yahoo.documentapi.messagebus.protocol.DocumentProtocol$Priority)", + "public com.yahoo.documentapi.Result get(com.yahoo.document.DocumentId, com.yahoo.documentapi.DocumentOperationParameters)", "public com.yahoo.documentapi.Result remove(com.yahoo.document.DocumentId)", - "public com.yahoo.documentapi.Result remove(com.yahoo.document.DocumentId, com.yahoo.documentapi.messagebus.protocol.DocumentProtocol$Priority)", + "public com.yahoo.documentapi.Result remove(com.yahoo.document.DocumentId, com.yahoo.documentapi.DocumentOperationParameters)", "public com.yahoo.documentapi.Result update(com.yahoo.document.DocumentUpdate)", - "public com.yahoo.documentapi.Result update(com.yahoo.document.DocumentUpdate, com.yahoo.documentapi.messagebus.protocol.DocumentProtocol$Priority)", + "public com.yahoo.documentapi.Result update(com.yahoo.document.DocumentUpdate, com.yahoo.documentapi.DocumentOperationParameters)", "public com.yahoo.documentapi.Response getNext()", "public com.yahoo.documentapi.Response getNext(int)", "public void destroy()", diff --git a/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java b/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java index 05535e540b5..1d934680586 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java @@ -18,18 +18,21 @@ import static java.util.Objects.requireNonNull; */ public class DocumentOperationParameters { - private static final DocumentOperationParameters empty = new DocumentOperationParameters(null, null, null, -1); + private static final DocumentOperationParameters empty = new DocumentOperationParameters(null, null, null, -1, null); private final DocumentProtocol.Priority priority; private final String fieldSet; private final String route; private final int traceLevel; + private final ResponseHandler responseHandler; - private DocumentOperationParameters(DocumentProtocol.Priority priority, String fieldSet, String route, int traceLevel) { + private DocumentOperationParameters(DocumentProtocol.Priority priority, String fieldSet, String route, + int traceLevel, ResponseHandler responseHandler) { this.priority = priority; this.fieldSet = fieldSet; this.route = route; this.traceLevel = traceLevel; + this.responseHandler = responseHandler; } public static DocumentOperationParameters parameters() { @@ -38,22 +41,22 @@ public class DocumentOperationParameters { /** Sets the priority with which to perform an operation. */ public DocumentOperationParameters withPriority(DocumentProtocol.Priority priority) { - return new DocumentOperationParameters(requireNonNull(priority), fieldSet, route, traceLevel); + return new DocumentOperationParameters(requireNonNull(priority), fieldSet, route, traceLevel, responseHandler); } /** Sets the field set used for retrieval. */ public DocumentOperationParameters withFieldSet(FieldSet fieldSet) { - return new DocumentOperationParameters(priority, new FieldSetRepo().serialize(fieldSet), route, traceLevel); + return new DocumentOperationParameters(priority, new FieldSetRepo().serialize(fieldSet), route, traceLevel, responseHandler); } /** Sets the field set used for retrieval. */ public DocumentOperationParameters withFieldSet(String fieldSet) { - return new DocumentOperationParameters(priority, requireNonNull(fieldSet), route, traceLevel); + return new DocumentOperationParameters(priority, requireNonNull(fieldSet), route, traceLevel, responseHandler); } /** Sets the route along which to send the operation. */ public DocumentOperationParameters withRoute(String route) { - return new DocumentOperationParameters(priority, fieldSet, requireNonNull(route), traceLevel); + return new DocumentOperationParameters(priority, fieldSet, requireNonNull(route), traceLevel, responseHandler); } /** Sets the trace level for an operation. */ @@ -61,13 +64,19 @@ public class DocumentOperationParameters { if (traceLevel < 0 || traceLevel > 9) throw new IllegalArgumentException("Trace level must be from 0 (no tracing) to 9 (maximum)"); - return new DocumentOperationParameters(priority, fieldSet, route, traceLevel); + return new DocumentOperationParameters(priority, fieldSet, route, traceLevel, responseHandler); + } + + /** Sets the {@link ResponseHandler} to handle the {@link Response} of an async operation, instead of the session default. */ + public DocumentOperationParameters withResponseHandler(ResponseHandler responseHandler) { + return new DocumentOperationParameters(priority, fieldSet, route, traceLevel, requireNonNull(responseHandler)); } public Optional<DocumentProtocol.Priority> priority() { return Optional.ofNullable(priority); } public Optional<String> fieldSet() { return Optional.ofNullable(fieldSet); } public Optional<String> route() { return Optional.ofNullable(route); } public OptionalInt traceLevel() { return traceLevel >= 0 ? OptionalInt.of(traceLevel) : OptionalInt.empty(); } + public Optional<ResponseHandler> responseHandler() { return Optional.ofNullable(responseHandler); } @Override public boolean equals(Object o) { diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java index 639031e4cdb..8781e4a3a51 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java @@ -9,6 +9,7 @@ import com.yahoo.document.DocumentUpdate; import com.yahoo.documentapi.AsyncParameters; import com.yahoo.documentapi.AsyncSession; import com.yahoo.documentapi.DocumentIdResponse; +import com.yahoo.documentapi.DocumentOperationParameters; import com.yahoo.documentapi.DocumentResponse; import com.yahoo.documentapi.DocumentUpdateResponse; import com.yahoo.documentapi.RemoveResponse; @@ -18,9 +19,7 @@ import com.yahoo.documentapi.Result; import com.yahoo.documentapi.SyncParameters; import com.yahoo.documentapi.SyncSession; import com.yahoo.documentapi.UpdateResponse; -import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; -import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @@ -31,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import static com.yahoo.documentapi.DocumentOperationParameters.parameters; import static com.yahoo.documentapi.Result.ResultType.SUCCESS; /** @@ -61,77 +61,75 @@ public class LocalAsyncSession implements AsyncSession { @Override public Result put(Document document) { - return put(new DocumentPut(document), DocumentProtocol.Priority.NORMAL_3); + return put(new DocumentPut(document), parameters()); } @Override - public Result put(DocumentPut documentPut, DocumentProtocol.Priority pri) { + public Result put(DocumentPut documentPut, DocumentOperationParameters parameters) { return send(req -> { - try { - syncSession.put(documentPut, pri); - return new DocumentResponse(req, documentPut.getDocument()); - } - catch (Exception e) { - return new DocumentResponse(req, documentPut.getDocument(), e.getMessage(), Response.Outcome.ERROR); - } - }); + try { + syncSession.put(documentPut, parameters); + return new DocumentResponse(req, documentPut.getDocument()); + } + catch (Exception e) { + return new DocumentResponse(req, documentPut.getDocument(), e.getMessage(), Response.Outcome.ERROR); + } + }, + parameters); } @Override public Result get(DocumentId id) { - return get(id, false, DocumentProtocol.Priority.NORMAL_3); + return get(id, parameters()); } @Override - @Deprecated // TODO: Remove on Vespa 8 - public Result get(DocumentId id, boolean headersOnly, DocumentProtocol.Priority pri) { - return get(id, pri); - } - - @Override - public Result get(DocumentId id, DocumentProtocol.Priority pri) { + public Result get(DocumentId id, DocumentOperationParameters parameters) { return send(req -> { - try { - return new DocumentResponse(req, syncSession.get(id)); - } - catch (Exception e) { - return new DocumentResponse(req, null, e.getMessage(), Response.Outcome.ERROR); - } - }); + try { + return new DocumentResponse(req, syncSession.get(id, parameters, null)); + } + catch (Exception e) { + return new DocumentResponse(req, null, e.getMessage(), Response.Outcome.ERROR); + } + }, + parameters); } @Override public Result remove(DocumentId id) { - return remove(id, DocumentProtocol.Priority.NORMAL_3); + return remove(id, parameters()); } @Override - public Result remove(DocumentId id, DocumentProtocol.Priority pri) { + public Result remove(DocumentId id, DocumentOperationParameters parameters) { return send(req -> { - if (syncSession.remove(new DocumentRemove(id), pri)) { - return new RemoveResponse(req, true); - } - else { - return new DocumentIdResponse(req, id, "Document not found.", Response.Outcome.NOT_FOUND); - } - }); + if (syncSession.remove(new DocumentRemove(id), parameters)) { + return new RemoveResponse(req, true); + } + else { + return new DocumentIdResponse(req, id, "Document not found.", Response.Outcome.NOT_FOUND); + } + }, + parameters); } @Override public Result update(DocumentUpdate update) { - return update(update, DocumentProtocol.Priority.NORMAL_3); + return update(update, parameters()); } @Override - public Result update(DocumentUpdate update, DocumentProtocol.Priority pri) { + public Result update(DocumentUpdate update, DocumentOperationParameters parameters) { return send(req -> { - if (syncSession.update(update, pri)) { - return new UpdateResponse(req, true); - } - else { - return new DocumentUpdateResponse(req, update, "Document not found.", Response.Outcome.NOT_FOUND); - } - }); + if (syncSession.update(update, parameters)) { + return new UpdateResponse(req, true); + } + else { + return new DocumentUpdateResponse(req, update, "Document not found.", Response.Outcome.NOT_FOUND); + } + }, + parameters); } @Override @@ -162,21 +160,22 @@ public class LocalAsyncSession implements AsyncSession { } } - private Result send(Function<Long, Response> responses) { + private Result send(Function<Long, Response> responses, DocumentOperationParameters parameters) { Result.ResultType resultType = result.get(); if (resultType != SUCCESS) return new Result(resultType, new Error()); + ResponseHandler responseHandler = parameters.responseHandler().orElse(this::addResponse); long req = requestId.incrementAndGet(); Phaser synchronizer = phaser.get(); if (synchronizer == null) - addResponse(responses.apply(req)); + responseHandler.handleResponse(responses.apply(req)); else { synchronizer.register(); executor.execute(() -> { try { synchronizer.arriveAndAwaitAdvance(); - addResponse(responses.apply(req)); + responseHandler.handleResponse(responses.apply(req)); } finally { synchronizer.awaitAdvance(synchronizer.arriveAndDeregister()); diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java index 7a71089c180..0a4ab66aea4 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java @@ -184,7 +184,7 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { Result send(Message msg, DocumentOperationParameters parameters) { try { long reqId = requestId.incrementAndGet(); - msg.setContext(reqId); + msg.setContext(new OperationContext(reqId, parameters.responseHandler().orElse(null))); msg.getTrace().setLevel(parameters.traceLevel().orElse(traceLevel)); // Use route from parameters, or session route if non-default, or finally, defaults for get and non-get, if set. Phew! String toRoute = parameters.route().orElse(mayOverrideWithGetOnlyRoute(msg) ? routeForGet : route); @@ -198,6 +198,15 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { } } + private static class OperationContext { + private final long reqId; + private final ResponseHandler responseHandler; + private OperationContext(long reqId, ResponseHandler responseHandler) { + this.reqId = reqId; + this.responseHandler = responseHandler; + } + } + /** * A convenience method for assigning the internal trace level and route string to a message before sending it * through the internal mbus session object. @@ -206,7 +215,7 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { * @return the document api result object. */ public Result send(Message msg) { - return send(msg, null); + return send(msg, parameters()); } @Override @@ -285,11 +294,6 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { new Error(mbusResult.getError().getMessage() + " (" + mbusResult.getError().getCode() + ")")); } - private static Response toResponse(Reply reply) { - long reqId = (Long) reply.getContext(); - return reply.hasErrors() ? toError(reply, reqId) : toSuccess(reply, reqId); - } - private static Response toError(Reply reply, long reqId) { boolean definitelyNotFound = reply instanceof UpdateDocumentReply && ! ((UpdateDocumentReply) reply).wasFound() || reply instanceof RemoveDocumentReply && ! ((RemoveDocumentReply) reply).wasFound(); @@ -346,8 +350,13 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { if (reply.getTrace().getLevel() > 0) { log.log(Level.INFO, reply.getTrace().toString()); } - Response response = toResponse(reply); - if (handler != null) { + OperationContext context = (OperationContext) reply.getContext(); + long reqId = context.reqId; + Response response = reply.hasErrors() ? toError(reply, reqId) : toSuccess(reply, reqId); + ResponseHandler operationSpecificResponseHandler = context.responseHandler; + if (operationSpecificResponseHandler != null) + operationSpecificResponseHandler.handleResponse(response); + else if (handler != null) { handler.handleResponse(response); } else { queue.add(response); |