summaryrefslogtreecommitdiffstats
path: root/documentapi
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-09-30 13:38:28 +0200
committerJon Marius Venstad <venstad@gmail.com>2020-09-30 13:38:28 +0200
commit6a7d5bda4b22c179e57de440ff46a3ebe667939b (patch)
tree29e3cb3049a32bc8409a244cfa4df262beafa781 /documentapi
parent27632e92ff8182e1df0763fe6a9382e88c76dbf9 (diff)
Allow closures for handling document api Responses
Diffstat (limited to 'documentapi')
-rw-r--r--documentapi/abi-spec.json11
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java23
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java95
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java27
4 files changed, 87 insertions, 69 deletions
diff --git a/documentapi/abi-spec.json b/documentapi/abi-spec.json
index 6b44153faca..a204da107f0 100644
--- a/documentapi/abi-spec.json
+++ b/documentapi/abi-spec.json
@@ -167,10 +167,12 @@
"public com.yahoo.documentapi.DocumentOperationParameters withFieldSet(java.lang.String)",
"public com.yahoo.documentapi.DocumentOperationParameters withRoute(java.lang.String)",
"public com.yahoo.documentapi.DocumentOperationParameters withTraceLevel(int)",
+ "public com.yahoo.documentapi.DocumentOperationParameters withResponseHandler(com.yahoo.documentapi.ResponseHandler)",
"public java.util.Optional priority()",
"public java.util.Optional fieldSet()",
"public java.util.Optional route()",
"public java.util.OptionalInt traceLevel()",
+ "public java.util.Optional responseHandler()",
"public boolean equals(java.lang.Object)",
"public int hashCode()",
"public java.lang.String toString()"
@@ -972,14 +974,13 @@
"public void <init>(com.yahoo.documentapi.AsyncParameters, com.yahoo.documentapi.local.LocalDocumentAccess)",
"public double getCurrentWindowSize()",
"public com.yahoo.documentapi.Result put(com.yahoo.document.Document)",
- "public com.yahoo.documentapi.Result put(com.yahoo.document.DocumentPut, com.yahoo.documentapi.messagebus.protocol.DocumentProtocol$Priority)",
+ "public com.yahoo.documentapi.Result put(com.yahoo.document.DocumentPut, com.yahoo.documentapi.DocumentOperationParameters)",
"public com.yahoo.documentapi.Result get(com.yahoo.document.DocumentId)",
- "public com.yahoo.documentapi.Result get(com.yahoo.document.DocumentId, boolean, com.yahoo.documentapi.messagebus.protocol.DocumentProtocol$Priority)",
- "public com.yahoo.documentapi.Result get(com.yahoo.document.DocumentId, com.yahoo.documentapi.messagebus.protocol.DocumentProtocol$Priority)",
+ "public com.yahoo.documentapi.Result get(com.yahoo.document.DocumentId, com.yahoo.documentapi.DocumentOperationParameters)",
"public com.yahoo.documentapi.Result remove(com.yahoo.document.DocumentId)",
- "public com.yahoo.documentapi.Result remove(com.yahoo.document.DocumentId, com.yahoo.documentapi.messagebus.protocol.DocumentProtocol$Priority)",
+ "public com.yahoo.documentapi.Result remove(com.yahoo.document.DocumentId, com.yahoo.documentapi.DocumentOperationParameters)",
"public com.yahoo.documentapi.Result update(com.yahoo.document.DocumentUpdate)",
- "public com.yahoo.documentapi.Result update(com.yahoo.document.DocumentUpdate, com.yahoo.documentapi.messagebus.protocol.DocumentProtocol$Priority)",
+ "public com.yahoo.documentapi.Result update(com.yahoo.document.DocumentUpdate, com.yahoo.documentapi.DocumentOperationParameters)",
"public com.yahoo.documentapi.Response getNext()",
"public com.yahoo.documentapi.Response getNext(int)",
"public void destroy()",
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java b/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java
index 05535e540b5..1d934680586 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java
@@ -18,18 +18,21 @@ import static java.util.Objects.requireNonNull;
*/
public class DocumentOperationParameters {
- private static final DocumentOperationParameters empty = new DocumentOperationParameters(null, null, null, -1);
+ private static final DocumentOperationParameters empty = new DocumentOperationParameters(null, null, null, -1, null);
private final DocumentProtocol.Priority priority;
private final String fieldSet;
private final String route;
private final int traceLevel;
+ private final ResponseHandler responseHandler;
- private DocumentOperationParameters(DocumentProtocol.Priority priority, String fieldSet, String route, int traceLevel) {
+ private DocumentOperationParameters(DocumentProtocol.Priority priority, String fieldSet, String route,
+ int traceLevel, ResponseHandler responseHandler) {
this.priority = priority;
this.fieldSet = fieldSet;
this.route = route;
this.traceLevel = traceLevel;
+ this.responseHandler = responseHandler;
}
public static DocumentOperationParameters parameters() {
@@ -38,22 +41,22 @@ public class DocumentOperationParameters {
/** Sets the priority with which to perform an operation. */
public DocumentOperationParameters withPriority(DocumentProtocol.Priority priority) {
- return new DocumentOperationParameters(requireNonNull(priority), fieldSet, route, traceLevel);
+ return new DocumentOperationParameters(requireNonNull(priority), fieldSet, route, traceLevel, responseHandler);
}
/** Sets the field set used for retrieval. */
public DocumentOperationParameters withFieldSet(FieldSet fieldSet) {
- return new DocumentOperationParameters(priority, new FieldSetRepo().serialize(fieldSet), route, traceLevel);
+ return new DocumentOperationParameters(priority, new FieldSetRepo().serialize(fieldSet), route, traceLevel, responseHandler);
}
/** Sets the field set used for retrieval. */
public DocumentOperationParameters withFieldSet(String fieldSet) {
- return new DocumentOperationParameters(priority, requireNonNull(fieldSet), route, traceLevel);
+ return new DocumentOperationParameters(priority, requireNonNull(fieldSet), route, traceLevel, responseHandler);
}
/** Sets the route along which to send the operation. */
public DocumentOperationParameters withRoute(String route) {
- return new DocumentOperationParameters(priority, fieldSet, requireNonNull(route), traceLevel);
+ return new DocumentOperationParameters(priority, fieldSet, requireNonNull(route), traceLevel, responseHandler);
}
/** Sets the trace level for an operation. */
@@ -61,13 +64,19 @@ public class DocumentOperationParameters {
if (traceLevel < 0 || traceLevel > 9)
throw new IllegalArgumentException("Trace level must be from 0 (no tracing) to 9 (maximum)");
- return new DocumentOperationParameters(priority, fieldSet, route, traceLevel);
+ return new DocumentOperationParameters(priority, fieldSet, route, traceLevel, responseHandler);
+ }
+
+ /** Sets the {@link ResponseHandler} to handle the {@link Response} of an async operation, instead of the session default. */
+ public DocumentOperationParameters withResponseHandler(ResponseHandler responseHandler) {
+ return new DocumentOperationParameters(priority, fieldSet, route, traceLevel, requireNonNull(responseHandler));
}
public Optional<DocumentProtocol.Priority> priority() { return Optional.ofNullable(priority); }
public Optional<String> fieldSet() { return Optional.ofNullable(fieldSet); }
public Optional<String> route() { return Optional.ofNullable(route); }
public OptionalInt traceLevel() { return traceLevel >= 0 ? OptionalInt.of(traceLevel) : OptionalInt.empty(); }
+ public Optional<ResponseHandler> responseHandler() { return Optional.ofNullable(responseHandler); }
@Override
public boolean equals(Object o) {
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
index 639031e4cdb..8781e4a3a51 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
@@ -9,6 +9,7 @@ import com.yahoo.document.DocumentUpdate;
import com.yahoo.documentapi.AsyncParameters;
import com.yahoo.documentapi.AsyncSession;
import com.yahoo.documentapi.DocumentIdResponse;
+import com.yahoo.documentapi.DocumentOperationParameters;
import com.yahoo.documentapi.DocumentResponse;
import com.yahoo.documentapi.DocumentUpdateResponse;
import com.yahoo.documentapi.RemoveResponse;
@@ -18,9 +19,7 @@ import com.yahoo.documentapi.Result;
import com.yahoo.documentapi.SyncParameters;
import com.yahoo.documentapi.SyncSession;
import com.yahoo.documentapi.UpdateResponse;
-import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
-import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@@ -31,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
+import static com.yahoo.documentapi.DocumentOperationParameters.parameters;
import static com.yahoo.documentapi.Result.ResultType.SUCCESS;
/**
@@ -61,77 +61,75 @@ public class LocalAsyncSession implements AsyncSession {
@Override
public Result put(Document document) {
- return put(new DocumentPut(document), DocumentProtocol.Priority.NORMAL_3);
+ return put(new DocumentPut(document), parameters());
}
@Override
- public Result put(DocumentPut documentPut, DocumentProtocol.Priority pri) {
+ public Result put(DocumentPut documentPut, DocumentOperationParameters parameters) {
return send(req -> {
- try {
- syncSession.put(documentPut, pri);
- return new DocumentResponse(req, documentPut.getDocument());
- }
- catch (Exception e) {
- return new DocumentResponse(req, documentPut.getDocument(), e.getMessage(), Response.Outcome.ERROR);
- }
- });
+ try {
+ syncSession.put(documentPut, parameters);
+ return new DocumentResponse(req, documentPut.getDocument());
+ }
+ catch (Exception e) {
+ return new DocumentResponse(req, documentPut.getDocument(), e.getMessage(), Response.Outcome.ERROR);
+ }
+ },
+ parameters);
}
@Override
public Result get(DocumentId id) {
- return get(id, false, DocumentProtocol.Priority.NORMAL_3);
+ return get(id, parameters());
}
@Override
- @Deprecated // TODO: Remove on Vespa 8
- public Result get(DocumentId id, boolean headersOnly, DocumentProtocol.Priority pri) {
- return get(id, pri);
- }
-
- @Override
- public Result get(DocumentId id, DocumentProtocol.Priority pri) {
+ public Result get(DocumentId id, DocumentOperationParameters parameters) {
return send(req -> {
- try {
- return new DocumentResponse(req, syncSession.get(id));
- }
- catch (Exception e) {
- return new DocumentResponse(req, null, e.getMessage(), Response.Outcome.ERROR);
- }
- });
+ try {
+ return new DocumentResponse(req, syncSession.get(id, parameters, null));
+ }
+ catch (Exception e) {
+ return new DocumentResponse(req, null, e.getMessage(), Response.Outcome.ERROR);
+ }
+ },
+ parameters);
}
@Override
public Result remove(DocumentId id) {
- return remove(id, DocumentProtocol.Priority.NORMAL_3);
+ return remove(id, parameters());
}
@Override
- public Result remove(DocumentId id, DocumentProtocol.Priority pri) {
+ public Result remove(DocumentId id, DocumentOperationParameters parameters) {
return send(req -> {
- if (syncSession.remove(new DocumentRemove(id), pri)) {
- return new RemoveResponse(req, true);
- }
- else {
- return new DocumentIdResponse(req, id, "Document not found.", Response.Outcome.NOT_FOUND);
- }
- });
+ if (syncSession.remove(new DocumentRemove(id), parameters)) {
+ return new RemoveResponse(req, true);
+ }
+ else {
+ return new DocumentIdResponse(req, id, "Document not found.", Response.Outcome.NOT_FOUND);
+ }
+ },
+ parameters);
}
@Override
public Result update(DocumentUpdate update) {
- return update(update, DocumentProtocol.Priority.NORMAL_3);
+ return update(update, parameters());
}
@Override
- public Result update(DocumentUpdate update, DocumentProtocol.Priority pri) {
+ public Result update(DocumentUpdate update, DocumentOperationParameters parameters) {
return send(req -> {
- if (syncSession.update(update, pri)) {
- return new UpdateResponse(req, true);
- }
- else {
- return new DocumentUpdateResponse(req, update, "Document not found.", Response.Outcome.NOT_FOUND);
- }
- });
+ if (syncSession.update(update, parameters)) {
+ return new UpdateResponse(req, true);
+ }
+ else {
+ return new DocumentUpdateResponse(req, update, "Document not found.", Response.Outcome.NOT_FOUND);
+ }
+ },
+ parameters);
}
@Override
@@ -162,21 +160,22 @@ public class LocalAsyncSession implements AsyncSession {
}
}
- private Result send(Function<Long, Response> responses) {
+ private Result send(Function<Long, Response> responses, DocumentOperationParameters parameters) {
Result.ResultType resultType = result.get();
if (resultType != SUCCESS)
return new Result(resultType, new Error());
+ ResponseHandler responseHandler = parameters.responseHandler().orElse(this::addResponse);
long req = requestId.incrementAndGet();
Phaser synchronizer = phaser.get();
if (synchronizer == null)
- addResponse(responses.apply(req));
+ responseHandler.handleResponse(responses.apply(req));
else {
synchronizer.register();
executor.execute(() -> {
try {
synchronizer.arriveAndAwaitAdvance();
- addResponse(responses.apply(req));
+ responseHandler.handleResponse(responses.apply(req));
}
finally {
synchronizer.awaitAdvance(synchronizer.arriveAndDeregister());
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java
index 7a71089c180..0a4ab66aea4 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java
@@ -184,7 +184,7 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession {
Result send(Message msg, DocumentOperationParameters parameters) {
try {
long reqId = requestId.incrementAndGet();
- msg.setContext(reqId);
+ msg.setContext(new OperationContext(reqId, parameters.responseHandler().orElse(null)));
msg.getTrace().setLevel(parameters.traceLevel().orElse(traceLevel));
// Use route from parameters, or session route if non-default, or finally, defaults for get and non-get, if set. Phew!
String toRoute = parameters.route().orElse(mayOverrideWithGetOnlyRoute(msg) ? routeForGet : route);
@@ -198,6 +198,15 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession {
}
}
+ private static class OperationContext {
+ private final long reqId;
+ private final ResponseHandler responseHandler;
+ private OperationContext(long reqId, ResponseHandler responseHandler) {
+ this.reqId = reqId;
+ this.responseHandler = responseHandler;
+ }
+ }
+
/**
* A convenience method for assigning the internal trace level and route string to a message before sending it
* through the internal mbus session object.
@@ -206,7 +215,7 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession {
* @return the document api result object.
*/
public Result send(Message msg) {
- return send(msg, null);
+ return send(msg, parameters());
}
@Override
@@ -285,11 +294,6 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession {
new Error(mbusResult.getError().getMessage() + " (" + mbusResult.getError().getCode() + ")"));
}
- private static Response toResponse(Reply reply) {
- long reqId = (Long) reply.getContext();
- return reply.hasErrors() ? toError(reply, reqId) : toSuccess(reply, reqId);
- }
-
private static Response toError(Reply reply, long reqId) {
boolean definitelyNotFound = reply instanceof UpdateDocumentReply && ! ((UpdateDocumentReply) reply).wasFound()
|| reply instanceof RemoveDocumentReply && ! ((RemoveDocumentReply) reply).wasFound();
@@ -346,8 +350,13 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession {
if (reply.getTrace().getLevel() > 0) {
log.log(Level.INFO, reply.getTrace().toString());
}
- Response response = toResponse(reply);
- if (handler != null) {
+ OperationContext context = (OperationContext) reply.getContext();
+ long reqId = context.reqId;
+ Response response = reply.hasErrors() ? toError(reply, reqId) : toSuccess(reply, reqId);
+ ResponseHandler operationSpecificResponseHandler = context.responseHandler;
+ if (operationSpecificResponseHandler != null)
+ operationSpecificResponseHandler.handleResponse(response);
+ else if (handler != null) {
handler.handleResponse(response);
} else {
queue.add(response);