diff options
Diffstat (limited to 'messagebus')
3 files changed, 25 insertions, 43 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java b/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java index 1f5d60a630a..a4b5422d502 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/DestinationSession.java @@ -57,7 +57,7 @@ public final class DestinationSession implements MessageHandler { } /** - * Conveniece method for acknowledging a message back to the sender. + * Convenience method for acknowledging a message for its sender. * * This is equivalent to: * <pre> @@ -69,7 +69,7 @@ public final class DestinationSession implements MessageHandler { * Messages should be acknowledged when * <ul> * <li>this destination has safely and permanently applied the message, or - * <li>an intermediate determines that the purpose of the message is fullfilled without forwarding the message + * <li>an intermediate determines that the purpose of the message is fulfilled without forwarding the message. * </ul> * * @param msg The message to acknowledge back to the sender. @@ -82,8 +82,8 @@ public final class DestinationSession implements MessageHandler { } /** - * Sends a reply to a message. The reply will propagate back to the original sender, prefering the same route as it - * used to reach the detination. + * Sends a reply to a message. The reply will propagate back to the original sender, preferring the same route as it + * used to reach the destination. * * @param reply The reply, created from the message this is a reply to. */ diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Error.java b/messagebus/src/main/java/com/yahoo/messagebus/Error.java index 4aa85c1ea87..475dea3d7ac 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/Error.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/Error.java @@ -78,9 +78,9 @@ public final class Error { @Override public String toString() { String name = ErrorCode.getName(code); - return "[" + - (name != null ? name : code) + " @ " + - (service != null ? service : "localhost") + - "]: " + message; + return "[" + + name + " @ " + + (service != null ? service : "localhost") + + "]: " + message; } } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java index 88a6d5376b1..f3c6422fbfd 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java @@ -97,36 +97,25 @@ public class LocalNetwork implements Network { msg.setRetryEnabled(envelope.msg.getRetryEnabled()); msg.setRetry(envelope.msg.getRetry()); msg.setTimeRemaining(envelope.msg.getTimeRemainingNow()); - msg.pushHandler(new ReplyHandler() { - - @Override - public void handleReply(Reply reply) { - new ReplyEnvelope(LocalNetwork.this, envelope, reply).send(); - } - }); - owner.deliverMessage(msg, LocalServiceAddress.class.cast(envelope.recipient.getServiceAddress()) - .getSessionName()); + msg.pushHandler(reply -> new ReplyEnvelope(LocalNetwork.this, envelope, reply).send()); + owner.deliverMessage(msg, ((LocalServiceAddress) envelope.recipient.getServiceAddress()).getSessionName()); } }); } private void receiveLater(ReplyEnvelope envelope) { byte[] payload = envelope.sender.encode(envelope.reply.getProtocol(), envelope.reply); - executor.execute(new Runnable() { - - @Override - public void run() { - Reply reply = decode(envelope.reply.getProtocol(), payload, Reply.class); - reply.setRetryDelay(envelope.reply.getRetryDelay()); - reply.getTrace().getRoot().addChild(TraceNode.decode(envelope.reply.getTrace().getRoot().encode())); - for (int i = 0, len = envelope.reply.getNumErrors(); i < len; ++i) { - Error error = envelope.reply.getError(i); - reply.addError(new Error(error.getCode(), - error.getMessage(), - error.getService() != null ? error.getService() : envelope.sender.hostId)); - } - owner.deliverReply(reply, envelope.parent.recipient); + executor.execute(() -> { + Reply reply = decode(envelope.reply.getProtocol(), payload, Reply.class); + reply.setRetryDelay(envelope.reply.getRetryDelay()); + reply.getTrace().getRoot().addChild(TraceNode.decode(envelope.reply.getTrace().getRoot().encode())); + for (int i = 0, len = envelope.reply.getNumErrors(); i < len; ++i) { + Error error = envelope.reply.getError(i); + reply.addError(new Error(error.getCode(), + error.getMessage(), + error.getService() != null ? error.getService() : envelope.sender.hostId)); } + owner.deliverReply(reply, envelope.parent.recipient); }); } @@ -137,23 +126,16 @@ public class LocalNetwork implements Network { return owner.getProtocol(protocolName).encode(Vtag.currentVersion, toEncode); } - @SuppressWarnings("unchecked") private <T extends Routable> T decode(Utf8String protocolName, byte[] toDecode, Class<T> clazz) { - if (toDecode.length == 0) { - return clazz.cast(new EmptyReply()); - } - return clazz.cast(owner.getProtocol(protocolName).decode(Vtag.currentVersion, toDecode)); + return clazz.cast(toDecode.length == 0 ? new EmptyReply() + : owner.getProtocol(protocolName).decode(Vtag.currentVersion, toDecode)); } @Override - public void sync() { - - } + public void sync() { } @Override - public void shutdown() { - - } + public void shutdown() { } @Override public String getConnectionSpec() { @@ -178,7 +160,7 @@ public class LocalNetwork implements Network { } void send() { - LocalServiceAddress.class.cast(recipient.getServiceAddress()).getNetwork().receiveLater(this); + ((LocalServiceAddress) recipient.getServiceAddress()).getNetwork().receiveLater(this); } } |