summaryrefslogtreecommitdiffstats
path: root/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java')
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java25
1 files changed, 16 insertions, 9 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java
index ab39e4c30ff..7471d285db1 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java
@@ -41,6 +41,10 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
+import static com.yahoo.documentapi.Response.Outcome.CONDITION_FAILED;
+import static com.yahoo.documentapi.Response.Outcome.ERROR;
+import static com.yahoo.documentapi.Response.Outcome.NOT_FOUND;
+
/**
* An access session which wraps a messagebus source session sending document messages.
* The sessions are multithread safe.
@@ -92,12 +96,12 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession {
@Override
public Result put(Document document) {
- return put(document, DocumentProtocol.Priority.NORMAL_3);
+ return put(new DocumentPut(document), DocumentProtocol.Priority.NORMAL_3);
}
@Override
- public Result put(Document document, DocumentProtocol.Priority pri) {
- PutDocumentMessage msg = new PutDocumentMessage(new DocumentPut(document));
+ public Result put(DocumentPut documentPut, DocumentProtocol.Priority pri) {
+ PutDocumentMessage msg = new PutDocumentMessage(documentPut);
msg.setPriority(pri);
return send(msg);
}
@@ -238,7 +242,6 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession {
private static Result.ResultType messageBusErrorToResultType(int messageBusError) {
switch (messageBusError) {
case ErrorCode.SEND_QUEUE_FULL: return Result.ResultType.TRANSIENT_ERROR;
- case DocumentProtocol.ERROR_TEST_AND_SET_CONDITION_FAILED: return Result.ResultType.CONDITION_NOT_MET_ERROR;
default: return Result.ResultType.FATAL_ERROR;
}
}
@@ -258,19 +261,23 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession {
}
private static Response toError(Reply reply, long reqId) {
+ boolean definitelyNotFound = reply instanceof UpdateDocumentReply && ! ((UpdateDocumentReply) reply).wasFound()
+ || reply instanceof RemoveDocumentReply && ! ((RemoveDocumentReply) reply).wasFound();
+ boolean conditionFailed = reply.getErrorCodes().contains(DocumentProtocol.ERROR_TEST_AND_SET_CONDITION_FAILED);
+ Response.Outcome outcome = definitelyNotFound ? NOT_FOUND : conditionFailed ? CONDITION_FAILED : ERROR;
Message msg = reply.getMessage();
String err = getErrorMessage(reply);
switch (msg.getType()) {
case DocumentProtocol.MESSAGE_PUTDOCUMENT:
- return new DocumentResponse(reqId, ((PutDocumentMessage)msg).getDocumentPut().getDocument(), err, false);
+ return new DocumentResponse(reqId, ((PutDocumentMessage)msg).getDocumentPut().getDocument(), err, outcome);
case DocumentProtocol.MESSAGE_UPDATEDOCUMENT:
- return new DocumentUpdateResponse(reqId, ((UpdateDocumentMessage)msg).getDocumentUpdate(), err, false);
+ return new DocumentUpdateResponse(reqId, ((UpdateDocumentMessage)msg).getDocumentUpdate(), err, outcome);
case DocumentProtocol.MESSAGE_REMOVEDOCUMENT:
- return new DocumentIdResponse(reqId, ((RemoveDocumentMessage)msg).getDocumentId(), err, false);
+ return new DocumentIdResponse(reqId, ((RemoveDocumentMessage)msg).getDocumentId(), err, outcome);
case DocumentProtocol.MESSAGE_GETDOCUMENT:
- return new DocumentIdResponse(reqId, ((GetDocumentMessage)msg).getDocumentId(), err, false);
+ return new DocumentIdResponse(reqId, ((GetDocumentMessage)msg).getDocumentId(), err, outcome);
default:
- return new Response(reqId, err, false);
+ return new Response(reqId, err, outcome);
}
}