diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-08-12 13:13:31 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-08-16 21:01:03 +0200 |
commit | 7c5cedef9c920cd68f01039380fd3801a08732f2 (patch) | |
tree | 2f59abd5bcef48c8334da4cb4aab7d7f696d7445 /messagebus | |
parent | 704e93a21ebde354417a9055af327d33c11a8fdb (diff) |
Set owner for send-reply and protocol for reply encoding earlier
Diffstat (limited to 'messagebus')
-rw-r--r-- | messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java | 31 |
1 files changed, 18 insertions, 13 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java index c09177a3613..3f971a39d9b 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java @@ -18,6 +18,7 @@ import com.yahoo.messagebus.ReplyHandler; import com.yahoo.messagebus.Routable; import com.yahoo.messagebus.Trace; import com.yahoo.messagebus.TraceLevel; +import com.yahoo.messagebus.network.NetworkOwner; import com.yahoo.messagebus.routing.Hop; import com.yahoo.messagebus.routing.Route; import com.yahoo.messagebus.routing.RoutingNode; @@ -55,7 +56,7 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai @Override public final void send(RoutingNode recipient, Version version, byte[] payload, long timeRemaining) { - SendContext ctx = new SendContext(recipient, timeRemaining); + SendContext ctx = new SendContext(net.getOwner(), recipient, timeRemaining); RPCServiceAddress address = (RPCServiceAddress)recipient.getServiceAddress(); Message msg = recipient.getMessage(); Route route = new Route(recipient.getRoute()); @@ -144,7 +145,7 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai if (error != null) { reply.addError(error); } - net.getOwner().deliverReply(reply, ctx.recipient); + ctx.owner.deliverReply(reply, ctx.recipient); } protected final class Params { @@ -173,20 +174,20 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai // Make sure that the owner understands the protocol. Protocol protocol = net.getOwner().getProtocol(p.protocolName); if (protocol == null) { - replyError(request, p.version, p.traceLevel, + replyError(request, p.version, protocol, p.traceLevel, new Error(ErrorCode.UNKNOWN_PROTOCOL, "Protocol '" + p.protocolName + "' is not known by " + serverIdent + ".")); return; } Routable routable = protocol.decode(p.version, p.payload); if (routable == null) { - replyError(request, p.version, p.traceLevel, + replyError(request, p.version, protocol, p.traceLevel, new Error(ErrorCode.DECODE_ERROR, "Protocol '" + protocol.getName() + "' failed to decode routable.")); return; } if (routable instanceof Reply) { - replyError(request, p.version, p.traceLevel, + replyError(request, p.version, protocol, p.traceLevel, new Error(ErrorCode.DECODE_ERROR, "Payload decoded to a reply when expecting a message.")); return; @@ -195,7 +196,7 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai if (p.route != null && p.route.length() > 0) { msg.setRoute(net.getRoute(p.route)); } - msg.setContext(new ReplyContext(request, p.version)); + msg.setContext(new ReplyContext(request, p.version, protocol)); msg.pushHandler(this); msg.setRetryEnabled(p.retryEnabled); msg.setRetry(p.retry); @@ -223,9 +224,8 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai // Encode and return the reply through the RPC request. byte[] payload = new byte[0]; if (reply.getType() != 0) { - Protocol protocol = net.getOwner().getProtocol(reply.getProtocol()); - if (protocol != null) { - payload = protocol.encode(ctx.version, reply); + if (ctx.protocol != null) { + payload = ctx.protocol.encode(ctx.version, reply); } if (payload == null || payload.length == 0) { reply.addError(new Error(ErrorCode.ENCODE_ERROR, @@ -242,11 +242,12 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai * @param request The JRT request to reply to. * @param version The version to serialize for. * @param traceLevel The trace level to set in the reply. + * @param protocol The message protocol to serialize with. * @param err The error to reply with. */ - private void replyError(Request request, Version version, int traceLevel, Error err) { + private void replyError(Request request, Version version, Protocol protocol, int traceLevel, Error err) { Reply reply = new EmptyReply(); - reply.setContext(new ReplyContext(request, version)); + reply.setContext(new ReplyContext(request, version, protocol)); reply.getTrace().setLevel(traceLevel); reply.addError(err); handleReply(reply); @@ -254,11 +255,13 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai private static class SendContext { + final NetworkOwner owner; final RoutingNode recipient; final Trace trace; final double timeout; - SendContext(RoutingNode recipient, long timeRemaining) { + SendContext(NetworkOwner owner, RoutingNode recipient, long timeRemaining) { + this.owner = owner; this.recipient = recipient; trace = new Trace(recipient.getTrace().getLevel()); timeout = timeRemaining * 0.001; @@ -269,10 +272,12 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai final Request request; final Version version; + final Protocol protocol; - ReplyContext(Request request, Version version) { + ReplyContext(Request request, Version version, Protocol protocol) { this.request = request; this.version = version; + this.protocol = protocol; } } } |