diff options
Diffstat (limited to 'documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java')
-rw-r--r-- | documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java | 25 |
1 files changed, 16 insertions, 9 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java index ab39e4c30ff..7471d285db1 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java @@ -41,6 +41,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; +import static com.yahoo.documentapi.Response.Outcome.CONDITION_FAILED; +import static com.yahoo.documentapi.Response.Outcome.ERROR; +import static com.yahoo.documentapi.Response.Outcome.NOT_FOUND; + /** * An access session which wraps a messagebus source session sending document messages. * The sessions are multithread safe. @@ -92,12 +96,12 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { @Override public Result put(Document document) { - return put(document, DocumentProtocol.Priority.NORMAL_3); + return put(new DocumentPut(document), DocumentProtocol.Priority.NORMAL_3); } @Override - public Result put(Document document, DocumentProtocol.Priority pri) { - PutDocumentMessage msg = new PutDocumentMessage(new DocumentPut(document)); + public Result put(DocumentPut documentPut, DocumentProtocol.Priority pri) { + PutDocumentMessage msg = new PutDocumentMessage(documentPut); msg.setPriority(pri); return send(msg); } @@ -238,7 +242,6 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { private static Result.ResultType messageBusErrorToResultType(int messageBusError) { switch (messageBusError) { case ErrorCode.SEND_QUEUE_FULL: return Result.ResultType.TRANSIENT_ERROR; - case DocumentProtocol.ERROR_TEST_AND_SET_CONDITION_FAILED: return Result.ResultType.CONDITION_NOT_MET_ERROR; default: return Result.ResultType.FATAL_ERROR; } } @@ -258,19 +261,23 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { } private static Response toError(Reply reply, long reqId) { + boolean definitelyNotFound = reply instanceof UpdateDocumentReply && ! ((UpdateDocumentReply) reply).wasFound() + || reply instanceof RemoveDocumentReply && ! ((RemoveDocumentReply) reply).wasFound(); + boolean conditionFailed = reply.getErrorCodes().contains(DocumentProtocol.ERROR_TEST_AND_SET_CONDITION_FAILED); + Response.Outcome outcome = definitelyNotFound ? NOT_FOUND : conditionFailed ? CONDITION_FAILED : ERROR; Message msg = reply.getMessage(); String err = getErrorMessage(reply); switch (msg.getType()) { case DocumentProtocol.MESSAGE_PUTDOCUMENT: - return new DocumentResponse(reqId, ((PutDocumentMessage)msg).getDocumentPut().getDocument(), err, false); + return new DocumentResponse(reqId, ((PutDocumentMessage)msg).getDocumentPut().getDocument(), err, outcome); case DocumentProtocol.MESSAGE_UPDATEDOCUMENT: - return new DocumentUpdateResponse(reqId, ((UpdateDocumentMessage)msg).getDocumentUpdate(), err, false); + return new DocumentUpdateResponse(reqId, ((UpdateDocumentMessage)msg).getDocumentUpdate(), err, outcome); case DocumentProtocol.MESSAGE_REMOVEDOCUMENT: - return new DocumentIdResponse(reqId, ((RemoveDocumentMessage)msg).getDocumentId(), err, false); + return new DocumentIdResponse(reqId, ((RemoveDocumentMessage)msg).getDocumentId(), err, outcome); case DocumentProtocol.MESSAGE_GETDOCUMENT: - return new DocumentIdResponse(reqId, ((GetDocumentMessage)msg).getDocumentId(), err, false); + return new DocumentIdResponse(reqId, ((GetDocumentMessage)msg).getDocumentId(), err, outcome); default: - return new Response(reqId, err, false); + return new Response(reqId, err, outcome); } } |