aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus/src/main
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-09-19 19:34:23 +0200
committerGitHub <noreply@github.com>2017-09-19 19:34:23 +0200
commitff6ea0bf5b34b906d3008bd1ca6560967c91f561 (patch)
tree5af4cbac9f0c5597d996758f626a2e1cda96193b /messagebus/src/main
parentb853bd3dbe033da6b3fa141f8a84b38f45c78e76 (diff)
parent1f8bac86043733a0d875a0c1ab1f028588972aeb (diff)
Merge pull request #3238 from vespa-engine/balder/mbus-transported-with-slime-2
- Use C++11 for loops.
Diffstat (limited to 'messagebus/src/main')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java54
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java269
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java332
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java209
4 files changed, 594 insertions, 270 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
index 096c0c0b485..d42e396452a 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
@@ -2,17 +2,28 @@
package com.yahoo.messagebus.network.rpc;
import com.yahoo.component.Version;
-import com.yahoo.component.VersionSpecification;
import com.yahoo.component.Vtag;
import com.yahoo.concurrent.ThreadFactoryFactory;
-import com.yahoo.jrt.*;
+import com.yahoo.jrt.Acceptor;
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.Method;
+import com.yahoo.jrt.MethodHandler;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.Spec;
+import com.yahoo.jrt.StringValue;
+import com.yahoo.jrt.Supervisor;
+import com.yahoo.jrt.Task;
+import com.yahoo.jrt.Transport;
import com.yahoo.jrt.slobrok.api.IMirror;
import com.yahoo.jrt.slobrok.api.Mirror;
import com.yahoo.jrt.slobrok.api.Register;
import com.yahoo.log.LogLevel;
-import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.EmptyReply;
import com.yahoo.messagebus.Error;
import com.yahoo.messagebus.ErrorCode;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Protocol;
+import com.yahoo.messagebus.Reply;
import com.yahoo.messagebus.network.Identity;
import com.yahoo.messagebus.network.Network;
import com.yahoo.messagebus.network.NetworkOwner;
@@ -22,8 +33,16 @@ import com.yahoo.messagebus.routing.RoutingNode;
import java.io.PrintWriter;
import java.io.StringWriter;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
@@ -44,7 +63,7 @@ public class RPCNetwork implements Network, MethodHandler {
private final Acceptor listener;
private final Mirror mirror;
private final Register register;
- private final Map<VersionSpecification, RPCSendAdapter> sendAdapters = new HashMap<>();
+ private final TreeMap<Version, RPCSendAdapter> sendAdapters = new TreeMap<>();
private NetworkOwner owner;
private final SlobrokConfigSubscriber slobroksConfig;
private final LinkedHashMap<String, Route> lruRouteMap = new LinkedHashMap<>(10000, 0.5f, true);
@@ -162,9 +181,10 @@ public class RPCNetwork implements Network, MethodHandler {
}
this.owner = owner;
- RPCSendAdapter adapter = new RPCSendV1();
- addSendAdapter(new VersionSpecification(5), adapter);
- addSendAdapter(new VersionSpecification(6), adapter);
+ RPCSendAdapter adapter1 = new RPCSendV1();
+ RPCSendAdapter adapter2 = new RPCSendV2();
+ addSendAdapter(new Version(5), adapter1);
+ addSendAdapter(new Version(6,142), adapter2);
}
@Override
@@ -234,11 +254,9 @@ public class RPCNetwork implements Network, MethodHandler {
*/
private void send(SendContext ctx) {
if (destroyed.get()) {
- replyError(ctx, ErrorCode.NETWORK_SHUTDOWN,
- "Network layer has performed shutdown.");
+ replyError(ctx, ErrorCode.NETWORK_SHUTDOWN, "Network layer has performed shutdown.");
} else if (ctx.hasError) {
- replyError(ctx, ErrorCode.HANDSHAKE_FAILED,
- "An error occured while resolving version.");
+ replyError(ctx, ErrorCode.HANDSHAKE_FAILED, "An error occured while resolving version.");
} else {
sendService.execute(new SendTask(owner.getProtocol(ctx.msg.getProtocol()), ctx));
}
@@ -315,7 +333,7 @@ public class RPCNetwork implements Network, MethodHandler {
* @param version The version for which to register an adapter.
* @param adapter The adapter to register.
*/
- private void addSendAdapter(VersionSpecification version, RPCSendAdapter adapter) {
+ private void addSendAdapter(Version version, RPCSendAdapter adapter) {
adapter.attach(this);
sendAdapters.put(version, adapter);
}
@@ -328,12 +346,8 @@ public class RPCNetwork implements Network, MethodHandler {
* @return The compatible adapter.
*/
private RPCSendAdapter getSendAdapter(Version version) {
- for (Map.Entry<VersionSpecification, RPCSendAdapter> entry : sendAdapters.entrySet()) {
- if (entry.getKey().matches(version)) {
- return entry.getValue();
- }
- }
- return null;
+ Map.Entry<Version, RPCSendAdapter> lower = sendAdapters.floorEntry(version);
+ return (lower != null) ? lower.getValue() : null;
}
/**
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java
new file mode 100644
index 00000000000..d7b4887bd36
--- /dev/null
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java
@@ -0,0 +1,269 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.network.rpc;
+
+import com.yahoo.component.Version;
+
+import com.yahoo.jrt.Method;
+import com.yahoo.jrt.MethodHandler;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.RequestWaiter;
+import com.yahoo.jrt.Values;
+import com.yahoo.messagebus.EmptyReply;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.ErrorCode;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Protocol;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.ReplyHandler;
+import com.yahoo.messagebus.Routable;
+import com.yahoo.messagebus.Trace;
+import com.yahoo.messagebus.TraceLevel;
+import com.yahoo.messagebus.routing.Hop;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.routing.RoutingNode;
+import com.yahoo.text.Utf8Array;
+
+/**
+ * Implements the request adapter for method "mbus.send1/mbus.slime".
+ *
+ * @author baldersheim
+ */
+public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWaiter, RPCSendAdapter {
+
+ private RPCNetwork net = null;
+ private String clientIdent = "client";
+ private String serverIdent = "server";
+
+ protected abstract Method buildMethod();
+ protected abstract String getReturnSpec();
+ protected abstract Request encodeRequest(Version version, Route route, RPCServiceAddress address, Message msg,
+ long timeRemaining, byte[] payload, int traceLevel);
+ protected abstract Reply createReply(Values ret, String serviceName, Trace trace);
+ protected abstract Params toParams(Values req);
+ protected abstract void createResponse(Values ret, Reply reply, Version version, byte [] payload);
+ @Override
+ public final void attach(RPCNetwork net) {
+ this.net = net;
+ String prefix = net.getIdentity().getServicePrefix();
+ if (prefix != null && prefix.length() > 0) {
+ clientIdent = "'" + prefix + "'";
+ serverIdent = clientIdent;
+ }
+ net.getSupervisor().addMethod(buildMethod());
+ }
+
+ @Override
+ public final void send(RoutingNode recipient, Version version, byte[] payload, long timeRemaining) {
+ SendContext ctx = new SendContext(recipient, timeRemaining);
+ RPCServiceAddress address = (RPCServiceAddress)recipient.getServiceAddress();
+ Message msg = recipient.getMessage();
+ Route route = new Route(recipient.getRoute());
+ Hop hop = route.removeHop(0);
+
+ Request req = encodeRequest(version, route, address,msg, timeRemaining, payload, ctx.trace.getLevel());
+
+ if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) {
+ ctx.trace.trace(TraceLevel.SEND_RECEIVE,
+ "Sending message (version " + version + ") from " + clientIdent + " to '" +
+ address.getServiceName() + "' with " + ctx.timeout + " seconds timeout.");
+ }
+
+ if (hop.getIgnoreResult()) {
+ address.getTarget().getJRTTarget().invokeVoid(req);
+ if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) {
+ ctx.trace.trace(TraceLevel.SEND_RECEIVE,
+ "Not waiting for a reply from '" + address.getServiceName() + "'.");
+ }
+ Reply reply = new EmptyReply();
+ reply.getTrace().swap(ctx.trace);
+ net.getOwner().deliverReply(reply, recipient);
+ } else {
+ req.setContext(ctx);
+ address.getTarget().getJRTTarget().invokeAsync(req, ctx.timeout, this);
+ }
+ req.discardParameters(); // allow garbage collection of request parameters
+ }
+
+ protected final Object decode(Utf8Array protocolName, Version version, byte [] payload) {
+ Protocol protocol = net.getOwner().getProtocol(protocolName);
+ if (protocol != null) {
+ Routable routable = protocol.decode(version, payload);
+ if (routable != null) {
+ if (routable instanceof Reply) {
+ return routable;
+ } else {
+ return new Error(ErrorCode.DECODE_ERROR,
+ "Payload decoded to a reply when expecting a message.");
+ }
+ } else {
+ return new Error(ErrorCode.DECODE_ERROR,
+ "Protocol '" + protocol.getName() + "' failed to decode routable.");
+ }
+ } else {
+ return new Error(ErrorCode.UNKNOWN_PROTOCOL,
+ "Protocol '" + protocolName + "' is not known by " + serverIdent + ".");
+ }
+ }
+
+ @Override
+ public final void handleRequestDone(Request req) {
+ SendContext ctx = (SendContext)req.getContext();
+ String serviceName = ((RPCServiceAddress)ctx.recipient.getServiceAddress()).getServiceName();
+ Reply reply = null;
+ Error error = null;
+ if (!req.checkReturnTypes(getReturnSpec())) {
+ // Map all known JRT errors to the appropriate message bus error.
+ reply = new EmptyReply();
+ switch (req.errorCode()) {
+ case com.yahoo.jrt.ErrorCode.TIMEOUT:
+ error = new Error(ErrorCode.TIMEOUT,
+ "A timeout occured while waiting for '" + serviceName + "' (" +
+ ctx.timeout + " seconds expired); " + req.errorMessage());
+ break;
+ case com.yahoo.jrt.ErrorCode.CONNECTION:
+ error = new Error(ErrorCode.CONNECTION_ERROR,
+ "A connection error occured for '" + serviceName + "'; " + req.errorMessage());
+ break;
+ default:
+ error = new Error(ErrorCode.NETWORK_ERROR,
+ "A network error occured for '" + serviceName + "'; " + req.errorMessage());
+ }
+ } else {
+ reply = createReply(req.returnValues(), serviceName, ctx.trace);
+ }
+ if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) {
+ ctx.trace.trace(TraceLevel.SEND_RECEIVE,
+ "Reply (type " + reply.getType() + ") received at " + clientIdent + ".");
+ }
+ reply.getTrace().swap(ctx.trace);
+ if (error != null) {
+ reply.addError(error);
+ }
+ net.getOwner().deliverReply(reply, ctx.recipient);
+ }
+
+ protected final class Params {
+ Version version;
+ String route;
+ String session;
+ boolean retryEnabled;
+ int retry;
+ long timeRemaining;
+ Utf8Array protocolName;
+ byte [] payload;
+ int traceLevel;
+ }
+
+ @Override
+ public final void invoke(Request request) {
+ request.detach();
+ Params p = toParams(request.parameters());
+
+ request.discardParameters(); // allow garbage collection of request parameters
+
+ // Make sure that the owner understands the protocol.
+ Protocol protocol = net.getOwner().getProtocol(p.protocolName);
+ if (protocol == null) {
+ replyError(request, p.version, p.traceLevel,
+ new Error(ErrorCode.UNKNOWN_PROTOCOL,
+ "Protocol '" + p.protocolName + "' is not known by " + serverIdent + "."));
+ return;
+ }
+ Routable routable = protocol.decode(p.version, p.payload);
+ if (routable == null) {
+ replyError(request, p.version, p.traceLevel,
+ new Error(ErrorCode.DECODE_ERROR,
+ "Protocol '" + protocol.getName() + "' failed to decode routable."));
+ return;
+ }
+ if (routable instanceof Reply) {
+ replyError(request, p.version, p.traceLevel,
+ new Error(ErrorCode.DECODE_ERROR,
+ "Payload decoded to a reply when expecting a message."));
+ return;
+ }
+ Message msg = (Message)routable;
+ if (p.route != null && p.route.length() > 0) {
+ msg.setRoute(net.getRoute(p.route));
+ }
+ msg.setContext(new ReplyContext(request, p.version));
+ msg.pushHandler(this);
+ msg.setRetryEnabled(p.retryEnabled);
+ msg.setRetry(p.retry);
+ msg.setTimeReceivedNow();
+ msg.setTimeRemaining(p.timeRemaining);
+ msg.getTrace().setLevel(p.traceLevel);
+ if (msg.getTrace().shouldTrace(TraceLevel.SEND_RECEIVE)) {
+ msg.getTrace().trace(TraceLevel.SEND_RECEIVE,
+ "Message (type " + msg.getType() + ") received at " + serverIdent + " for session '" + p.session + "'.");
+ }
+ net.getOwner().deliverMessage(msg, p.session);
+ }
+
+ @Override
+ public final void handleReply(Reply reply) {
+ ReplyContext ctx = (ReplyContext)reply.getContext();
+ reply.setContext(null);
+
+ // Add trace information.
+ if (reply.getTrace().shouldTrace(TraceLevel.SEND_RECEIVE)) {
+ reply.getTrace().trace(TraceLevel.SEND_RECEIVE,
+ "Sending reply (version " + ctx.version + ") from " + serverIdent + ".");
+ }
+
+ // Encode and return the reply through the RPC request.
+ byte[] payload = new byte[0];
+ if (reply.getType() != 0) {
+ Protocol protocol = net.getOwner().getProtocol(reply.getProtocol());
+ if (protocol != null) {
+ payload = protocol.encode(ctx.version, reply);
+ }
+ if (payload == null || payload.length == 0) {
+ reply.addError(new Error(ErrorCode.ENCODE_ERROR,
+ "An error occured while encoding the reply."));
+ }
+ }
+ createResponse(ctx.request.returnValues(), reply, ctx.version, payload);
+ ctx.request.returnRequest();
+ }
+
+ /**
+ * Send an error reply for a given request.
+ *
+ * @param request The JRT request to reply to.
+ * @param version The version to serialize for.
+ * @param traceLevel The trace level to set in the reply.
+ * @param err The error to reply with.
+ */
+ private void replyError(Request request, Version version, int traceLevel, Error err) {
+ Reply reply = new EmptyReply();
+ reply.setContext(new ReplyContext(request, version));
+ reply.getTrace().setLevel(traceLevel);
+ reply.addError(err);
+ handleReply(reply);
+ }
+
+ private static class SendContext {
+
+ final RoutingNode recipient;
+ final Trace trace;
+ final double timeout;
+
+ SendContext(RoutingNode recipient, long timeRemaining) {
+ this.recipient = recipient;
+ trace = new Trace(recipient.getTrace().getLevel());
+ timeout = timeRemaining * 0.001;
+ }
+ }
+
+ private static class ReplyContext {
+
+ final Request request;
+ final Version version;
+
+ public ReplyContext(Request request, Version version) {
+ this.request = request;
+ this.version = version;
+ }
+ }
+}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java
index 40cb7fb9ee9..480a716e382 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java
@@ -2,40 +2,41 @@
package com.yahoo.messagebus.network.rpc;
import com.yahoo.component.Version;
-import com.yahoo.jrt.*;
+import com.yahoo.jrt.DataValue;
+import com.yahoo.jrt.DoubleValue;
+import com.yahoo.jrt.Int32Array;
+import com.yahoo.jrt.Int32Value;
+import com.yahoo.jrt.Int64Value;
+import com.yahoo.jrt.Int8Value;
+import com.yahoo.jrt.Method;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.StringArray;
import com.yahoo.jrt.StringValue;
-import com.yahoo.messagebus.*;
+import com.yahoo.jrt.Values;
+import com.yahoo.messagebus.EmptyReply;
import com.yahoo.messagebus.Error;
-import com.yahoo.messagebus.ErrorCode;
-import com.yahoo.messagebus.ReplyHandler;
-import com.yahoo.messagebus.routing.Hop;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.Trace;
+import com.yahoo.messagebus.TraceNode;
import com.yahoo.messagebus.routing.Route;
-import com.yahoo.messagebus.routing.RoutingNode;
import com.yahoo.text.Utf8Array;
-import com.yahoo.text.Utf8String;
/**
* Implements the request adapter for method "mbus.send1".
*
* @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
*/
-public class RPCSendV1 implements MethodHandler, ReplyHandler, RequestWaiter, RPCSendAdapter {
+public class RPCSendV1 extends RPCSend {
private final String METHOD_NAME = "mbus.send1";
private final String METHOD_PARAMS = "sssbilsxi";
private final String METHOD_RETURN = "sdISSsxs";
- private RPCNetwork net = null;
- private String clientIdent = "client";
- private String serverIdent = "server";
@Override
- public void attach(RPCNetwork net) {
- this.net = net;
- String prefix = net.getIdentity().getServicePrefix();
- if (prefix != null && prefix.length() > 0) {
- clientIdent = "'" + prefix + "'";
- serverIdent = clientIdent;
- }
+ protected String getReturnSpec() { return METHOD_RETURN; }
+ @Override
+ protected Method buildMethod() {
Method method = new Method(METHOD_NAME, METHOD_PARAMS, METHOD_RETURN, this);
method.methodDesc("Send a message bus request and get a reply back.");
@@ -56,206 +57,80 @@ public class RPCSendV1 implements MethodHandler, ReplyHandler, RequestWaiter, RP
.returnDesc(5, "protocol", "The name of the protocol that knows how to decode this reply.")
.returnDesc(6, "payload", "The protocol specific reply payload.")
.returnDesc(7, "trace", "A string representation of the trace.");
- net.getSupervisor().addMethod(method);
+ return method;
}
-
@Override
- public void send(RoutingNode recipient, Version version, byte[] payload, long timeRemaining) {
- SendContext ctx = new SendContext(recipient, timeRemaining);
- RPCServiceAddress address = (RPCServiceAddress)recipient.getServiceAddress();
- Message msg = recipient.getMessage();
- Route route = new Route(recipient.getRoute());
- Hop hop = route.removeHop(0);
-
+ protected Request encodeRequest(Version version, Route route, RPCServiceAddress address, Message msg,
+ long timeRemaining, byte[] payload, int traceLevel) {
Request req = new Request(METHOD_NAME);
- req.parameters().add(new StringValue(version.toString()));
- req.parameters().add(new StringValue(route.toString()));
- req.parameters().add(new StringValue(address.getSessionName()));
- req.parameters().add(new Int8Value(msg.getRetryEnabled() ? (byte)1 : (byte)0));
- req.parameters().add(new Int32Value(msg.getRetry()));
- req.parameters().add(new Int64Value(timeRemaining));
- req.parameters().add(new StringValue(msg.getProtocol()));
- req.parameters().add(new DataValue(payload));
- req.parameters().add(new Int32Value(ctx.trace.getLevel()));
-
- if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) {
- ctx.trace.trace(TraceLevel.SEND_RECEIVE,
- "Sending message (version " + version + ") from " + clientIdent + " to '" +
- address.getServiceName() + "' with " + ctx.timeout + " seconds timeout.");
- }
-
- if (hop.getIgnoreResult()) {
- address.getTarget().getJRTTarget().invokeVoid(req);
- if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) {
- ctx.trace.trace(TraceLevel.SEND_RECEIVE,
- "Not waiting for a reply from '" + address.getServiceName() + "'.");
- }
- Reply reply = new EmptyReply();
- reply.getTrace().swap(ctx.trace);
- net.getOwner().deliverReply(reply, recipient);
- } else {
- req.setContext(ctx);
- address.getTarget().getJRTTarget().invokeAsync(req, ctx.timeout, this);
- }
- req.discardParameters(); // allow garbage collection of request parameters
+ Values v = req.parameters();
+ v.add(new StringValue(version.toString()));
+ v.add(new StringValue(route.toString()));
+ v.add(new StringValue(address.getSessionName()));
+ v.add(new Int8Value(msg.getRetryEnabled() ? (byte)1 : (byte)0));
+ v.add(new Int32Value(msg.getRetry()));
+ v.add(new Int64Value(timeRemaining));
+ v.add(new StringValue(msg.getProtocol()));
+ v.add(new DataValue(payload));
+ v.add(new Int32Value(traceLevel));
+ return req;
}
@Override
- public void handleRequestDone(Request req) {
- SendContext ctx = (SendContext)req.getContext();
- String serviceName = ((RPCServiceAddress)ctx.recipient.getServiceAddress()).getServiceName();
+ protected Reply createReply(Values ret, String serviceName, Trace trace) {
+ Version version = new Version(ret.get(0).asUtf8Array());
+ double retryDelay = ret.get(1).asDouble();
+ int[] errorCodes = ret.get(2).asInt32Array();
+ String[] errorMessages = ret.get(3).asStringArray();
+ String[] errorServices = ret.get(4).asStringArray();
+ Utf8Array protocolName = ret.get(5).asUtf8Array();
+ byte[] payload = ret.get(6).asData();
+ String replyTrace = ret.get(7).asString();
+
+ // Make sure that the owner understands the protocol.
Reply reply = null;
Error error = null;
- if (!req.checkReturnTypes(METHOD_RETURN)) {
- // Map all known JRT errors to the appropriate message bus error.
- reply = new EmptyReply();
- switch (req.errorCode()) {
- case com.yahoo.jrt.ErrorCode.TIMEOUT:
- error = new Error(com.yahoo.messagebus.ErrorCode.TIMEOUT,
- "A timeout occured while waiting for '" + serviceName + "' (" +
- ctx.timeout + " seconds expired); " + req.errorMessage());
- break;
- case com.yahoo.jrt.ErrorCode.CONNECTION:
- error = new Error(com.yahoo.messagebus.ErrorCode.CONNECTION_ERROR,
- "A connection error occured for '" + serviceName + "'; " + req.errorMessage());
- break;
- default:
- error = new Error(com.yahoo.messagebus.ErrorCode.NETWORK_ERROR,
- "A network error occured for '" + serviceName + "'; " + req.errorMessage());
- }
- } else {
- // Retrieve all reply components from JRT request object.
- Version version = new Version(req.returnValues().get(0).asUtf8Array());
- double retryDelay = req.returnValues().get(1).asDouble();
- int[] errorCodes = req.returnValues().get(2).asInt32Array();
- String[] errorMessages = req.returnValues().get(3).asStringArray();
- String[] errorServices = req.returnValues().get(4).asStringArray();
- Utf8Array protocolName = req.returnValues().get(5).asUtf8Array();
- byte[] payload = req.returnValues().get(6).asData();
- String replyTrace = req.returnValues().get(7).asString();
-
- // Make sure that the owner understands the protocol.
- if (payload.length > 0) {
- Protocol protocol = net.getOwner().getProtocol(protocolName);
- if (protocol != null) {
- Routable routable = protocol.decode(version, payload);
- if (routable != null) {
- if (routable instanceof Reply) {
- reply = (Reply)routable;
- } else {
- error = new Error(com.yahoo.messagebus.ErrorCode.DECODE_ERROR,
- "Payload decoded to a reply when expecting a message.");
- }
- } else {
- error = new Error(com.yahoo.messagebus.ErrorCode.DECODE_ERROR,
- "Protocol '" + protocol.getName() + "' failed to decode routable.");
- }
- } else {
- error = new Error(com.yahoo.messagebus.ErrorCode.UNKNOWN_PROTOCOL,
- "Protocol '" + protocolName + "' is not known by " + serverIdent + ".");
- }
- }
- if (reply == null) {
- reply = new EmptyReply();
- }
- reply.setRetryDelay(retryDelay);
- for (int i = 0; i < errorCodes.length && i < errorMessages.length; i++) {
- reply.addError(new Error(errorCodes[i],
- errorMessages[i],
- errorServices[i].length() > 0 ? errorServices[i] : serviceName));
- }
- if (ctx.trace.getLevel() > 0) {
- ctx.trace.getRoot().addChild(TraceNode.decode(replyTrace));
+ if (payload.length > 0) {
+ Object retval = decode(protocolName, version, payload);
+ if (retval instanceof Reply) {
+ reply = (Reply) retval;
+ } else {
+ error = (Error) retval;
}
}
- if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) {
- ctx.trace.trace(TraceLevel.SEND_RECEIVE,
- "Reply (type " + reply.getType() + ") received at " + clientIdent + ".");
+ if (reply == null) {
+ reply = new EmptyReply();
}
- reply.getTrace().swap(ctx.trace);
if (error != null) {
reply.addError(error);
}
- net.getOwner().deliverReply(reply, ctx.recipient);
- }
-
- @Override
- public void invoke(Request request) {
- request.detach();
- Version version = new Version(request.parameters().get(0).asUtf8Array());
- String route = request.parameters().get(1).asString();
- String session = request.parameters().get(2).asString();
- boolean retryEnabled = (request.parameters().get(3).asInt8() != 0);
- int retry = request.parameters().get(4).asInt32();
- long timeRemaining = request.parameters().get(5).asInt64();
- Utf8Array protocolName = request.parameters().get(6).asUtf8Array();
- byte[] payload = request.parameters().get(7).asData();
- int traceLevel = request.parameters().get(8).asInt32();
-
- request.discardParameters(); // allow garbage collection of request parameters
-
- // Make sure that the owner understands the protocol.
- Protocol protocol = net.getOwner().getProtocol(protocolName);
- if (protocol == null) {
- replyError(request, version, traceLevel,
- new com.yahoo.messagebus.Error(ErrorCode.UNKNOWN_PROTOCOL,
- "Protocol '" + protocolName + "' is not known by " + serverIdent + "."));
- return;
- }
- Routable routable = protocol.decode(version, payload);
- if (routable == null) {
- replyError(request, version, traceLevel,
- new Error(ErrorCode.DECODE_ERROR,
- "Protocol '" + protocol.getName() + "' failed to decode routable."));
- return;
- }
- if (routable instanceof Reply) {
- replyError(request, version, traceLevel,
- new Error(ErrorCode.DECODE_ERROR,
- "Payload decoded to a reply when expecting a message."));
- return;
+ reply.setRetryDelay(retryDelay);
+ for (int i = 0; i < errorCodes.length && i < errorMessages.length; i++) {
+ reply.addError(new Error(errorCodes[i], errorMessages[i],
+ errorServices[i].length() > 0 ? errorServices[i] : serviceName));
}
- Message msg = (Message)routable;
- if (route != null && route.length() > 0) {
- msg.setRoute(net.getRoute(route));
+ if (trace.getLevel() > 0) {
+ trace.getRoot().addChild(TraceNode.decode(replyTrace));
}
- msg.setContext(new ReplyContext(request, version));
- msg.pushHandler(this);
- msg.setRetryEnabled(retryEnabled);
- msg.setRetry(retry);
- msg.setTimeReceivedNow();
- msg.setTimeRemaining(timeRemaining);
- msg.getTrace().setLevel(traceLevel);
- if (msg.getTrace().shouldTrace(TraceLevel.SEND_RECEIVE)) {
- msg.getTrace().trace(TraceLevel.SEND_RECEIVE,
- "Message (type " + msg.getType() + ") received at " + serverIdent + " for session '" + session + "'.");
- }
- net.getOwner().deliverMessage(msg, session);
+ return reply;
}
- @Override
- public void handleReply(Reply reply) {
- ReplyContext ctx = (ReplyContext)reply.getContext();
- reply.setContext(null);
-
- // Add trace information.
- if (reply.getTrace().shouldTrace(TraceLevel.SEND_RECEIVE)) {
- reply.getTrace().trace(TraceLevel.SEND_RECEIVE,
- "Sending reply (version " + ctx.version + ") from " + serverIdent + ".");
- }
+ protected Params toParams(Values args) {
+ Params p = new Params();
+ p.version = new Version(args.get(0).asUtf8Array());
+ p.route = args.get(1).asString();
+ p.session = args.get(2).asString();
+ p.retryEnabled = (args.get(3).asInt8() != 0);
+ p.retry = args.get(4).asInt32();
+ p.timeRemaining = args.get(5).asInt64();
+ p.protocolName = args.get(6).asUtf8Array();
+ p.payload = args.get(7).asData();
+ p.traceLevel = args.get(8).asInt32();
+ return p;
+ }
- // Encode and return the reply through the RPC request.
- byte[] payload = new byte[0];
- if (reply.getType() != 0) {
- Protocol protocol = net.getOwner().getProtocol(reply.getProtocol());
- if (protocol != null) {
- payload = protocol.encode(ctx.version, reply);
- }
- if (payload == null || payload.length == 0) {
- reply.addError(new Error(ErrorCode.ENCODE_ERROR,
- "An error occured while encoding the reply."));
- }
- }
+ @Override
+ protected void createResponse(Values ret, Reply reply, Version version, byte [] payload) {
int[] eCodes = new int[reply.getNumErrors()];
String[] eMessages = new String[reply.getNumErrors()];
String[] eServices = new String[reply.getNumErrors()];
@@ -265,57 +140,14 @@ public class RPCSendV1 implements MethodHandler, ReplyHandler, RequestWaiter, RP
eMessages[i] = error.getMessage();
eServices[i] = error.getService() != null ? error.getService() : "";
}
- ctx.request.returnValues().add(new StringValue(ctx.version.toString()));
- ctx.request.returnValues().add(new DoubleValue(reply.getRetryDelay()));
- ctx.request.returnValues().add(new Int32Array(eCodes));
- ctx.request.returnValues().add(new StringArray(eMessages));
- ctx.request.returnValues().add(new StringArray(eServices));
- ctx.request.returnValues().add(new StringValue(reply.getProtocol()));
- ctx.request.returnValues().add(new DataValue(payload));
- ctx.request.returnValues().add(new StringValue(
- reply.getTrace().getRoot() != null ?
- reply.getTrace().getRoot().encode() :
- ""));
- ctx.request.returnRequest();
+ ret.add(new StringValue(version.toString()));
+ ret.add(new DoubleValue(reply.getRetryDelay()));
+ ret.add(new Int32Array(eCodes));
+ ret.add(new StringArray(eMessages));
+ ret.add(new StringArray(eServices));
+ ret.add(new StringValue(reply.getProtocol()));
+ ret.add(new DataValue(payload));
+ ret.add(new StringValue(reply.getTrace().getRoot() != null ? reply.getTrace().getRoot().encode() : ""));
}
- /**
- * Send an error reply for a given request.
- *
- * @param request The JRT request to reply to.
- * @param version The version to serialize for.
- * @param traceLevel The trace level to set in the reply.
- * @param err The error to reply with.
- */
- private void replyError(Request request, Version version, int traceLevel, Error err) {
- Reply reply = new EmptyReply();
- reply.setContext(new ReplyContext(request, version));
- reply.getTrace().setLevel(traceLevel);
- reply.addError(err);
- handleReply(reply);
- }
-
- private static class SendContext {
-
- final RoutingNode recipient;
- final Trace trace;
- final double timeout;
-
- SendContext(RoutingNode recipient, long timeRemaining) {
- this.recipient = recipient;
- trace = new Trace(recipient.getTrace().getLevel());
- timeout = timeRemaining * 0.001;
- }
- }
-
- private static class ReplyContext {
-
- final Request request;
- final Version version;
-
- public ReplyContext(Request request, Version version) {
- this.request = request;
- this.version = version;
- }
- }
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java
new file mode 100644
index 00000000000..8cc0b73ae30
--- /dev/null
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java
@@ -0,0 +1,209 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.network.rpc;
+
+import com.yahoo.component.Version;
+import com.yahoo.compress.CompressionType;
+import com.yahoo.compress.Compressor;
+import com.yahoo.jrt.DataValue;
+import com.yahoo.jrt.Int32Value;
+import com.yahoo.jrt.Int8Value;
+import com.yahoo.jrt.Method;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.Values;
+import com.yahoo.messagebus.EmptyReply;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.Trace;
+import com.yahoo.messagebus.TraceNode;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.slime.BinaryFormat;
+import com.yahoo.slime.Cursor;
+import com.yahoo.slime.Inspector;
+import com.yahoo.slime.Slime;
+import com.yahoo.text.Utf8;
+import com.yahoo.text.Utf8Array;
+
+/**
+ * Implements the request adapter for method "mbus.slime".
+ *
+ * @author baldersheim
+ */
+public class RPCSendV2 extends RPCSend {
+
+ private final static String METHOD_NAME = "mbus.slime";
+ private final static String METHOD_PARAMS = "bixbix";
+ private final static String METHOD_RETURN = "bixbix";
+ private final Compressor compressor = new Compressor(CompressionType.LZ4, 3, 90, 1024);
+
+ @Override
+ protected String getReturnSpec() { return METHOD_RETURN; }
+ @Override
+ protected Method buildMethod() {
+
+ Method method = new Method(METHOD_NAME, METHOD_PARAMS, METHOD_RETURN, this);
+ method.methodDesc("Send a message bus request and get a reply back.");
+ method.paramDesc(0, "header_encoding", "Encoding type of header.")
+ .paramDesc(1, "header_decodedSize", "Number of bytes after header decoding.")
+ .paramDesc(2, "header_payload", "Slime encoded header payload.")
+ .paramDesc(3, "body_encoding", "Encoding type of body.")
+ .paramDesc(4, "body_decoded_ize", "Number of bytes after body decoding.")
+ .paramDesc(5, "body_payload", "Slime encoded body payload.");
+ method.returnDesc(0, "header_encoding", "Encoding type of header.")
+ .returnDesc(1, "header_decoded_size", "Number of bytes after header decoding.")
+ .returnDesc(2, "header_payload", "Slime encoded header payload.")
+ .returnDesc(3, "body_encoding", "Encoding type of body.")
+ .returnDesc(4, "body_encoded_size", "Number of bytes after body decoding.")
+ .returnDesc(5, "body_payload", "Slime encoded body payload.");
+ return method;
+ }
+ private static final String VERSION_F = new String("version");
+ private static final String ROUTE_F = new String("route");
+ private static final String SESSION_F = new String("session");
+ private static final String PROTOCOL_F = new String("prot");
+ private static final String TRACELEVEL_F = new String("tracelevel");
+ private static final String TRACE_F = new String("trace");
+ private static final String USERETRY_F = new String("useretry");
+ private static final String RETRY_F = new String("retry");
+ private static final String RETRYDELAY_F = new String("retrydelay");
+ private static final String TIMEREMAINING_F = new String("timeleft");
+ private static final String ERRORS_F = new String("errors");
+ private static final String SERVICE_F = new String("service");
+ private static final String CODE_F = new String("code");
+ private static final String BLOB_F = new String("msg");
+ private static final String MSG_F = new String("msg");
+
+ @Override
+ protected Request encodeRequest(Version version, Route route, RPCServiceAddress address, Message msg,
+ long timeRemaining, byte[] payload, int traceLevel)
+ {
+
+ Request req = new Request(METHOD_NAME);
+ Values v = req.parameters();
+
+ v.add(new Int8Value(CompressionType.NONE.getCode()));
+ v.add(new Int32Value(0));
+ v.add(new DataValue(new byte[0]));
+
+ Slime slime = new Slime();
+ Cursor root = slime.setObject();
+
+ root.setString(VERSION_F, version.toString());
+ root.setString(ROUTE_F, route.toString());
+ root.setString(SESSION_F, address.getSessionName());
+ root.setString(PROTOCOL_F, msg.getProtocol().toString());
+ root.setBool(USERETRY_F, msg.getRetryEnabled());
+ root.setLong(RETRY_F, msg.getRetry());
+ root.setLong(TIMEREMAINING_F, msg.getTimeRemaining());
+ root.setLong(TRACELEVEL_F, traceLevel);
+ root.setData(BLOB_F, payload);
+
+ byte[] serializedSlime = BinaryFormat.encode(slime);
+ Compressor.Compression compressionResult = compressor.compress(serializedSlime);
+
+ v.add(new Int8Value(compressionResult.type().getCode()));
+ v.add(new Int32Value(compressionResult.uncompressedSize()));
+ v.add(new DataValue(compressionResult.data()));
+
+ return req;
+ }
+
+ @Override
+ protected Reply createReply(Values ret, String serviceName, Trace trace) {
+ CompressionType compression = CompressionType.valueOf(ret.get(3).asInt8());
+ byte[] slimeBytes = compressor.decompress(ret.get(5).asData(), compression, ret.get(4).asInt32());
+ Slime slime = BinaryFormat.decode(slimeBytes);
+ Inspector root = slime.get();
+
+ Version version = new Version(root.field(VERSION_F).asString());
+ byte[] payload = root.field(BLOB_F).asData();
+
+ // Make sure that the owner understands the protocol.
+ Reply reply = null;
+ Error error = null;
+ if (payload.length > 0) {
+ Object retval = decode(new Utf8Array(root.field(PROTOCOL_F).asUtf8()), version, payload);
+ if (retval instanceof Reply) {
+ reply = (Reply) retval;
+ } else {
+ error = (Error) retval;
+ }
+ }
+ if (reply == null) {
+ reply = new EmptyReply();
+ }
+ if (error != null) {
+ reply.addError(error);
+ }
+ reply.setRetryDelay(root.field(RETRYDELAY_F).asDouble());
+
+ Inspector errors = root.field(ERRORS_F);
+ for (int i = 0; i < errors.entries(); i++) {
+ Inspector e = errors.entry(i);
+ String service = e.field(SERVICE_F).asString();
+ reply.addError(new Error((int)e.field(CODE_F).asLong(), e.field(MSG_F).asString(),
+ (service != null && service.length() > 0) ? service : serviceName));
+ }
+ if (trace.getLevel() > 0) {
+ trace.getRoot().addChild(TraceNode.decode(root.field(TRACE_F).asString()));
+ }
+ return reply;
+ }
+
+ protected Params toParams(Values args) {
+ CompressionType compression = CompressionType.valueOf(args.get(3).asInt8());
+ byte[] slimeBytes = compressor.decompress(args.get(5).asData(), compression, args.get(4).asInt32());
+ Slime slime = BinaryFormat.decode(slimeBytes);
+ Inspector root = slime.get();
+ Params p = new Params();
+ p.version = new Version(root.field(VERSION_F).asString());
+ p.route = root.field(ROUTE_F).asString();
+ p.session = root.field(SESSION_F).asString();
+ p.retryEnabled = root.field(USERETRY_F).asBool();
+ p.retry = (int)root.field(RETRY_F).asLong();
+ p.timeRemaining = root.field(TIMEREMAINING_F).asLong();
+ p.protocolName = new Utf8Array(Utf8.toBytes(root.field(PROTOCOL_F).asString()));
+ p.payload = root.field(BLOB_F).asData();
+ p.traceLevel = (int)root.field(TRACELEVEL_F).asLong();
+ return p;
+ }
+
+ @Override
+ protected void createResponse(Values ret, Reply reply, Version version, byte [] payload) {
+ ret.add(new Int8Value(CompressionType.NONE.getCode()));
+ ret.add(new Int32Value(0));
+ ret.add(new DataValue(new byte[0]));
+
+ Slime slime = new Slime();
+ Cursor root = slime.setObject();
+
+ root.setString(VERSION_F, version.toString());
+ root.setDouble(RETRYDELAY_F, reply.getRetryDelay());
+ root.setString(PROTOCOL_F, reply.getProtocol().toString());
+ root.setData(BLOB_F, payload);
+ if (reply.getTrace().getLevel() > 0) {
+ root.setString(TRACE_F, reply.getTrace().getRoot().encode());
+ }
+
+ if (reply.getNumErrors() > 0) {
+ Cursor array = root.setArray(ERRORS_F);
+ for (int i = 0; i < reply.getNumErrors(); i++) {
+ Cursor e = array.addObject();
+ Error mbusE = reply.getError(i);
+ e.setLong(CODE_F, mbusE.getCode());
+ e.setString(MSG_F, mbusE.getMessage());
+ if (mbusE.getService() != null) {
+ e.setString(SERVICE_F, mbusE.getService());
+ }
+ }
+ }
+
+ byte[] serializedSlime = BinaryFormat.encode(slime);
+ Compressor.Compression compressionResult = compressor.compress(serializedSlime);
+
+ ret.add(new Int8Value(compressionResult.type().getCode()));
+ ret.add(new Int32Value(compressionResult.uncompressedSize()));
+ ret.add(new DataValue(compressionResult.data()));
+ }
+
+}