From 77e08073393ee63d19d1866a9acd54a8b9b2a9ec Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Fri, 15 Sep 2017 15:48:48 +0200 Subject: Add support for headers. --- .../yahoo/messagebus/network/rpc/RPCSendV2.java | 39 ++++++++++++++++------ 1 file changed, 28 insertions(+), 11 deletions(-) (limited to 'messagebus') diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java index bcfa747c9f8..409c7beee9b 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java @@ -32,8 +32,8 @@ import com.yahoo.text.Utf8Array; public class RPCSendV2 extends RPCSend { private final static String METHOD_NAME = "mbus.slime"; - private final static String METHOD_PARAMS = "bix"; - private final static String METHOD_RETURN = "bix"; + private final static String METHOD_PARAMS = "bixbix"; + private final static String METHOD_RETURN = "bixbix"; private final Compressor compressor = new Compressor(CompressionType.LZ4, 3, 90, 1024); @Override @@ -43,12 +43,18 @@ public class RPCSendV2 extends RPCSend { Method method = new Method(METHOD_NAME, METHOD_PARAMS, METHOD_RETURN, this); method.methodDesc("Send a message bus request and get a reply back."); - method.paramDesc(0, "encoding", "Encoding type.") - .paramDesc(1, "decodedSize", "Number of bytes after decoding.") - .paramDesc(2, "payload", "Slime encoded payload."); - method.returnDesc(0, "encoding", "Encoding type.") - .returnDesc(1, "decodedSize", "Number of bytes after decoding.") - .returnDesc(2, "payload", "Slime encoded payload."); + method.paramDesc(0, "header_encoding", "Encoding type of header.") + .paramDesc(1, "header_decodedSize", "Number of bytes after header decoding.") + .paramDesc(2, "header_payload", "Slime encoded header payload.") + .paramDesc(3, "body_encoding", "Encoding type of body.") + .paramDesc(4, "body_decoded_ize", "Number of bytes after body decoding.") + .paramDesc(5, "body_payload", "Slime encoded body payload."); + method.returnDesc(0, "header_encoding", "Encoding type of header.") + .returnDesc(1, "header_decoded_size", "Number of bytes after header decoding.") + .returnDesc(2, "header_payload", "Slime encoded header payload.") + .returnDesc(3, "body_encoding", "Encoding type of body.") + .returnDesc(4, "body_encoded_size", "Number of bytes after body decoding.") + .returnDesc(5, "body_payload", "Slime encoded body payload."); return method; } private static final String VERSION_F = new String("version"); @@ -69,7 +75,16 @@ public class RPCSendV2 extends RPCSend { @Override protected Request encodeRequest(Version version, Route route, RPCServiceAddress address, Message msg, - long timeRemaining, byte[] payload, int traceLevel) { + long timeRemaining, byte[] payload, int traceLevel) + { + + Request req = new Request(METHOD_NAME); + Values v = req.parameters(); + + v.add(new Int8Value(CompressionType.NONE.getCode())); + v.add(new Int32Value(0)); + v.add(new DataValue(new byte[0])); + Slime slime = new Slime(); Cursor root = slime.setObject(); @@ -85,8 +100,6 @@ public class RPCSendV2 extends RPCSend { byte[] serializedSlime = BinaryFormat.encode(slime); Compressor.Compression compressionResult = compressor.compress(serializedSlime); - Request req = new Request(METHOD_NAME); - Values v = req.parameters(); v.add(new Int8Value(compressionResult.type().getCode())); v.add(new Int32Value(compressionResult.uncompressedSize())); @@ -157,6 +170,10 @@ public class RPCSendV2 extends RPCSend { @Override protected void createResponse(Values ret, Reply reply, Version version, byte [] payload) { + ret.add(new Int8Value(CompressionType.NONE.getCode())); + ret.add(new Int32Value(0)); + ret.add(new DataValue(new byte[0])); + Slime slime = new Slime(); Cursor root = slime.setObject(); -- cgit v1.2.3