summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-08-12 13:13:31 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-08-16 21:01:03 +0200
commit7c5cedef9c920cd68f01039380fd3801a08732f2 (patch)
tree2f59abd5bcef48c8334da4cb4aab7d7f696d7445 /messagebus
parent704e93a21ebde354417a9055af327d33c11a8fdb (diff)
Set owner for send-reply and protocol for reply encoding earlier
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java31
1 files changed, 18 insertions, 13 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java
index c09177a3613..3f971a39d9b 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java
@@ -18,6 +18,7 @@ import com.yahoo.messagebus.ReplyHandler;
import com.yahoo.messagebus.Routable;
import com.yahoo.messagebus.Trace;
import com.yahoo.messagebus.TraceLevel;
+import com.yahoo.messagebus.network.NetworkOwner;
import com.yahoo.messagebus.routing.Hop;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.routing.RoutingNode;
@@ -55,7 +56,7 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai
@Override
public final void send(RoutingNode recipient, Version version, byte[] payload, long timeRemaining) {
- SendContext ctx = new SendContext(recipient, timeRemaining);
+ SendContext ctx = new SendContext(net.getOwner(), recipient, timeRemaining);
RPCServiceAddress address = (RPCServiceAddress)recipient.getServiceAddress();
Message msg = recipient.getMessage();
Route route = new Route(recipient.getRoute());
@@ -144,7 +145,7 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai
if (error != null) {
reply.addError(error);
}
- net.getOwner().deliverReply(reply, ctx.recipient);
+ ctx.owner.deliverReply(reply, ctx.recipient);
}
protected final class Params {
@@ -173,20 +174,20 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai
// Make sure that the owner understands the protocol.
Protocol protocol = net.getOwner().getProtocol(p.protocolName);
if (protocol == null) {
- replyError(request, p.version, p.traceLevel,
+ replyError(request, p.version, protocol, p.traceLevel,
new Error(ErrorCode.UNKNOWN_PROTOCOL,
"Protocol '" + p.protocolName + "' is not known by " + serverIdent + "."));
return;
}
Routable routable = protocol.decode(p.version, p.payload);
if (routable == null) {
- replyError(request, p.version, p.traceLevel,
+ replyError(request, p.version, protocol, p.traceLevel,
new Error(ErrorCode.DECODE_ERROR,
"Protocol '" + protocol.getName() + "' failed to decode routable."));
return;
}
if (routable instanceof Reply) {
- replyError(request, p.version, p.traceLevel,
+ replyError(request, p.version, protocol, p.traceLevel,
new Error(ErrorCode.DECODE_ERROR,
"Payload decoded to a reply when expecting a message."));
return;
@@ -195,7 +196,7 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai
if (p.route != null && p.route.length() > 0) {
msg.setRoute(net.getRoute(p.route));
}
- msg.setContext(new ReplyContext(request, p.version));
+ msg.setContext(new ReplyContext(request, p.version, protocol));
msg.pushHandler(this);
msg.setRetryEnabled(p.retryEnabled);
msg.setRetry(p.retry);
@@ -223,9 +224,8 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai
// Encode and return the reply through the RPC request.
byte[] payload = new byte[0];
if (reply.getType() != 0) {
- Protocol protocol = net.getOwner().getProtocol(reply.getProtocol());
- if (protocol != null) {
- payload = protocol.encode(ctx.version, reply);
+ if (ctx.protocol != null) {
+ payload = ctx.protocol.encode(ctx.version, reply);
}
if (payload == null || payload.length == 0) {
reply.addError(new Error(ErrorCode.ENCODE_ERROR,
@@ -242,11 +242,12 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai
* @param request The JRT request to reply to.
* @param version The version to serialize for.
* @param traceLevel The trace level to set in the reply.
+ * @param protocol The message protocol to serialize with.
* @param err The error to reply with.
*/
- private void replyError(Request request, Version version, int traceLevel, Error err) {
+ private void replyError(Request request, Version version, Protocol protocol, int traceLevel, Error err) {
Reply reply = new EmptyReply();
- reply.setContext(new ReplyContext(request, version));
+ reply.setContext(new ReplyContext(request, version, protocol));
reply.getTrace().setLevel(traceLevel);
reply.addError(err);
handleReply(reply);
@@ -254,11 +255,13 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai
private static class SendContext {
+ final NetworkOwner owner;
final RoutingNode recipient;
final Trace trace;
final double timeout;
- SendContext(RoutingNode recipient, long timeRemaining) {
+ SendContext(NetworkOwner owner, RoutingNode recipient, long timeRemaining) {
+ this.owner = owner;
this.recipient = recipient;
trace = new Trace(recipient.getTrace().getLevel());
timeout = timeRemaining * 0.001;
@@ -269,10 +272,12 @@ public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWai
final Request request;
final Version version;
+ final Protocol protocol;
- ReplyContext(Request request, Version version) {
+ ReplyContext(Request request, Version version, Protocol protocol) {
this.request = request;
this.version = version;
+ this.protocol = protocol;
}
}
}