diff options
Diffstat (limited to 'messagebus')
3 files changed, 43 insertions, 25 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java b/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java index a4b5422d502..1f5d60a630a 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java @@ -57,7 +57,7 @@ public final class DestinationSession implements MessageHandler { } /** - * Convenience method for acknowledging a message for its sender. + * Conveniece method for acknowledging a message back to the sender. * * This is equivalent to: * <pre> @@ -69,7 +69,7 @@ public final class DestinationSession implements MessageHandler { * Messages should be acknowledged when * <ul> * <li>this destination has safely and permanently applied the message, or - * <li>an intermediate determines that the purpose of the message is fulfilled without forwarding the message. + * <li>an intermediate determines that the purpose of the message is fullfilled without forwarding the message * </ul> * * @param msg The message to acknowledge back to the sender. @@ -82,8 +82,8 @@ public final class DestinationSession implements MessageHandler { } /** - * Sends a reply to a message. The reply will propagate back to the original sender, preferring the same route as it - * used to reach the destination. + * Sends a reply to a message. The reply will propagate back to the original sender, prefering the same route as it + * used to reach the detination. * * @param reply The reply, created from the message this is a reply to. */ diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Error.java b/messagebus/src/main/java/com/yahoo/messagebus/Error.java index 475dea3d7ac..4aa85c1ea87 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/Error.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/Error.java @@ -78,9 +78,9 @@ public final class Error { @Override public String toString() { String name = ErrorCode.getName(code); - return "[" + - name + " @ " + - (service != null ? service : "localhost") + - "]: " + message; + return "[" + + (name != null ? name : code) + " @ " + + (service != null ? service : "localhost") + + "]: " + message; } } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java index f3c6422fbfd..88a6d5376b1 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java @@ -97,25 +97,36 @@ public class LocalNetwork implements Network { msg.setRetryEnabled(envelope.msg.getRetryEnabled()); msg.setRetry(envelope.msg.getRetry()); msg.setTimeRemaining(envelope.msg.getTimeRemainingNow()); - msg.pushHandler(reply -> new ReplyEnvelope(LocalNetwork.this, envelope, reply).send()); - owner.deliverMessage(msg, ((LocalServiceAddress) envelope.recipient.getServiceAddress()).getSessionName()); + msg.pushHandler(new ReplyHandler() { + + @Override + public void handleReply(Reply reply) { + new ReplyEnvelope(LocalNetwork.this, envelope, reply).send(); + } + }); + owner.deliverMessage(msg, LocalServiceAddress.class.cast(envelope.recipient.getServiceAddress()) + .getSessionName()); } }); } private void receiveLater(ReplyEnvelope envelope) { byte[] payload = envelope.sender.encode(envelope.reply.getProtocol(), envelope.reply); - executor.execute(() -> { - Reply reply = decode(envelope.reply.getProtocol(), payload, Reply.class); - reply.setRetryDelay(envelope.reply.getRetryDelay()); - reply.getTrace().getRoot().addChild(TraceNode.decode(envelope.reply.getTrace().getRoot().encode())); - for (int i = 0, len = envelope.reply.getNumErrors(); i < len; ++i) { - Error error = envelope.reply.getError(i); - reply.addError(new Error(error.getCode(), - error.getMessage(), - error.getService() != null ? error.getService() : envelope.sender.hostId)); + executor.execute(new Runnable() { + + @Override + public void run() { + Reply reply = decode(envelope.reply.getProtocol(), payload, Reply.class); + reply.setRetryDelay(envelope.reply.getRetryDelay()); + reply.getTrace().getRoot().addChild(TraceNode.decode(envelope.reply.getTrace().getRoot().encode())); + for (int i = 0, len = envelope.reply.getNumErrors(); i < len; ++i) { + Error error = envelope.reply.getError(i); + reply.addError(new Error(error.getCode(), + error.getMessage(), + error.getService() != null ? error.getService() : envelope.sender.hostId)); + } + owner.deliverReply(reply, envelope.parent.recipient); } - owner.deliverReply(reply, envelope.parent.recipient); }); } @@ -126,16 +137,23 @@ public class LocalNetwork implements Network { return owner.getProtocol(protocolName).encode(Vtag.currentVersion, toEncode); } + @SuppressWarnings("unchecked") private <T extends Routable> T decode(Utf8String protocolName, byte[] toDecode, Class<T> clazz) { - return clazz.cast(toDecode.length == 0 ? new EmptyReply() - : owner.getProtocol(protocolName).decode(Vtag.currentVersion, toDecode)); + if (toDecode.length == 0) { + return clazz.cast(new EmptyReply()); + } + return clazz.cast(owner.getProtocol(protocolName).decode(Vtag.currentVersion, toDecode)); } @Override - public void sync() { } + public void sync() { + + } @Override - public void shutdown() { } + public void shutdown() { + + } @Override public String getConnectionSpec() { @@ -160,7 +178,7 @@ public class LocalNetwork implements Network { } void send() { - ((LocalServiceAddress) recipient.getServiceAddress()).getNetwork().receiveLater(this); + LocalServiceAddress.class.cast(recipient.getServiceAddress()).getNetwork().receiveLater(this); } } |