summaryrefslogtreecommitdiffstats
path: root/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java
diff options
context:
space:
mode:
Diffstat (limited to 'messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java')
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java332
1 files changed, 82 insertions, 250 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java
index 40cb7fb9ee9..0a4a4a54a68 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java
@@ -2,40 +2,41 @@
package com.yahoo.messagebus.network.rpc;
import com.yahoo.component.Version;
-import com.yahoo.jrt.*;
+import com.yahoo.jrt.DataValue;
+import com.yahoo.jrt.DoubleValue;
+import com.yahoo.jrt.Int32Array;
+import com.yahoo.jrt.Int32Value;
+import com.yahoo.jrt.Int64Value;
+import com.yahoo.jrt.Int8Value;
+import com.yahoo.jrt.Method;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.StringArray;
import com.yahoo.jrt.StringValue;
-import com.yahoo.messagebus.*;
+import com.yahoo.jrt.Values;
+import com.yahoo.messagebus.EmptyReply;
import com.yahoo.messagebus.Error;
-import com.yahoo.messagebus.ErrorCode;
-import com.yahoo.messagebus.ReplyHandler;
-import com.yahoo.messagebus.routing.Hop;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.Trace;
+import com.yahoo.messagebus.TraceNode;
import com.yahoo.messagebus.routing.Route;
-import com.yahoo.messagebus.routing.RoutingNode;
import com.yahoo.text.Utf8Array;
-import com.yahoo.text.Utf8String;
/**
* Implements the request adapter for method "mbus.send1".
*
* @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
*/
-public class RPCSendV1 implements MethodHandler, ReplyHandler, RequestWaiter, RPCSendAdapter {
+public class RPCSendV1 extends RPCSend {
private final String METHOD_NAME = "mbus.send1";
private final String METHOD_PARAMS = "sssbilsxi";
private final String METHOD_RETURN = "sdISSsxs";
- private RPCNetwork net = null;
- private String clientIdent = "client";
- private String serverIdent = "server";
@Override
- public void attach(RPCNetwork net) {
- this.net = net;
- String prefix = net.getIdentity().getServicePrefix();
- if (prefix != null && prefix.length() > 0) {
- clientIdent = "'" + prefix + "'";
- serverIdent = clientIdent;
- }
+ protected String getReturnSpec() { return METHOD_RETURN; }
+ @Override
+ protected Method buildMethod() {
Method method = new Method(METHOD_NAME, METHOD_PARAMS, METHOD_RETURN, this);
method.methodDesc("Send a message bus request and get a reply back.");
@@ -56,206 +57,80 @@ public class RPCSendV1 implements MethodHandler, ReplyHandler, RequestWaiter, RP
.returnDesc(5, "protocol", "The name of the protocol that knows how to decode this reply.")
.returnDesc(6, "payload", "The protocol specific reply payload.")
.returnDesc(7, "trace", "A string representation of the trace.");
- net.getSupervisor().addMethod(method);
+ return method;
}
-
@Override
- public void send(RoutingNode recipient, Version version, byte[] payload, long timeRemaining) {
- SendContext ctx = new SendContext(recipient, timeRemaining);
- RPCServiceAddress address = (RPCServiceAddress)recipient.getServiceAddress();
- Message msg = recipient.getMessage();
- Route route = new Route(recipient.getRoute());
- Hop hop = route.removeHop(0);
-
+ protected Request encodeRequest(Version version, Route route, RPCServiceAddress address, Message msg,
+ long timeRemaining, byte[] payload, int traceLevel) {
Request req = new Request(METHOD_NAME);
- req.parameters().add(new StringValue(version.toString()));
- req.parameters().add(new StringValue(route.toString()));
- req.parameters().add(new StringValue(address.getSessionName()));
- req.parameters().add(new Int8Value(msg.getRetryEnabled() ? (byte)1 : (byte)0));
- req.parameters().add(new Int32Value(msg.getRetry()));
- req.parameters().add(new Int64Value(timeRemaining));
- req.parameters().add(new StringValue(msg.getProtocol()));
- req.parameters().add(new DataValue(payload));
- req.parameters().add(new Int32Value(ctx.trace.getLevel()));
-
- if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) {
- ctx.trace.trace(TraceLevel.SEND_RECEIVE,
- "Sending message (version " + version + ") from " + clientIdent + " to '" +
- address.getServiceName() + "' with " + ctx.timeout + " seconds timeout.");
- }
-
- if (hop.getIgnoreResult()) {
- address.getTarget().getJRTTarget().invokeVoid(req);
- if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) {
- ctx.trace.trace(TraceLevel.SEND_RECEIVE,
- "Not waiting for a reply from '" + address.getServiceName() + "'.");
- }
- Reply reply = new EmptyReply();
- reply.getTrace().swap(ctx.trace);
- net.getOwner().deliverReply(reply, recipient);
- } else {
- req.setContext(ctx);
- address.getTarget().getJRTTarget().invokeAsync(req, ctx.timeout, this);
- }
- req.discardParameters(); // allow garbage collection of request parameters
+ Values v = req.parameters();
+ v.add(new StringValue(version.toString()));
+ v.add(new StringValue(route.toString()));
+ v.add(new StringValue(address.getSessionName()));
+ v.add(new Int8Value(msg.getRetryEnabled() ? (byte)1 : (byte)0));
+ v.add(new Int32Value(msg.getRetry()));
+ v.add(new Int64Value(timeRemaining));
+ v.add(new StringValue(msg.getProtocol()));
+ v.add(new DataValue(payload));
+ v.add(new Int32Value(traceLevel));
+ return req;
}
@Override
- public void handleRequestDone(Request req) {
- SendContext ctx = (SendContext)req.getContext();
- String serviceName = ((RPCServiceAddress)ctx.recipient.getServiceAddress()).getServiceName();
+ protected Reply createReply(Values ret, String serviceName, Trace trace) {
+ Version version = new Version(ret.get(0).asUtf8Array());
+ double retryDelay = ret.get(1).asDouble();
+ int[] errorCodes = ret.get(2).asInt32Array();
+ String[] errorMessages = ret.get(3).asStringArray();
+ String[] errorServices = ret.get(4).asStringArray();
+ Utf8Array protocolName = ret.get(5).asUtf8Array();
+ byte[] payload = ret.get(6).asData();
+ String replyTrace = ret.get(7).asString();
+
+ // Make sure that the owner understands the protocol.
Reply reply = null;
Error error = null;
- if (!req.checkReturnTypes(METHOD_RETURN)) {
- // Map all known JRT errors to the appropriate message bus error.
- reply = new EmptyReply();
- switch (req.errorCode()) {
- case com.yahoo.jrt.ErrorCode.TIMEOUT:
- error = new Error(com.yahoo.messagebus.ErrorCode.TIMEOUT,
- "A timeout occured while waiting for '" + serviceName + "' (" +
- ctx.timeout + " seconds expired); " + req.errorMessage());
- break;
- case com.yahoo.jrt.ErrorCode.CONNECTION:
- error = new Error(com.yahoo.messagebus.ErrorCode.CONNECTION_ERROR,
- "A connection error occured for '" + serviceName + "'; " + req.errorMessage());
- break;
- default:
- error = new Error(com.yahoo.messagebus.ErrorCode.NETWORK_ERROR,
- "A network error occured for '" + serviceName + "'; " + req.errorMessage());
- }
- } else {
- // Retrieve all reply components from JRT request object.
- Version version = new Version(req.returnValues().get(0).asUtf8Array());
- double retryDelay = req.returnValues().get(1).asDouble();
- int[] errorCodes = req.returnValues().get(2).asInt32Array();
- String[] errorMessages = req.returnValues().get(3).asStringArray();
- String[] errorServices = req.returnValues().get(4).asStringArray();
- Utf8Array protocolName = req.returnValues().get(5).asUtf8Array();
- byte[] payload = req.returnValues().get(6).asData();
- String replyTrace = req.returnValues().get(7).asString();
-
- // Make sure that the owner understands the protocol.
- if (payload.length > 0) {
- Protocol protocol = net.getOwner().getProtocol(protocolName);
- if (protocol != null) {
- Routable routable = protocol.decode(version, payload);
- if (routable != null) {
- if (routable instanceof Reply) {
- reply = (Reply)routable;
- } else {
- error = new Error(com.yahoo.messagebus.ErrorCode.DECODE_ERROR,
- "Payload decoded to a reply when expecting a message.");
- }
- } else {
- error = new Error(com.yahoo.messagebus.ErrorCode.DECODE_ERROR,
- "Protocol '" + protocol.getName() + "' failed to decode routable.");
- }
- } else {
- error = new Error(com.yahoo.messagebus.ErrorCode.UNKNOWN_PROTOCOL,
- "Protocol '" + protocolName + "' is not known by " + serverIdent + ".");
- }
- }
- if (reply == null) {
- reply = new EmptyReply();
- }
- reply.setRetryDelay(retryDelay);
- for (int i = 0; i < errorCodes.length && i < errorMessages.length; i++) {
- reply.addError(new Error(errorCodes[i],
- errorMessages[i],
- errorServices[i].length() > 0 ? errorServices[i] : serviceName));
- }
- if (ctx.trace.getLevel() > 0) {
- ctx.trace.getRoot().addChild(TraceNode.decode(replyTrace));
+ if (payload.length > 0) {
+ Object retval = decode(protocolName, version, payload);
+ if (retval instanceof Reply) {
+ reply = (Reply) retval;
+ } else {
+ error = (Error) retval;
}
}
- if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) {
- ctx.trace.trace(TraceLevel.SEND_RECEIVE,
- "Reply (type " + reply.getType() + ") received at " + clientIdent + ".");
+ if (reply == null) {
+ reply = new EmptyReply();
}
- reply.getTrace().swap(ctx.trace);
if (error != null) {
reply.addError(error);
}
- net.getOwner().deliverReply(reply, ctx.recipient);
- }
-
- @Override
- public void invoke(Request request) {
- request.detach();
- Version version = new Version(request.parameters().get(0).asUtf8Array());
- String route = request.parameters().get(1).asString();
- String session = request.parameters().get(2).asString();
- boolean retryEnabled = (request.parameters().get(3).asInt8() != 0);
- int retry = request.parameters().get(4).asInt32();
- long timeRemaining = request.parameters().get(5).asInt64();
- Utf8Array protocolName = request.parameters().get(6).asUtf8Array();
- byte[] payload = request.parameters().get(7).asData();
- int traceLevel = request.parameters().get(8).asInt32();
-
- request.discardParameters(); // allow garbage collection of request parameters
-
- // Make sure that the owner understands the protocol.
- Protocol protocol = net.getOwner().getProtocol(protocolName);
- if (protocol == null) {
- replyError(request, version, traceLevel,
- new com.yahoo.messagebus.Error(ErrorCode.UNKNOWN_PROTOCOL,
- "Protocol '" + protocolName + "' is not known by " + serverIdent + "."));
- return;
- }
- Routable routable = protocol.decode(version, payload);
- if (routable == null) {
- replyError(request, version, traceLevel,
- new Error(ErrorCode.DECODE_ERROR,
- "Protocol '" + protocol.getName() + "' failed to decode routable."));
- return;
- }
- if (routable instanceof Reply) {
- replyError(request, version, traceLevel,
- new Error(ErrorCode.DECODE_ERROR,
- "Payload decoded to a reply when expecting a message."));
- return;
+ reply.setRetryDelay(retryDelay);
+ for (int i = 0; i < errorCodes.length && i < errorMessages.length; i++) {
+ reply.addError(new Error(errorCodes[i], errorMessages[i],
+ errorServices[i].length() > 0 ? errorServices[i] : serviceName));
}
- Message msg = (Message)routable;
- if (route != null && route.length() > 0) {
- msg.setRoute(net.getRoute(route));
+ if (trace.getLevel() > 0) {
+ trace.getRoot().addChild(TraceNode.decode(replyTrace));
}
- msg.setContext(new ReplyContext(request, version));
- msg.pushHandler(this);
- msg.setRetryEnabled(retryEnabled);
- msg.setRetry(retry);
- msg.setTimeReceivedNow();
- msg.setTimeRemaining(timeRemaining);
- msg.getTrace().setLevel(traceLevel);
- if (msg.getTrace().shouldTrace(TraceLevel.SEND_RECEIVE)) {
- msg.getTrace().trace(TraceLevel.SEND_RECEIVE,
- "Message (type " + msg.getType() + ") received at " + serverIdent + " for session '" + session + "'.");
- }
- net.getOwner().deliverMessage(msg, session);
+ return reply;
}
- @Override
- public void handleReply(Reply reply) {
- ReplyContext ctx = (ReplyContext)reply.getContext();
- reply.setContext(null);
-
- // Add trace information.
- if (reply.getTrace().shouldTrace(TraceLevel.SEND_RECEIVE)) {
- reply.getTrace().trace(TraceLevel.SEND_RECEIVE,
- "Sending reply (version " + ctx.version + ") from " + serverIdent + ".");
- }
+ protected Params toParams(Values args) {
+ Params p = new Params();
+ p.version = new Version(args.get(0).asUtf8Array());
+ p.route = args.get(1).asString();
+ p.session = args.get(2).asString();
+ p.retryEnabled = (args.get(3).asInt8() != 0);
+ p.retry = args.get(4).asInt32();
+ p.timeRemaining = args.get(5).asInt64();
+ p.protocolName = args.get(6).asUtf8Array();
+ p.payload = args.get(7).asData();
+ p.traceLevel = args.get(8).asInt32();
+ return p;
+ }
- // Encode and return the reply through the RPC request.
- byte[] payload = new byte[0];
- if (reply.getType() != 0) {
- Protocol protocol = net.getOwner().getProtocol(reply.getProtocol());
- if (protocol != null) {
- payload = protocol.encode(ctx.version, reply);
- }
- if (payload == null || payload.length == 0) {
- reply.addError(new Error(ErrorCode.ENCODE_ERROR,
- "An error occured while encoding the reply."));
- }
- }
+ @Override
+ protected void createReponse(Values ret, Reply reply, Version version, byte [] payload) {
int[] eCodes = new int[reply.getNumErrors()];
String[] eMessages = new String[reply.getNumErrors()];
String[] eServices = new String[reply.getNumErrors()];
@@ -265,57 +140,14 @@ public class RPCSendV1 implements MethodHandler, ReplyHandler, RequestWaiter, RP
eMessages[i] = error.getMessage();
eServices[i] = error.getService() != null ? error.getService() : "";
}
- ctx.request.returnValues().add(new StringValue(ctx.version.toString()));
- ctx.request.returnValues().add(new DoubleValue(reply.getRetryDelay()));
- ctx.request.returnValues().add(new Int32Array(eCodes));
- ctx.request.returnValues().add(new StringArray(eMessages));
- ctx.request.returnValues().add(new StringArray(eServices));
- ctx.request.returnValues().add(new StringValue(reply.getProtocol()));
- ctx.request.returnValues().add(new DataValue(payload));
- ctx.request.returnValues().add(new StringValue(
- reply.getTrace().getRoot() != null ?
- reply.getTrace().getRoot().encode() :
- ""));
- ctx.request.returnRequest();
+ ret.add(new StringValue(version.toString()));
+ ret.add(new DoubleValue(reply.getRetryDelay()));
+ ret.add(new Int32Array(eCodes));
+ ret.add(new StringArray(eMessages));
+ ret.add(new StringArray(eServices));
+ ret.add(new StringValue(reply.getProtocol()));
+ ret.add(new DataValue(payload));
+ ret.add(new StringValue(reply.getTrace().getRoot() != null ? reply.getTrace().getRoot().encode() : ""));
}
- /**
- * Send an error reply for a given request.
- *
- * @param request The JRT request to reply to.
- * @param version The version to serialize for.
- * @param traceLevel The trace level to set in the reply.
- * @param err The error to reply with.
- */
- private void replyError(Request request, Version version, int traceLevel, Error err) {
- Reply reply = new EmptyReply();
- reply.setContext(new ReplyContext(request, version));
- reply.getTrace().setLevel(traceLevel);
- reply.addError(err);
- handleReply(reply);
- }
-
- private static class SendContext {
-
- final RoutingNode recipient;
- final Trace trace;
- final double timeout;
-
- SendContext(RoutingNode recipient, long timeRemaining) {
- this.recipient = recipient;
- trace = new Trace(recipient.getTrace().getLevel());
- timeout = timeRemaining * 0.001;
- }
- }
-
- private static class ReplyContext {
-
- final Request request;
- final Version version;
-
- public ReplyContext(Request request, Version version) {
- this.request = request;
- this.version = version;
- }
- }
}