diff options
6 files changed, 12 insertions, 16 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java index 925968b9ade..6c1b7bc10be 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/MessageBus.java @@ -423,7 +423,6 @@ public class MessageBus implements ConfigHandler, NetworkOwner, MessageHandler, return getProtocol((Utf8Array)name); } - @Override public void deliverReply(Reply reply, ReplyHandler handler) { msn.deliverReply(reply, handler); } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkOwner.java b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkOwner.java index 2b55bf5b901..c4185d67251 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkOwner.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/NetworkOwner.java @@ -24,7 +24,7 @@ public interface NetworkOwner { * @param name The name of the protocol to return. * @return The named protocol. */ - public Protocol getProtocol(Utf8Array name); + Protocol getProtocol(Utf8Array name); /** * All messages that arrive in the network layer is passed to its owner through this function. @@ -32,13 +32,6 @@ public interface NetworkOwner { * @param message The message that just arrived from the network. * @param session The name of the session that is the recipient of the request. */ - public void deliverMessage(Message message, String session); + void deliverMessage(Message message, String session); - /** - * All replies that arrive in the network layer is passed through this to unentangle it from the network thread. - * - * @param reply The reply that just arrived from the network. - * @param handler The handler that is to receive the reply. - */ - public void deliverReply(Reply reply, ReplyHandler handler); } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java index f3c6422fbfd..06b29ed524c 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/local/LocalNetwork.java @@ -115,7 +115,7 @@ public class LocalNetwork implements Network { error.getMessage(), error.getService() != null ? error.getService() : envelope.sender.hostId)); } - owner.deliverReply(reply, envelope.parent.recipient); + envelope.parent.recipient.handleReply(reply); }); } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java index a7efe6e0a8a..e95883d8dda 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java @@ -348,7 +348,7 @@ public class RPCNetwork implements Network, MethodHandler { Reply reply = new EmptyReply(); reply.getTrace().setLevel(ctx.traceLevel); reply.addError(new Error(errCode, errMsg)); - owner.deliverReply(reply, recipient); + recipient.handleReply(reply); } } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java index 5fc1f64866f..68eadfbf542 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java @@ -80,7 +80,7 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai } Reply reply = new EmptyReply(); reply.getTrace().swap(ctx.trace); - net.getOwner().deliverReply(reply, recipient); + recipient.handleReply(reply); } else { req.setContext(ctx); address.getTarget().getJRTTarget().invokeAsync(req, ctx.timeout, this); @@ -147,7 +147,7 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai if (error != null) { reply.addError(error); } - ctx.owner.deliverReply(reply, ctx.recipient); + ctx.recipient.handleReply(reply); } protected final class Params { diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java index 05fc6f62236..42d1ff9ba2d 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java @@ -805,9 +805,13 @@ public class RoutingNode implements ReplyHandler { this.serviceAddress = serviceAddress; } + /** Proxy through message bus in case it was destroyed in the meantime. */ @Override public void handleReply(Reply reply) { - setReply(reply); - notifyParent(); + mbus.deliverReply(reply, r -> { + setReply(reply); + notifyParent(); + }); } + } |