summaryrefslogtreecommitdiffstats
path: root/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java
diff options
context:
space:
mode:
Diffstat (limited to 'messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java209
1 files changed, 209 insertions, 0 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java
new file mode 100644
index 00000000000..8cc0b73ae30
--- /dev/null
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java
@@ -0,0 +1,209 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.network.rpc;
+
+import com.yahoo.component.Version;
+import com.yahoo.compress.CompressionType;
+import com.yahoo.compress.Compressor;
+import com.yahoo.jrt.DataValue;
+import com.yahoo.jrt.Int32Value;
+import com.yahoo.jrt.Int8Value;
+import com.yahoo.jrt.Method;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.Values;
+import com.yahoo.messagebus.EmptyReply;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.Trace;
+import com.yahoo.messagebus.TraceNode;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.slime.BinaryFormat;
+import com.yahoo.slime.Cursor;
+import com.yahoo.slime.Inspector;
+import com.yahoo.slime.Slime;
+import com.yahoo.text.Utf8;
+import com.yahoo.text.Utf8Array;
+
+/**
+ * Implements the request adapter for method "mbus.slime".
+ *
+ * @author baldersheim
+ */
+public class RPCSendV2 extends RPCSend {
+
+ private final static String METHOD_NAME = "mbus.slime";
+ private final static String METHOD_PARAMS = "bixbix";
+ private final static String METHOD_RETURN = "bixbix";
+ private final Compressor compressor = new Compressor(CompressionType.LZ4, 3, 90, 1024);
+
+ @Override
+ protected String getReturnSpec() { return METHOD_RETURN; }
+ @Override
+ protected Method buildMethod() {
+
+ Method method = new Method(METHOD_NAME, METHOD_PARAMS, METHOD_RETURN, this);
+ method.methodDesc("Send a message bus request and get a reply back.");
+ method.paramDesc(0, "header_encoding", "Encoding type of header.")
+ .paramDesc(1, "header_decodedSize", "Number of bytes after header decoding.")
+ .paramDesc(2, "header_payload", "Slime encoded header payload.")
+ .paramDesc(3, "body_encoding", "Encoding type of body.")
+ .paramDesc(4, "body_decoded_ize", "Number of bytes after body decoding.")
+ .paramDesc(5, "body_payload", "Slime encoded body payload.");
+ method.returnDesc(0, "header_encoding", "Encoding type of header.")
+ .returnDesc(1, "header_decoded_size", "Number of bytes after header decoding.")
+ .returnDesc(2, "header_payload", "Slime encoded header payload.")
+ .returnDesc(3, "body_encoding", "Encoding type of body.")
+ .returnDesc(4, "body_encoded_size", "Number of bytes after body decoding.")
+ .returnDesc(5, "body_payload", "Slime encoded body payload.");
+ return method;
+ }
+ private static final String VERSION_F = new String("version");
+ private static final String ROUTE_F = new String("route");
+ private static final String SESSION_F = new String("session");
+ private static final String PROTOCOL_F = new String("prot");
+ private static final String TRACELEVEL_F = new String("tracelevel");
+ private static final String TRACE_F = new String("trace");
+ private static final String USERETRY_F = new String("useretry");
+ private static final String RETRY_F = new String("retry");
+ private static final String RETRYDELAY_F = new String("retrydelay");
+ private static final String TIMEREMAINING_F = new String("timeleft");
+ private static final String ERRORS_F = new String("errors");
+ private static final String SERVICE_F = new String("service");
+ private static final String CODE_F = new String("code");
+ private static final String BLOB_F = new String("msg");
+ private static final String MSG_F = new String("msg");
+
+ @Override
+ protected Request encodeRequest(Version version, Route route, RPCServiceAddress address, Message msg,
+ long timeRemaining, byte[] payload, int traceLevel)
+ {
+
+ Request req = new Request(METHOD_NAME);
+ Values v = req.parameters();
+
+ v.add(new Int8Value(CompressionType.NONE.getCode()));
+ v.add(new Int32Value(0));
+ v.add(new DataValue(new byte[0]));
+
+ Slime slime = new Slime();
+ Cursor root = slime.setObject();
+
+ root.setString(VERSION_F, version.toString());
+ root.setString(ROUTE_F, route.toString());
+ root.setString(SESSION_F, address.getSessionName());
+ root.setString(PROTOCOL_F, msg.getProtocol().toString());
+ root.setBool(USERETRY_F, msg.getRetryEnabled());
+ root.setLong(RETRY_F, msg.getRetry());
+ root.setLong(TIMEREMAINING_F, msg.getTimeRemaining());
+ root.setLong(TRACELEVEL_F, traceLevel);
+ root.setData(BLOB_F, payload);
+
+ byte[] serializedSlime = BinaryFormat.encode(slime);
+ Compressor.Compression compressionResult = compressor.compress(serializedSlime);
+
+ v.add(new Int8Value(compressionResult.type().getCode()));
+ v.add(new Int32Value(compressionResult.uncompressedSize()));
+ v.add(new DataValue(compressionResult.data()));
+
+ return req;
+ }
+
+ @Override
+ protected Reply createReply(Values ret, String serviceName, Trace trace) {
+ CompressionType compression = CompressionType.valueOf(ret.get(3).asInt8());
+ byte[] slimeBytes = compressor.decompress(ret.get(5).asData(), compression, ret.get(4).asInt32());
+ Slime slime = BinaryFormat.decode(slimeBytes);
+ Inspector root = slime.get();
+
+ Version version = new Version(root.field(VERSION_F).asString());
+ byte[] payload = root.field(BLOB_F).asData();
+
+ // Make sure that the owner understands the protocol.
+ Reply reply = null;
+ Error error = null;
+ if (payload.length > 0) {
+ Object retval = decode(new Utf8Array(root.field(PROTOCOL_F).asUtf8()), version, payload);
+ if (retval instanceof Reply) {
+ reply = (Reply) retval;
+ } else {
+ error = (Error) retval;
+ }
+ }
+ if (reply == null) {
+ reply = new EmptyReply();
+ }
+ if (error != null) {
+ reply.addError(error);
+ }
+ reply.setRetryDelay(root.field(RETRYDELAY_F).asDouble());
+
+ Inspector errors = root.field(ERRORS_F);
+ for (int i = 0; i < errors.entries(); i++) {
+ Inspector e = errors.entry(i);
+ String service = e.field(SERVICE_F).asString();
+ reply.addError(new Error((int)e.field(CODE_F).asLong(), e.field(MSG_F).asString(),
+ (service != null && service.length() > 0) ? service : serviceName));
+ }
+ if (trace.getLevel() > 0) {
+ trace.getRoot().addChild(TraceNode.decode(root.field(TRACE_F).asString()));
+ }
+ return reply;
+ }
+
+ protected Params toParams(Values args) {
+ CompressionType compression = CompressionType.valueOf(args.get(3).asInt8());
+ byte[] slimeBytes = compressor.decompress(args.get(5).asData(), compression, args.get(4).asInt32());
+ Slime slime = BinaryFormat.decode(slimeBytes);
+ Inspector root = slime.get();
+ Params p = new Params();
+ p.version = new Version(root.field(VERSION_F).asString());
+ p.route = root.field(ROUTE_F).asString();
+ p.session = root.field(SESSION_F).asString();
+ p.retryEnabled = root.field(USERETRY_F).asBool();
+ p.retry = (int)root.field(RETRY_F).asLong();
+ p.timeRemaining = root.field(TIMEREMAINING_F).asLong();
+ p.protocolName = new Utf8Array(Utf8.toBytes(root.field(PROTOCOL_F).asString()));
+ p.payload = root.field(BLOB_F).asData();
+ p.traceLevel = (int)root.field(TRACELEVEL_F).asLong();
+ return p;
+ }
+
+ @Override
+ protected void createResponse(Values ret, Reply reply, Version version, byte [] payload) {
+ ret.add(new Int8Value(CompressionType.NONE.getCode()));
+ ret.add(new Int32Value(0));
+ ret.add(new DataValue(new byte[0]));
+
+ Slime slime = new Slime();
+ Cursor root = slime.setObject();
+
+ root.setString(VERSION_F, version.toString());
+ root.setDouble(RETRYDELAY_F, reply.getRetryDelay());
+ root.setString(PROTOCOL_F, reply.getProtocol().toString());
+ root.setData(BLOB_F, payload);
+ if (reply.getTrace().getLevel() > 0) {
+ root.setString(TRACE_F, reply.getTrace().getRoot().encode());
+ }
+
+ if (reply.getNumErrors() > 0) {
+ Cursor array = root.setArray(ERRORS_F);
+ for (int i = 0; i < reply.getNumErrors(); i++) {
+ Cursor e = array.addObject();
+ Error mbusE = reply.getError(i);
+ e.setLong(CODE_F, mbusE.getCode());
+ e.setString(MSG_F, mbusE.getMessage());
+ if (mbusE.getService() != null) {
+ e.setString(SERVICE_F, mbusE.getService());
+ }
+ }
+ }
+
+ byte[] serializedSlime = BinaryFormat.encode(slime);
+ Compressor.Compression compressionResult = compressor.compress(serializedSlime);
+
+ ret.add(new Int8Value(compressionResult.type().getCode()));
+ ret.add(new Int32Value(compressionResult.uncompressedSize()));
+ ret.add(new DataValue(compressionResult.data()));
+ }
+
+}