summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java54
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java269
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java332
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java192
-rw-r--r--messagebus/src/tests/advancedrouting/advancedrouting.cpp12
-rw-r--r--messagebus/src/tests/bucketsequence/bucketsequence.cpp1
-rw-r--r--messagebus/src/tests/choke/choke.cpp1
-rw-r--r--messagebus/src/tests/error/error.cpp4
-rw-r--r--messagebus/src/tests/routing/routing.cpp1
-rw-r--r--messagebus/src/tests/sendadapter/sendadapter.cpp1
-rw-r--r--messagebus/src/tests/serviceaddress/serviceaddress.cpp13
-rw-r--r--messagebus/src/tests/servicepool/servicepool.cpp2
-rw-r--r--messagebus/src/tests/shutdown/shutdown.cpp3
-rw-r--r--messagebus/src/tests/slobrok/slobrok.cpp4
-rw-r--r--messagebus/src/vespa/messagebus/blob.h5
-rw-r--r--messagebus/src/vespa/messagebus/network/CMakeLists.txt2
-rw-r--r--messagebus/src/vespa/messagebus/network/oosmanager.h13
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp145
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.h59
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp3
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.h21
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.cpp281
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.h95
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend_private.h52
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsendv1.cpp406
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsendv1.h79
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsendv2.cpp247
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsendv2.h24
-rw-r--r--messagebus/src/vespa/messagebus/network/rpctargetpool.h6
-rw-r--r--messagebus/src/vespa/messagebus/routing/route.cpp2
-rw-r--r--messagebus/src/vespa/messagebus/routing/route.h3
-rw-r--r--messagebus/src/vespa/messagebus/routing/routeparser.cpp4
-rw-r--r--messagebus/src/vespa/messagebus/routing/routeparser.h4
-rw-r--r--messagebus/src/vespa/messagebus/routing/routingnode.h3
-rw-r--r--messagebus/src/vespa/messagebus/rpcmessagebus.cpp5
-rw-r--r--messagebus/src/vespa/messagebus/rpcmessagebus.h12
-rw-r--r--messagebus/src/vespa/messagebus/testlib/testserver.cpp7
-rw-r--r--messagebus/src/vespa/messagebus/testlib/testserver.h3
-rw-r--r--messagebus/src/vespa/messagebus/tracenode.h2
-rw-r--r--messagebus_test/src/tests/error/cpp-client.cpp4
-rw-r--r--messagebus_test/src/tests/error/cpp-server.cpp1
-rw-r--r--messagebus_test/src/tests/speed/cpp-client.cpp1
-rw-r--r--messagebus_test/src/tests/speed/cpp-server.cpp1
-rw-r--r--messagebus_test/src/tests/trace/cpp-server.cpp1
-rw-r--r--messagebus_test/src/tests/trace/trace.cpp6
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp5
-rw-r--r--vespaclient/src/vespa/vespaclient/vesparoute/application.cpp9
47 files changed, 1571 insertions, 829 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
index 096c0c0b485..d42e396452a 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
@@ -2,17 +2,28 @@
package com.yahoo.messagebus.network.rpc;
import com.yahoo.component.Version;
-import com.yahoo.component.VersionSpecification;
import com.yahoo.component.Vtag;
import com.yahoo.concurrent.ThreadFactoryFactory;
-import com.yahoo.jrt.*;
+import com.yahoo.jrt.Acceptor;
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.Method;
+import com.yahoo.jrt.MethodHandler;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.Spec;
+import com.yahoo.jrt.StringValue;
+import com.yahoo.jrt.Supervisor;
+import com.yahoo.jrt.Task;
+import com.yahoo.jrt.Transport;
import com.yahoo.jrt.slobrok.api.IMirror;
import com.yahoo.jrt.slobrok.api.Mirror;
import com.yahoo.jrt.slobrok.api.Register;
import com.yahoo.log.LogLevel;
-import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.EmptyReply;
import com.yahoo.messagebus.Error;
import com.yahoo.messagebus.ErrorCode;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Protocol;
+import com.yahoo.messagebus.Reply;
import com.yahoo.messagebus.network.Identity;
import com.yahoo.messagebus.network.Network;
import com.yahoo.messagebus.network.NetworkOwner;
@@ -22,8 +33,16 @@ import com.yahoo.messagebus.routing.RoutingNode;
import java.io.PrintWriter;
import java.io.StringWriter;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
@@ -44,7 +63,7 @@ public class RPCNetwork implements Network, MethodHandler {
private final Acceptor listener;
private final Mirror mirror;
private final Register register;
- private final Map<VersionSpecification, RPCSendAdapter> sendAdapters = new HashMap<>();
+ private final TreeMap<Version, RPCSendAdapter> sendAdapters = new TreeMap<>();
private NetworkOwner owner;
private final SlobrokConfigSubscriber slobroksConfig;
private final LinkedHashMap<String, Route> lruRouteMap = new LinkedHashMap<>(10000, 0.5f, true);
@@ -162,9 +181,10 @@ public class RPCNetwork implements Network, MethodHandler {
}
this.owner = owner;
- RPCSendAdapter adapter = new RPCSendV1();
- addSendAdapter(new VersionSpecification(5), adapter);
- addSendAdapter(new VersionSpecification(6), adapter);
+ RPCSendAdapter adapter1 = new RPCSendV1();
+ RPCSendAdapter adapter2 = new RPCSendV2();
+ addSendAdapter(new Version(5), adapter1);
+ addSendAdapter(new Version(6,142), adapter2);
}
@Override
@@ -234,11 +254,9 @@ public class RPCNetwork implements Network, MethodHandler {
*/
private void send(SendContext ctx) {
if (destroyed.get()) {
- replyError(ctx, ErrorCode.NETWORK_SHUTDOWN,
- "Network layer has performed shutdown.");
+ replyError(ctx, ErrorCode.NETWORK_SHUTDOWN, "Network layer has performed shutdown.");
} else if (ctx.hasError) {
- replyError(ctx, ErrorCode.HANDSHAKE_FAILED,
- "An error occured while resolving version.");
+ replyError(ctx, ErrorCode.HANDSHAKE_FAILED, "An error occured while resolving version.");
} else {
sendService.execute(new SendTask(owner.getProtocol(ctx.msg.getProtocol()), ctx));
}
@@ -315,7 +333,7 @@ public class RPCNetwork implements Network, MethodHandler {
* @param version The version for which to register an adapter.
* @param adapter The adapter to register.
*/
- private void addSendAdapter(VersionSpecification version, RPCSendAdapter adapter) {
+ private void addSendAdapter(Version version, RPCSendAdapter adapter) {
adapter.attach(this);
sendAdapters.put(version, adapter);
}
@@ -328,12 +346,8 @@ public class RPCNetwork implements Network, MethodHandler {
* @return The compatible adapter.
*/
private RPCSendAdapter getSendAdapter(Version version) {
- for (Map.Entry<VersionSpecification, RPCSendAdapter> entry : sendAdapters.entrySet()) {
- if (entry.getKey().matches(version)) {
- return entry.getValue();
- }
- }
- return null;
+ Map.Entry<Version, RPCSendAdapter> lower = sendAdapters.floorEntry(version);
+ return (lower != null) ? lower.getValue() : null;
}
/**
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java
new file mode 100644
index 00000000000..5fa996b9288
--- /dev/null
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java
@@ -0,0 +1,269 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.network.rpc;
+
+import com.yahoo.component.Version;
+
+import com.yahoo.jrt.Method;
+import com.yahoo.jrt.MethodHandler;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.RequestWaiter;
+import com.yahoo.jrt.Values;
+import com.yahoo.messagebus.EmptyReply;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.ErrorCode;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Protocol;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.ReplyHandler;
+import com.yahoo.messagebus.Routable;
+import com.yahoo.messagebus.Trace;
+import com.yahoo.messagebus.TraceLevel;
+import com.yahoo.messagebus.routing.Hop;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.routing.RoutingNode;
+import com.yahoo.text.Utf8Array;
+
+/**
+ * Implements the request adapter for method "mbus.send1/mbus.slime".
+ *
+ * @author baldersheim
+ */
+public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWaiter, RPCSendAdapter {
+
+ private RPCNetwork net = null;
+ private String clientIdent = "client";
+ private String serverIdent = "server";
+
+ protected abstract Method buildMethod();
+ protected abstract String getReturnSpec();
+ protected abstract Request encodeRequest(Version version, Route route, RPCServiceAddress address, Message msg,
+ long timeRemaining, byte[] payload, int traceLevel);
+ protected abstract Reply createReply(Values ret, String serviceName, Trace trace);
+ protected abstract Params toParams(Values req);
+ protected abstract void createReponse(Values ret, Reply reply, Version version, byte [] payload);
+ @Override
+ public final void attach(RPCNetwork net) {
+ this.net = net;
+ String prefix = net.getIdentity().getServicePrefix();
+ if (prefix != null && prefix.length() > 0) {
+ clientIdent = "'" + prefix + "'";
+ serverIdent = clientIdent;
+ }
+ net.getSupervisor().addMethod(buildMethod());
+ }
+
+ @Override
+ public final void send(RoutingNode recipient, Version version, byte[] payload, long timeRemaining) {
+ SendContext ctx = new SendContext(recipient, timeRemaining);
+ RPCServiceAddress address = (RPCServiceAddress)recipient.getServiceAddress();
+ Message msg = recipient.getMessage();
+ Route route = new Route(recipient.getRoute());
+ Hop hop = route.removeHop(0);
+
+ Request req = encodeRequest(version, route, address,msg, timeRemaining, payload, ctx.trace.getLevel());
+
+ if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) {
+ ctx.trace.trace(TraceLevel.SEND_RECEIVE,
+ "Sending message (version " + version + ") from " + clientIdent + " to '" +
+ address.getServiceName() + "' with " + ctx.timeout + " seconds timeout.");
+ }
+
+ if (hop.getIgnoreResult()) {
+ address.getTarget().getJRTTarget().invokeVoid(req);
+ if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) {
+ ctx.trace.trace(TraceLevel.SEND_RECEIVE,
+ "Not waiting for a reply from '" + address.getServiceName() + "'.");
+ }
+ Reply reply = new EmptyReply();
+ reply.getTrace().swap(ctx.trace);
+ net.getOwner().deliverReply(reply, recipient);
+ } else {
+ req.setContext(ctx);
+ address.getTarget().getJRTTarget().invokeAsync(req, ctx.timeout, this);
+ }
+ req.discardParameters(); // allow garbage collection of request parameters
+ }
+
+ protected final Object decode(Utf8Array protocolName, Version version, byte [] payload) {
+ Protocol protocol = net.getOwner().getProtocol(protocolName);
+ if (protocol != null) {
+ Routable routable = protocol.decode(version, payload);
+ if (routable != null) {
+ if (routable instanceof Reply) {
+ return routable;
+ } else {
+ return new Error(ErrorCode.DECODE_ERROR,
+ "Payload decoded to a reply when expecting a message.");
+ }
+ } else {
+ return new Error(ErrorCode.DECODE_ERROR,
+ "Protocol '" + protocol.getName() + "' failed to decode routable.");
+ }
+ } else {
+ return new Error(ErrorCode.UNKNOWN_PROTOCOL,
+ "Protocol '" + protocolName + "' is not known by " + serverIdent + ".");
+ }
+ }
+
+ @Override
+ public final void handleRequestDone(Request req) {
+ SendContext ctx = (SendContext)req.getContext();
+ String serviceName = ((RPCServiceAddress)ctx.recipient.getServiceAddress()).getServiceName();
+ Reply reply = null;
+ Error error = null;
+ if (!req.checkReturnTypes(getReturnSpec())) {
+ // Map all known JRT errors to the appropriate message bus error.
+ reply = new EmptyReply();
+ switch (req.errorCode()) {
+ case com.yahoo.jrt.ErrorCode.TIMEOUT:
+ error = new Error(ErrorCode.TIMEOUT,
+ "A timeout occured while waiting for '" + serviceName + "' (" +
+ ctx.timeout + " seconds expired); " + req.errorMessage());
+ break;
+ case com.yahoo.jrt.ErrorCode.CONNECTION:
+ error = new Error(ErrorCode.CONNECTION_ERROR,
+ "A connection error occured for '" + serviceName + "'; " + req.errorMessage());
+ break;
+ default:
+ error = new Error(ErrorCode.NETWORK_ERROR,
+ "A network error occured for '" + serviceName + "'; " + req.errorMessage());
+ }
+ } else {
+ reply = createReply(req.returnValues(), serviceName, ctx.trace);
+ }
+ if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) {
+ ctx.trace.trace(TraceLevel.SEND_RECEIVE,
+ "Reply (type " + reply.getType() + ") received at " + clientIdent + ".");
+ }
+ reply.getTrace().swap(ctx.trace);
+ if (error != null) {
+ reply.addError(error);
+ }
+ net.getOwner().deliverReply(reply, ctx.recipient);
+ }
+
+ protected final class Params {
+ Version version;
+ String route;
+ String session;
+ boolean retryEnabled;
+ int retry;
+ long timeRemaining;
+ Utf8Array protocolName;
+ byte [] payload;
+ int traceLevel;
+ }
+
+ @Override
+ public final void invoke(Request request) {
+ request.detach();
+ Params p = toParams(request.parameters());
+
+ request.discardParameters(); // allow garbage collection of request parameters
+
+ // Make sure that the owner understands the protocol.
+ Protocol protocol = net.getOwner().getProtocol(p.protocolName);
+ if (protocol == null) {
+ replyError(request, p.version, p.traceLevel,
+ new Error(ErrorCode.UNKNOWN_PROTOCOL,
+ "Protocol '" + p.protocolName + "' is not known by " + serverIdent + "."));
+ return;
+ }
+ Routable routable = protocol.decode(p.version, p.payload);
+ if (routable == null) {
+ replyError(request, p.version, p.traceLevel,
+ new Error(ErrorCode.DECODE_ERROR,
+ "Protocol '" + protocol.getName() + "' failed to decode routable."));
+ return;
+ }
+ if (routable instanceof Reply) {
+ replyError(request, p.version, p.traceLevel,
+ new Error(ErrorCode.DECODE_ERROR,
+ "Payload decoded to a reply when expecting a message."));
+ return;
+ }
+ Message msg = (Message)routable;
+ if (p.route != null && p.route.length() > 0) {
+ msg.setRoute(net.getRoute(p.route));
+ }
+ msg.setContext(new ReplyContext(request, p.version));
+ msg.pushHandler(this);
+ msg.setRetryEnabled(p.retryEnabled);
+ msg.setRetry(p.retry);
+ msg.setTimeReceivedNow();
+ msg.setTimeRemaining(p.timeRemaining);
+ msg.getTrace().setLevel(p.traceLevel);
+ if (msg.getTrace().shouldTrace(TraceLevel.SEND_RECEIVE)) {
+ msg.getTrace().trace(TraceLevel.SEND_RECEIVE,
+ "Message (type " + msg.getType() + ") received at " + serverIdent + " for session '" + p.session + "'.");
+ }
+ net.getOwner().deliverMessage(msg, p.session);
+ }
+
+ @Override
+ public final void handleReply(Reply reply) {
+ ReplyContext ctx = (ReplyContext)reply.getContext();
+ reply.setContext(null);
+
+ // Add trace information.
+ if (reply.getTrace().shouldTrace(TraceLevel.SEND_RECEIVE)) {
+ reply.getTrace().trace(TraceLevel.SEND_RECEIVE,
+ "Sending reply (version " + ctx.version + ") from " + serverIdent + ".");
+ }
+
+ // Encode and return the reply through the RPC request.
+ byte[] payload = new byte[0];
+ if (reply.getType() != 0) {
+ Protocol protocol = net.getOwner().getProtocol(reply.getProtocol());
+ if (protocol != null) {
+ payload = protocol.encode(ctx.version, reply);
+ }
+ if (payload == null || payload.length == 0) {
+ reply.addError(new Error(ErrorCode.ENCODE_ERROR,
+ "An error occured while encoding the reply."));
+ }
+ }
+ createReponse(ctx.request.returnValues(), reply, ctx.version, payload);
+ ctx.request.returnRequest();
+ }
+
+ /**
+ * Send an error reply for a given request.
+ *
+ * @param request The JRT request to reply to.
+ * @param version The version to serialize for.
+ * @param traceLevel The trace level to set in the reply.
+ * @param err The error to reply with.
+ */
+ private void replyError(Request request, Version version, int traceLevel, Error err) {
+ Reply reply = new EmptyReply();
+ reply.setContext(new ReplyContext(request, version));
+ reply.getTrace().setLevel(traceLevel);
+ reply.addError(err);
+ handleReply(reply);
+ }
+
+ private static class SendContext {
+
+ final RoutingNode recipient;
+ final Trace trace;
+ final double timeout;
+
+ SendContext(RoutingNode recipient, long timeRemaining) {
+ this.recipient = recipient;
+ trace = new Trace(recipient.getTrace().getLevel());
+ timeout = timeRemaining * 0.001;
+ }
+ }
+
+ private static class ReplyContext {
+
+ final Request request;
+ final Version version;
+
+ public ReplyContext(Request request, Version version) {
+ this.request = request;
+ this.version = version;
+ }
+ }
+}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java
index 40cb7fb9ee9..0a4a4a54a68 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java
@@ -2,40 +2,41 @@
package com.yahoo.messagebus.network.rpc;
import com.yahoo.component.Version;
-import com.yahoo.jrt.*;
+import com.yahoo.jrt.DataValue;
+import com.yahoo.jrt.DoubleValue;
+import com.yahoo.jrt.Int32Array;
+import com.yahoo.jrt.Int32Value;
+import com.yahoo.jrt.Int64Value;
+import com.yahoo.jrt.Int8Value;
+import com.yahoo.jrt.Method;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.StringArray;
import com.yahoo.jrt.StringValue;
-import com.yahoo.messagebus.*;
+import com.yahoo.jrt.Values;
+import com.yahoo.messagebus.EmptyReply;
import com.yahoo.messagebus.Error;
-import com.yahoo.messagebus.ErrorCode;
-import com.yahoo.messagebus.ReplyHandler;
-import com.yahoo.messagebus.routing.Hop;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.Trace;
+import com.yahoo.messagebus.TraceNode;
import com.yahoo.messagebus.routing.Route;
-import com.yahoo.messagebus.routing.RoutingNode;
import com.yahoo.text.Utf8Array;
-import com.yahoo.text.Utf8String;
/**
* Implements the request adapter for method "mbus.send1".
*
* @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
*/
-public class RPCSendV1 implements MethodHandler, ReplyHandler, RequestWaiter, RPCSendAdapter {
+public class RPCSendV1 extends RPCSend {
private final String METHOD_NAME = "mbus.send1";
private final String METHOD_PARAMS = "sssbilsxi";
private final String METHOD_RETURN = "sdISSsxs";
- private RPCNetwork net = null;
- private String clientIdent = "client";
- private String serverIdent = "server";
@Override
- public void attach(RPCNetwork net) {
- this.net = net;
- String prefix = net.getIdentity().getServicePrefix();
- if (prefix != null && prefix.length() > 0) {
- clientIdent = "'" + prefix + "'";
- serverIdent = clientIdent;
- }
+ protected String getReturnSpec() { return METHOD_RETURN; }
+ @Override
+ protected Method buildMethod() {
Method method = new Method(METHOD_NAME, METHOD_PARAMS, METHOD_RETURN, this);
method.methodDesc("Send a message bus request and get a reply back.");
@@ -56,206 +57,80 @@ public class RPCSendV1 implements MethodHandler, ReplyHandler, RequestWaiter, RP
.returnDesc(5, "protocol", "The name of the protocol that knows how to decode this reply.")
.returnDesc(6, "payload", "The protocol specific reply payload.")
.returnDesc(7, "trace", "A string representation of the trace.");
- net.getSupervisor().addMethod(method);
+ return method;
}
-
@Override
- public void send(RoutingNode recipient, Version version, byte[] payload, long timeRemaining) {
- SendContext ctx = new SendContext(recipient, timeRemaining);
- RPCServiceAddress address = (RPCServiceAddress)recipient.getServiceAddress();
- Message msg = recipient.getMessage();
- Route route = new Route(recipient.getRoute());
- Hop hop = route.removeHop(0);
-
+ protected Request encodeRequest(Version version, Route route, RPCServiceAddress address, Message msg,
+ long timeRemaining, byte[] payload, int traceLevel) {
Request req = new Request(METHOD_NAME);
- req.parameters().add(new StringValue(version.toString()));
- req.parameters().add(new StringValue(route.toString()));
- req.parameters().add(new StringValue(address.getSessionName()));
- req.parameters().add(new Int8Value(msg.getRetryEnabled() ? (byte)1 : (byte)0));
- req.parameters().add(new Int32Value(msg.getRetry()));
- req.parameters().add(new Int64Value(timeRemaining));
- req.parameters().add(new StringValue(msg.getProtocol()));
- req.parameters().add(new DataValue(payload));
- req.parameters().add(new Int32Value(ctx.trace.getLevel()));
-
- if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) {
- ctx.trace.trace(TraceLevel.SEND_RECEIVE,
- "Sending message (version " + version + ") from " + clientIdent + " to '" +
- address.getServiceName() + "' with " + ctx.timeout + " seconds timeout.");
- }
-
- if (hop.getIgnoreResult()) {
- address.getTarget().getJRTTarget().invokeVoid(req);
- if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) {
- ctx.trace.trace(TraceLevel.SEND_RECEIVE,
- "Not waiting for a reply from '" + address.getServiceName() + "'.");
- }
- Reply reply = new EmptyReply();
- reply.getTrace().swap(ctx.trace);
- net.getOwner().deliverReply(reply, recipient);
- } else {
- req.setContext(ctx);
- address.getTarget().getJRTTarget().invokeAsync(req, ctx.timeout, this);
- }
- req.discardParameters(); // allow garbage collection of request parameters
+ Values v = req.parameters();
+ v.add(new StringValue(version.toString()));
+ v.add(new StringValue(route.toString()));
+ v.add(new StringValue(address.getSessionName()));
+ v.add(new Int8Value(msg.getRetryEnabled() ? (byte)1 : (byte)0));
+ v.add(new Int32Value(msg.getRetry()));
+ v.add(new Int64Value(timeRemaining));
+ v.add(new StringValue(msg.getProtocol()));
+ v.add(new DataValue(payload));
+ v.add(new Int32Value(traceLevel));
+ return req;
}
@Override
- public void handleRequestDone(Request req) {
- SendContext ctx = (SendContext)req.getContext();
- String serviceName = ((RPCServiceAddress)ctx.recipient.getServiceAddress()).getServiceName();
+ protected Reply createReply(Values ret, String serviceName, Trace trace) {
+ Version version = new Version(ret.get(0).asUtf8Array());
+ double retryDelay = ret.get(1).asDouble();
+ int[] errorCodes = ret.get(2).asInt32Array();
+ String[] errorMessages = ret.get(3).asStringArray();
+ String[] errorServices = ret.get(4).asStringArray();
+ Utf8Array protocolName = ret.get(5).asUtf8Array();
+ byte[] payload = ret.get(6).asData();
+ String replyTrace = ret.get(7).asString();
+
+ // Make sure that the owner understands the protocol.
Reply reply = null;
Error error = null;
- if (!req.checkReturnTypes(METHOD_RETURN)) {
- // Map all known JRT errors to the appropriate message bus error.
- reply = new EmptyReply();
- switch (req.errorCode()) {
- case com.yahoo.jrt.ErrorCode.TIMEOUT:
- error = new Error(com.yahoo.messagebus.ErrorCode.TIMEOUT,
- "A timeout occured while waiting for '" + serviceName + "' (" +
- ctx.timeout + " seconds expired); " + req.errorMessage());
- break;
- case com.yahoo.jrt.ErrorCode.CONNECTION:
- error = new Error(com.yahoo.messagebus.ErrorCode.CONNECTION_ERROR,
- "A connection error occured for '" + serviceName + "'; " + req.errorMessage());
- break;
- default:
- error = new Error(com.yahoo.messagebus.ErrorCode.NETWORK_ERROR,
- "A network error occured for '" + serviceName + "'; " + req.errorMessage());
- }
- } else {
- // Retrieve all reply components from JRT request object.
- Version version = new Version(req.returnValues().get(0).asUtf8Array());
- double retryDelay = req.returnValues().get(1).asDouble();
- int[] errorCodes = req.returnValues().get(2).asInt32Array();
- String[] errorMessages = req.returnValues().get(3).asStringArray();
- String[] errorServices = req.returnValues().get(4).asStringArray();
- Utf8Array protocolName = req.returnValues().get(5).asUtf8Array();
- byte[] payload = req.returnValues().get(6).asData();
- String replyTrace = req.returnValues().get(7).asString();
-
- // Make sure that the owner understands the protocol.
- if (payload.length > 0) {
- Protocol protocol = net.getOwner().getProtocol(protocolName);
- if (protocol != null) {
- Routable routable = protocol.decode(version, payload);
- if (routable != null) {
- if (routable instanceof Reply) {
- reply = (Reply)routable;
- } else {
- error = new Error(com.yahoo.messagebus.ErrorCode.DECODE_ERROR,
- "Payload decoded to a reply when expecting a message.");
- }
- } else {
- error = new Error(com.yahoo.messagebus.ErrorCode.DECODE_ERROR,
- "Protocol '" + protocol.getName() + "' failed to decode routable.");
- }
- } else {
- error = new Error(com.yahoo.messagebus.ErrorCode.UNKNOWN_PROTOCOL,
- "Protocol '" + protocolName + "' is not known by " + serverIdent + ".");
- }
- }
- if (reply == null) {
- reply = new EmptyReply();
- }
- reply.setRetryDelay(retryDelay);
- for (int i = 0; i < errorCodes.length && i < errorMessages.length; i++) {
- reply.addError(new Error(errorCodes[i],
- errorMessages[i],
- errorServices[i].length() > 0 ? errorServices[i] : serviceName));
- }
- if (ctx.trace.getLevel() > 0) {
- ctx.trace.getRoot().addChild(TraceNode.decode(replyTrace));
+ if (payload.length > 0) {
+ Object retval = decode(protocolName, version, payload);
+ if (retval instanceof Reply) {
+ reply = (Reply) retval;
+ } else {
+ error = (Error) retval;
}
}
- if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) {
- ctx.trace.trace(TraceLevel.SEND_RECEIVE,
- "Reply (type " + reply.getType() + ") received at " + clientIdent + ".");
+ if (reply == null) {
+ reply = new EmptyReply();
}
- reply.getTrace().swap(ctx.trace);
if (error != null) {
reply.addError(error);
}
- net.getOwner().deliverReply(reply, ctx.recipient);
- }
-
- @Override
- public void invoke(Request request) {
- request.detach();
- Version version = new Version(request.parameters().get(0).asUtf8Array());
- String route = request.parameters().get(1).asString();
- String session = request.parameters().get(2).asString();
- boolean retryEnabled = (request.parameters().get(3).asInt8() != 0);
- int retry = request.parameters().get(4).asInt32();
- long timeRemaining = request.parameters().get(5).asInt64();
- Utf8Array protocolName = request.parameters().get(6).asUtf8Array();
- byte[] payload = request.parameters().get(7).asData();
- int traceLevel = request.parameters().get(8).asInt32();
-
- request.discardParameters(); // allow garbage collection of request parameters
-
- // Make sure that the owner understands the protocol.
- Protocol protocol = net.getOwner().getProtocol(protocolName);
- if (protocol == null) {
- replyError(request, version, traceLevel,
- new com.yahoo.messagebus.Error(ErrorCode.UNKNOWN_PROTOCOL,
- "Protocol '" + protocolName + "' is not known by " + serverIdent + "."));
- return;
- }
- Routable routable = protocol.decode(version, payload);
- if (routable == null) {
- replyError(request, version, traceLevel,
- new Error(ErrorCode.DECODE_ERROR,
- "Protocol '" + protocol.getName() + "' failed to decode routable."));
- return;
- }
- if (routable instanceof Reply) {
- replyError(request, version, traceLevel,
- new Error(ErrorCode.DECODE_ERROR,
- "Payload decoded to a reply when expecting a message."));
- return;
+ reply.setRetryDelay(retryDelay);
+ for (int i = 0; i < errorCodes.length && i < errorMessages.length; i++) {
+ reply.addError(new Error(errorCodes[i], errorMessages[i],
+ errorServices[i].length() > 0 ? errorServices[i] : serviceName));
}
- Message msg = (Message)routable;
- if (route != null && route.length() > 0) {
- msg.setRoute(net.getRoute(route));
+ if (trace.getLevel() > 0) {
+ trace.getRoot().addChild(TraceNode.decode(replyTrace));
}
- msg.setContext(new ReplyContext(request, version));
- msg.pushHandler(this);
- msg.setRetryEnabled(retryEnabled);
- msg.setRetry(retry);
- msg.setTimeReceivedNow();
- msg.setTimeRemaining(timeRemaining);
- msg.getTrace().setLevel(traceLevel);
- if (msg.getTrace().shouldTrace(TraceLevel.SEND_RECEIVE)) {
- msg.getTrace().trace(TraceLevel.SEND_RECEIVE,
- "Message (type " + msg.getType() + ") received at " + serverIdent + " for session '" + session + "'.");
- }
- net.getOwner().deliverMessage(msg, session);
+ return reply;
}
- @Override
- public void handleReply(Reply reply) {
- ReplyContext ctx = (ReplyContext)reply.getContext();
- reply.setContext(null);
-
- // Add trace information.
- if (reply.getTrace().shouldTrace(TraceLevel.SEND_RECEIVE)) {
- reply.getTrace().trace(TraceLevel.SEND_RECEIVE,
- "Sending reply (version " + ctx.version + ") from " + serverIdent + ".");
- }
+ protected Params toParams(Values args) {
+ Params p = new Params();
+ p.version = new Version(args.get(0).asUtf8Array());
+ p.route = args.get(1).asString();
+ p.session = args.get(2).asString();
+ p.retryEnabled = (args.get(3).asInt8() != 0);
+ p.retry = args.get(4).asInt32();
+ p.timeRemaining = args.get(5).asInt64();
+ p.protocolName = args.get(6).asUtf8Array();
+ p.payload = args.get(7).asData();
+ p.traceLevel = args.get(8).asInt32();
+ return p;
+ }
- // Encode and return the reply through the RPC request.
- byte[] payload = new byte[0];
- if (reply.getType() != 0) {
- Protocol protocol = net.getOwner().getProtocol(reply.getProtocol());
- if (protocol != null) {
- payload = protocol.encode(ctx.version, reply);
- }
- if (payload == null || payload.length == 0) {
- reply.addError(new Error(ErrorCode.ENCODE_ERROR,
- "An error occured while encoding the reply."));
- }
- }
+ @Override
+ protected void createReponse(Values ret, Reply reply, Version version, byte [] payload) {
int[] eCodes = new int[reply.getNumErrors()];
String[] eMessages = new String[reply.getNumErrors()];
String[] eServices = new String[reply.getNumErrors()];
@@ -265,57 +140,14 @@ public class RPCSendV1 implements MethodHandler, ReplyHandler, RequestWaiter, RP
eMessages[i] = error.getMessage();
eServices[i] = error.getService() != null ? error.getService() : "";
}
- ctx.request.returnValues().add(new StringValue(ctx.version.toString()));
- ctx.request.returnValues().add(new DoubleValue(reply.getRetryDelay()));
- ctx.request.returnValues().add(new Int32Array(eCodes));
- ctx.request.returnValues().add(new StringArray(eMessages));
- ctx.request.returnValues().add(new StringArray(eServices));
- ctx.request.returnValues().add(new StringValue(reply.getProtocol()));
- ctx.request.returnValues().add(new DataValue(payload));
- ctx.request.returnValues().add(new StringValue(
- reply.getTrace().getRoot() != null ?
- reply.getTrace().getRoot().encode() :
- ""));
- ctx.request.returnRequest();
+ ret.add(new StringValue(version.toString()));
+ ret.add(new DoubleValue(reply.getRetryDelay()));
+ ret.add(new Int32Array(eCodes));
+ ret.add(new StringArray(eMessages));
+ ret.add(new StringArray(eServices));
+ ret.add(new StringValue(reply.getProtocol()));
+ ret.add(new DataValue(payload));
+ ret.add(new StringValue(reply.getTrace().getRoot() != null ? reply.getTrace().getRoot().encode() : ""));
}
- /**
- * Send an error reply for a given request.
- *
- * @param request The JRT request to reply to.
- * @param version The version to serialize for.
- * @param traceLevel The trace level to set in the reply.
- * @param err The error to reply with.
- */
- private void replyError(Request request, Version version, int traceLevel, Error err) {
- Reply reply = new EmptyReply();
- reply.setContext(new ReplyContext(request, version));
- reply.getTrace().setLevel(traceLevel);
- reply.addError(err);
- handleReply(reply);
- }
-
- private static class SendContext {
-
- final RoutingNode recipient;
- final Trace trace;
- final double timeout;
-
- SendContext(RoutingNode recipient, long timeRemaining) {
- this.recipient = recipient;
- trace = new Trace(recipient.getTrace().getLevel());
- timeout = timeRemaining * 0.001;
- }
- }
-
- private static class ReplyContext {
-
- final Request request;
- final Version version;
-
- public ReplyContext(Request request, Version version) {
- this.request = request;
- this.version = version;
- }
- }
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java
new file mode 100644
index 00000000000..239d82947e5
--- /dev/null
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java
@@ -0,0 +1,192 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.network.rpc;
+
+import com.yahoo.component.Version;
+import com.yahoo.compress.CompressionType;
+import com.yahoo.compress.Compressor;
+import com.yahoo.jrt.DataValue;
+import com.yahoo.jrt.Int32Value;
+import com.yahoo.jrt.Int8Value;
+import com.yahoo.jrt.Method;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.Values;
+import com.yahoo.messagebus.EmptyReply;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.Trace;
+import com.yahoo.messagebus.TraceNode;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.slime.BinaryFormat;
+import com.yahoo.slime.Cursor;
+import com.yahoo.slime.Inspector;
+import com.yahoo.slime.Slime;
+import com.yahoo.text.Utf8;
+import com.yahoo.text.Utf8Array;
+
+/**
+ * Implements the request adapter for method "mbus.sendslime".
+ *
+ * @author baldersheim
+ */
+public class RPCSendV2 extends RPCSend {
+
+ private final static String METHOD_NAME = "mbus.slime";
+ private final static String METHOD_PARAMS = "bix";
+ private final static String METHOD_RETURN = "bix";
+ private final Compressor compressor = new Compressor(CompressionType.LZ4, 3, 90, 1024);
+
+ @Override
+ protected String getReturnSpec() { return METHOD_RETURN; }
+ @Override
+ protected Method buildMethod() {
+
+ Method method = new Method(METHOD_NAME, METHOD_PARAMS, METHOD_RETURN, this);
+ method.methodDesc("Send a message bus request and get a reply back.");
+ method.paramDesc(0, "encoding", "Encoding type.")
+ .paramDesc(1, "decodedSize", "Number of bytes after decoding.")
+ .paramDesc(2, "payload", "Slime encoded payload.");
+ method.returnDesc(0, "encoding", "Encoding type.")
+ .returnDesc(1, "decodedSize", "Number of bytes after decoding.")
+ .returnDesc(2, "payload", "Slime encoded payload.");
+ return method;
+ }
+ private static final String VERSION_F = new String("version");
+ private static final String ROUTE_F = new String("route");
+ private static final String SESSION_F = new String("session");
+ private static final String PROTOCOL_F = new String("prot");
+ private static final String TRACELEVEL_F = new String("tracelevel");
+ private static final String TRACE_F = new String("trace");
+ private static final String USERETRY_F = new String("useretry");
+ private static final String RETRY_F = new String("retry");
+ private static final String RETRYDELAY_F = new String("retrydelay");
+ private static final String TIMEREMAINING_F = new String("timeleft");
+ private static final String ERRORS_F = new String("errors");
+ private static final String SERVICE_F = new String("service");
+ private static final String CODE_F = new String("code");
+ private static final String BLOB_F = new String("msg");
+ private static final String MSG_F = new String("msg");
+
+ @Override
+ protected Request encodeRequest(Version version, Route route, RPCServiceAddress address, Message msg,
+ long timeRemaining, byte[] payload, int traceLevel) {
+ Slime slime = new Slime();
+ Cursor root = slime.setObject();
+
+ root.setString(VERSION_F, version.toString());
+ root.setString(ROUTE_F, route.toString());
+ root.setString(SESSION_F, address.getSessionName());
+ root.setString(PROTOCOL_F, msg.getProtocol().toString());
+ root.setBool(USERETRY_F, msg.getRetryEnabled());
+ root.setLong(RETRY_F, msg.getRetry());
+ root.setLong(TIMEREMAINING_F, msg.getTimeRemaining());
+ root.setLong(TRACELEVEL_F, traceLevel);
+ root.setData(BLOB_F, payload);
+
+ byte[] serializedSlime = BinaryFormat.encode(slime);
+ Compressor.Compression compressionResult = compressor.compress(serializedSlime);
+ Request req = new Request(METHOD_NAME);
+ Values v = req.parameters();
+
+ v.add(new Int8Value(compressionResult.type().getCode()));
+ v.add(new Int32Value(compressionResult.uncompressedSize()));
+ v.add(new DataValue(compressionResult.data()));
+
+ return req;
+ }
+
+ @Override
+ protected Reply createReply(Values ret, String serviceName, Trace trace) {
+ CompressionType compression = CompressionType.valueOf(ret.get(0).asInt8());
+ byte[] slimeBytes = compressor.decompress(ret.get(2).asData(), compression, ret.get(1).asInt32());
+ Slime slime = BinaryFormat.decode(slimeBytes);
+ Inspector root = slime.get();
+
+ Version version = new Version(root.field(VERSION_F).asString());
+ byte[] payload = root.field(BLOB_F).asData();
+
+ // Make sure that the owner understands the protocol.
+ Reply reply = null;
+ Error error = null;
+ if (payload.length > 0) {
+ Object retval = decode(new Utf8Array(root.field(PROTOCOL_F).asUtf8()), version, payload);
+ if (retval instanceof Reply) {
+ reply = (Reply) retval;
+ } else {
+ error = (Error) retval;
+ }
+ }
+ if (reply == null) {
+ reply = new EmptyReply();
+ }
+ if (error != null) {
+ reply.addError(error);
+ }
+ reply.setRetryDelay(root.field(RETRYDELAY_F).asDouble());
+
+ Inspector errors = root.field(ERRORS_F);
+ for (int i = 0; i < errors.entries(); i++) {
+ Inspector e = errors.entry(i);
+ String service = e.field(SERVICE_F).asString();
+ reply.addError(new Error((int)e.field(CODE_F).asLong(), e.field(MSG_F).asString(),
+ (service != null && service.length() > 0) ? service : serviceName));
+ }
+ if (trace.getLevel() > 0) {
+ trace.getRoot().addChild(TraceNode.decode(root.field(TRACE_F).asString()));
+ }
+ return reply;
+ }
+
+ protected Params toParams(Values args) {
+ CompressionType compression = CompressionType.valueOf(args.get(0).asInt8());
+ byte[] slimeBytes = compressor.decompress(args.get(2).asData(), compression, args.get(1).asInt32());
+ Slime slime = BinaryFormat.decode(slimeBytes);
+ Inspector root = slime.get();
+ Params p = new Params();
+ p.version = new Version(root.field(VERSION_F).asString());
+ p.route = root.field(ROUTE_F).asString();
+ p.session = root.field(SESSION_F).asString();
+ p.retryEnabled = root.field(USERETRY_F).asBool();
+ p.retry = (int)root.field(RETRY_F).asLong();
+ p.timeRemaining = root.field(TIMEREMAINING_F).asLong();
+ p.protocolName = new Utf8Array(Utf8.toBytes(root.field(PROTOCOL_F).asString()));
+ p.payload = root.field(BLOB_F).asData();
+ p.traceLevel = (int)root.field(TRACELEVEL_F).asLong();
+ return p;
+ }
+
+ @Override
+ protected void createReponse(Values ret, Reply reply, Version version, byte [] payload) {
+ Slime slime = new Slime();
+ Cursor root = slime.setObject();
+
+ root.setString(VERSION_F, version.toString());
+ root.setDouble(RETRYDELAY_F, reply.getRetryDelay());
+ root.setString(PROTOCOL_F, reply.getProtocol().toString());
+ root.setData(BLOB_F, payload);
+ if (reply.getTrace().getLevel() > 0) {
+ root.setString(TRACE_F, reply.getTrace().getRoot().encode());
+ }
+
+ if (reply.getNumErrors() > 0) {
+ Cursor array = root.setArray(ERRORS_F);
+ for (int i = 0; i < reply.getNumErrors(); i++) {
+ Cursor e = array.addObject();
+ Error mbusE = reply.getError(i);
+ e.setLong(CODE_F, mbusE.getCode());
+ e.setString(MSG_F, mbusE.getMessage());
+ if (mbusE.getService() != null) {
+ e.setString(SERVICE_F, mbusE.getService());
+ }
+ }
+ }
+
+ byte[] serializedSlime = BinaryFormat.encode(slime);
+ Compressor.Compression compressionResult = compressor.compress(serializedSlime);
+
+ ret.add(new Int8Value(compressionResult.type().getCode()));
+ ret.add(new Int32Value(compressionResult.uncompressedSize()));
+ ret.add(new DataValue(compressionResult.data()));
+ }
+
+}
diff --git a/messagebus/src/tests/advancedrouting/advancedrouting.cpp b/messagebus/src/tests/advancedrouting/advancedrouting.cpp
index e847fdf3222..8d6b8cd3875 100644
--- a/messagebus/src/tests/advancedrouting/advancedrouting.cpp
+++ b/messagebus/src/tests/advancedrouting/advancedrouting.cpp
@@ -1,17 +1,15 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/messagebus/emptyreply.h>
-#include <vespa/messagebus/errorcode.h>
-#include <vespa/messagebus/messagebus.h>
-#include <vespa/messagebus/routing/errordirective.h>
-#include <vespa/messagebus/routing/retrytransienterrorspolicy.h>
+
#include <vespa/messagebus/testlib/custompolicy.h>
#include <vespa/messagebus/testlib/receptor.h>
#include <vespa/messagebus/testlib/simplemessage.h>
-#include <vespa/messagebus/testlib/simpleprotocol.h>
#include <vespa/messagebus/testlib/slobrok.h>
#include <vespa/messagebus/testlib/testserver.h>
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/messagebus/emptyreply.h>
+#include <vespa/messagebus/errorcode.h>
+#include <vespa/messagebus/routing/retrytransienterrorspolicy.h>
using namespace mbus;
@@ -117,7 +115,7 @@ Test::Main()
void
Test::testAdvanced(TestData &data)
{
- const double TIMEOUT=60;
+ const double TIMEOUT = 60;
IProtocol::SP protocol(new SimpleProtocol());
SimpleProtocol &simple = static_cast<SimpleProtocol&>(*protocol);
simple.addPolicyFactory("Custom", SimpleProtocol::IPolicyFactory::SP(new CustomPolicyFactory(false, ErrorCode::NO_ADDRESS_FOR_SERVICE)));
diff --git a/messagebus/src/tests/bucketsequence/bucketsequence.cpp b/messagebus/src/tests/bucketsequence/bucketsequence.cpp
index 4073469c253..d98acc4f191 100644
--- a/messagebus/src/tests/bucketsequence/bucketsequence.cpp
+++ b/messagebus/src/tests/bucketsequence/bucketsequence.cpp
@@ -5,7 +5,6 @@
#include <vespa/messagebus/testlib/receptor.h>
#include <vespa/messagebus/testlib/simplemessage.h>
#include <vespa/messagebus/testlib/simpleprotocol.h>
-#include <vespa/messagebus/testlib/simplereply.h>
#include <vespa/messagebus/testlib/slobrok.h>
#include <vespa/messagebus/testlib/testserver.h>
#include <vespa/vespalib/testkit/testapp.h>
diff --git a/messagebus/src/tests/choke/choke.cpp b/messagebus/src/tests/choke/choke.cpp
index b8e20eb9074..91f91c84f45 100644
--- a/messagebus/src/tests/choke/choke.cpp
+++ b/messagebus/src/tests/choke/choke.cpp
@@ -2,7 +2,6 @@
#include <vespa/messagebus/emptyreply.h>
#include <vespa/messagebus/errorcode.h>
-#include <vespa/messagebus/reply.h>
#include <vespa/messagebus/testlib/receptor.h>
#include <vespa/messagebus/testlib/simplemessage.h>
#include <vespa/messagebus/testlib/simpleprotocol.h>
diff --git a/messagebus/src/tests/error/error.cpp b/messagebus/src/tests/error/error.cpp
index dbaa869507b..1d8a489a5ed 100644
--- a/messagebus/src/tests/error/error.cpp
+++ b/messagebus/src/tests/error/error.cpp
@@ -65,14 +65,14 @@ Test::Main()
reply = pxy.getReply();
ASSERT_TRUE(reply.get() != 0);
- EXPECT_EQUAL(reply->getNumErrors(), 1u);
+ ASSERT_EQUAL(reply->getNumErrors(), 1u);
EXPECT_EQUAL(reply->getError(0).getService(), "test/dst/session");
reply->addError(Error(ErrorCode::APP_FATAL_ERROR, "fatality"));
is->forward(std::move(reply));
reply = src.getReply();
ASSERT_TRUE(reply.get() != 0);
- EXPECT_EQUAL(reply->getNumErrors(), 2u);
+ ASSERT_EQUAL(reply->getNumErrors(), 2u);
EXPECT_EQUAL(reply->getError(0).getService(), "test/dst/session");
EXPECT_EQUAL(reply->getError(1).getService(), "test/pxy/session");
}
diff --git a/messagebus/src/tests/routing/routing.cpp b/messagebus/src/tests/routing/routing.cpp
index e907e0d1163..515cbd99fde 100644
--- a/messagebus/src/tests/routing/routing.cpp
+++ b/messagebus/src/tests/routing/routing.cpp
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/messagebus/emptyreply.h>
#include <vespa/messagebus/errorcode.h>
-#include <vespa/messagebus/messagebus.h>
#include <vespa/messagebus/routing/errordirective.h>
#include <vespa/messagebus/routing/retrytransienterrorspolicy.h>
#include <vespa/messagebus/routing/routingcontext.h>
diff --git a/messagebus/src/tests/sendadapter/sendadapter.cpp b/messagebus/src/tests/sendadapter/sendadapter.cpp
index 8ae1d0993cb..e3fa3278300 100644
--- a/messagebus/src/tests/sendadapter/sendadapter.cpp
+++ b/messagebus/src/tests/sendadapter/sendadapter.cpp
@@ -1,6 +1,5 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/messagebus/messagebus.h>
#include <vespa/messagebus/testlib/receptor.h>
#include <vespa/messagebus/testlib/simplemessage.h>
#include <vespa/messagebus/testlib/simpleprotocol.h>
diff --git a/messagebus/src/tests/serviceaddress/serviceaddress.cpp b/messagebus/src/tests/serviceaddress/serviceaddress.cpp
index 15a2e9fdac8..89edb981716 100644
--- a/messagebus/src/tests/serviceaddress/serviceaddress.cpp
+++ b/messagebus/src/tests/serviceaddress/serviceaddress.cpp
@@ -3,20 +3,7 @@
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/messagebus/testlib/slobrok.h>
#include <vespa/messagebus/testlib/testserver.h>
-#include <vespa/messagebus/testlib/receptor.h>
-#include <vespa/messagebus/testlib/simplemessage.h>
-#include <vespa/messagebus/testlib/simplereply.h>
-#include <vespa/messagebus/testlib/simpleprotocol.h>
-#include <vespa/messagebus/messagebus.h>
-#include <vespa/messagebus/sourcesession.h>
-#include <vespa/messagebus/intermediatesession.h>
-#include <vespa/messagebus/destinationsession.h>
-#include <vespa/messagebus/emptyreply.h>
-#include <vespa/messagebus/error.h>
-#include <vespa/messagebus/errorcode.h>
-#include <vespa/messagebus/routing/routingspec.h>
#include <vespa/messagebus/network/rpcservice.h>
-#include <vespa/messagebus/sourcesessionparams.h>
using namespace mbus;
diff --git a/messagebus/src/tests/servicepool/servicepool.cpp b/messagebus/src/tests/servicepool/servicepool.cpp
index c98b342b2f3..86a800b600a 100644
--- a/messagebus/src/tests/servicepool/servicepool.cpp
+++ b/messagebus/src/tests/servicepool/servicepool.cpp
@@ -1,6 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/messagebus/network/rpcnetwork.h>
+#include <vespa/messagebus/network/rpcnetworkparams.h>
+#include <vespa/messagebus/network/rpcservicepool.h>
#include <vespa/messagebus/testlib/slobrok.h>
#include <vespa/vespalib/testkit/testapp.h>
diff --git a/messagebus/src/tests/shutdown/shutdown.cpp b/messagebus/src/tests/shutdown/shutdown.cpp
index b5e6cac970e..070b51bbbc2 100644
--- a/messagebus/src/tests/shutdown/shutdown.cpp
+++ b/messagebus/src/tests/shutdown/shutdown.cpp
@@ -1,9 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/messagebus/emptyreply.h>
-#include <vespa/messagebus/errorcode.h>
-#include <vespa/messagebus/messagebus.h>
-#include <vespa/messagebus/routing/errordirective.h>
#include <vespa/messagebus/routing/retrytransienterrorspolicy.h>
#include <vespa/messagebus/testlib/receptor.h>
#include <vespa/messagebus/testlib/simplemessage.h>
diff --git a/messagebus/src/tests/slobrok/slobrok.cpp b/messagebus/src/tests/slobrok/slobrok.cpp
index 360705e7eae..6c389c25e70 100644
--- a/messagebus/src/tests/slobrok/slobrok.cpp
+++ b/messagebus/src/tests/slobrok/slobrok.cpp
@@ -2,10 +2,10 @@
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/messagebus/testlib/slobrok.h>
-#include <string>
-#include <sstream>
#include <vespa/slobrok/sbmirror.h>
#include <vespa/messagebus/network/rpcnetwork.h>
+#include <vespa/messagebus/network/rpcnetworkparams.h>
+
#include <vespa/vespalib/util/host_name.h>
using slobrok::api::IMirrorAPI;
diff --git a/messagebus/src/vespa/messagebus/blob.h b/messagebus/src/vespa/messagebus/blob.h
index 1e84222064a..0509d71d77c 100644
--- a/messagebus/src/vespa/messagebus/blob.h
+++ b/messagebus/src/vespa/messagebus/blob.h
@@ -26,13 +26,13 @@ public:
_payload(Alloc::alloc(s)),
_sz(s)
{ }
- Blob(Blob && rhs) :
+ Blob(Blob && rhs) noexcept :
_payload(std::move(rhs._payload)),
_sz(rhs._sz)
{
rhs._sz = 0;
}
- Blob & operator = (Blob && rhs) {
+ Blob & operator = (Blob && rhs) noexcept {
swap(rhs);
return *this;
}
@@ -65,4 +65,3 @@ private:
};
} // namespace mbus
-
diff --git a/messagebus/src/vespa/messagebus/network/CMakeLists.txt b/messagebus/src/vespa/messagebus/network/CMakeLists.txt
index e8992034622..750ff20240f 100644
--- a/messagebus/src/vespa/messagebus/network/CMakeLists.txt
+++ b/messagebus/src/vespa/messagebus/network/CMakeLists.txt
@@ -6,7 +6,9 @@ vespa_add_library(messagebus_network OBJECT
oosmanager.cpp
rpcnetwork.cpp
rpcnetworkparams.cpp
+ rpcsend.cpp
rpcsendv1.cpp
+ rpcsendv2.cpp
rpcservice.cpp
rpcserviceaddress.cpp
rpcservicepool.cpp
diff --git a/messagebus/src/vespa/messagebus/network/oosmanager.h b/messagebus/src/vespa/messagebus/network/oosmanager.h
index b521520d2b3..eac00b93896 100644
--- a/messagebus/src/vespa/messagebus/network/oosmanager.h
+++ b/messagebus/src/vespa/messagebus/network/oosmanager.h
@@ -20,14 +20,11 @@ class RPCNetwork;
*/
class OOSManager : public FNET_Task {
public:
- /**
- * Convenience typedefs.
- */
- typedef slobrok::api::IMirrorAPI IMirrorAPI;
- typedef IMirrorAPI::SpecList SpecList;
- typedef std::vector<OOSClient::SP> ClientList;
- typedef std::set<string> StringSet;
- typedef std::shared_ptr<StringSet> OOSSet;
+ using IMirrorAPI = slobrok::api::IMirrorAPI;
+ using SpecList = IMirrorAPI::SpecList;
+ using ClientList = std::vector<OOSClient::SP>;
+ using StringSet = std::set<string>;
+ using OOSSet = std::shared_ptr<StringSet>;
private:
FRT_Supervisor &_orb;
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index 75c5fc3f6c5..b7f1d6de60d 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -1,6 +1,11 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "inetworkowner.h"
#include "rpcnetwork.h"
+#include "rpcservicepool.h"
+#include "oosmanager.h"
+#include "rpcsendv1.h"
+#include "rpcsendv2.h"
+#include "rpctargetpool.h"
+#include "rpcnetworkparams.h"
#include <vespa/messagebus/errorcode.h>
#include <vespa/messagebus/iprotocol.h>
#include <vespa/messagebus/tracelevel.h>
@@ -11,6 +16,8 @@
#include <vespa/vespalib/component/vtag.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/fnet/scheduler.h>
+#include <vespa/fnet/transport.h>
+#include <vespa/fnet/frt/supervisor.h>
#include <vespa/log/log.h>
LOG_SETUP(".rpcnetwork");
@@ -52,17 +59,15 @@ public:
namespace mbus {
RPCNetwork::SendContext::SendContext(RPCNetwork &net, const Message &msg,
- const std::vector<RoutingNode*> &recipients) :
- _net(net),
- _msg(msg),
- _traceLevel(msg.getTrace().getLevel()),
- _recipients(recipients),
- _hasError(false),
- _pending(_recipients.size()),
- _version(_net.getVersion())
-{
- // empty
-}
+ const std::vector<RoutingNode*> &recipients)
+ : _net(net),
+ _msg(msg),
+ _traceLevel(msg.getTrace().getLevel()),
+ _recipients(recipients),
+ _hasError(false),
+ _pending(_recipients.size()),
+ _version(_net.getVersion())
+{ }
void
RPCNetwork::SendContext::handleVersion(const vespalib::Version *version)
@@ -85,11 +90,9 @@ RPCNetwork::SendContext::handleVersion(const vespalib::Version *version)
}
}
-RPCNetwork::TargetPoolTask::TargetPoolTask(
- FNET_Scheduler &scheduler,
- RPCTargetPool &pool) :
- FNET_Task(&scheduler),
- _pool(pool)
+RPCNetwork::TargetPoolTask::TargetPoolTask(FNET_Scheduler &scheduler, RPCTargetPool &pool)
+ : FNET_Task(&scheduler),
+ _pool(pool)
{
ScheduleNow();
}
@@ -104,24 +107,26 @@ RPCNetwork::TargetPoolTask::PerformTask()
RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_owner(0),
_ident(params.getIdentity()),
- _threadPool(128000, 0),
- _transport(),
- _orb(&_transport, NULL),
- _scheduler(*_transport.GetScheduler()),
- _targetPool(params.getConnectionExpireSecs()),
- _targetPoolTask(_scheduler, _targetPool),
- _servicePool(*this, 4096),
- _slobrokCfgFactory(params.getSlobrokConfig()),
- _mirror(std::make_unique<slobrok::api::MirrorAPI>(_orb, _slobrokCfgFactory)),
- _regAPI(std::make_unique<slobrok::api::RegisterAPI>(_orb, _slobrokCfgFactory)),
- _oosManager(_orb, *_mirror, params.getOOSServerPattern()),
+ _threadPool(std::make_unique<FastOS_ThreadPool>(128000, 0)),
+ _transport(std::make_unique<FNET_Transport>()),
+ _orb(std::make_unique<FRT_Supervisor>(_transport.get(), _threadPool.get())),
+ _scheduler(*_transport->GetScheduler()),
+ _targetPool(std::make_unique<RPCTargetPool>(params.getConnectionExpireSecs())),
+ _targetPoolTask(_scheduler, *_targetPool),
+ _servicePool(std::make_unique<RPCServicePool>(*this, 4096)),
+ _slobrokCfgFactory(std::make_unique<slobrok::ConfiguratorFactory>(params.getSlobrokConfig())),
+ _mirror(std::make_unique<slobrok::api::MirrorAPI>(*_orb, *_slobrokCfgFactory)),
+ _regAPI(std::make_unique<slobrok::api::RegisterAPI>(*_orb, *_slobrokCfgFactory)),
+ _oosManager(std::make_unique<OOSManager>(*_orb, *_mirror, params.getOOSServerPattern())),
_requestedPort(params.getListenPort()),
- _sendV1(),
- _sendAdapters()
+ _sendV1(std::make_unique<RPCSendV1>()),
+ _sendV2(std::make_unique<RPCSendV2>()),
+ _sendAdapters(),
+ _compressionConfig(params.getCompressionConfig())
{
- _transport.SetDirectWrite(false);
- _transport.SetMaxInputBufferSize(params.getMaxInputBufferSize());
- _transport.SetMaxOutputBufferSize(params.getMaxOutputBufferSize());
+ _transport->SetDirectWrite(false);
+ _transport->SetMaxInputBufferSize(params.getMaxInputBufferSize());
+ _transport->SetMaxOutputBufferSize(params.getMaxOutputBufferSize());
}
RPCNetwork::~RPCNetwork()
@@ -132,33 +137,33 @@ RPCNetwork::~RPCNetwork()
FRT_RPCRequest *
RPCNetwork::allocRequest()
{
- return _orb.AllocRPCRequest();
+ return _orb->AllocRPCRequest();
}
RPCTarget::SP
RPCNetwork::getTarget(const RPCServiceAddress &address)
{
- return _targetPool.getTarget(_orb, address);
+ return _targetPool->getTarget(*_orb, address);
}
void
-RPCNetwork::replyError(const SendContext &ctx, uint32_t errCode,
- const string &errMsg)
+RPCNetwork::replyError(const SendContext &ctx, uint32_t errCode, const string &errMsg)
{
- for (std::vector<RoutingNode*>::const_iterator it = ctx._recipients.begin();
- it != ctx._recipients.end(); ++it)
- {
+ for (RoutingNode * rnode : ctx._recipients) {
Reply::UP reply(new EmptyReply());
reply->setTrace(Trace(ctx._traceLevel));
reply->addError(Error(errCode, errMsg));
- _owner->deliverReply(std::move(reply), **it);
+ _owner->deliverReply(std::move(reply), *rnode);
}
}
+int RPCNetwork::getPort() const { return _orb->GetListenPort(); }
+
+
void
RPCNetwork::flushTargetPool()
{
- _targetPool.flushTargets(true);
+ _targetPool->flushTargets(true);
}
const vespalib::Version &
@@ -173,13 +178,13 @@ RPCNetwork::attach(INetworkOwner &owner)
LOG_ASSERT(_owner == 0);
_owner = &owner;
- _sendV1.attach(*this);
- _sendAdapters.insert(SendAdapterMap::value_type(vespalib::VersionSpecification(5), &_sendV1));
- _sendAdapters.insert(SendAdapterMap::value_type(vespalib::VersionSpecification(6), &_sendV1));
+ _sendV1->attach(*this);
+ _sendV2->attach(*this);
+ _sendAdapters[vespalib::Version(5)] = _sendV1.get();
+ _sendAdapters[vespalib::Version(6, 142)] = _sendV2.get();
- FRT_ReflectionBuilder builder(&_orb);
- builder.DefineMethod("mbus.getVersion", "", "s", true,
- FRT_METHOD(RPCNetwork::invoke), this);
+ FRT_ReflectionBuilder builder(_orb.get());
+ builder.DefineMethod("mbus.getVersion", "", "s", true, FRT_METHOD(RPCNetwork::invoke), this);
builder.MethodDesc("Retrieves the message bus version.");
builder.ReturnDesc("version", "The message bus version.");
}
@@ -193,29 +198,23 @@ RPCNetwork::invoke(FRT_RPCRequest *req)
const string
RPCNetwork::getConnectionSpec() const
{
- return make_string("tcp/%s:%d", _ident.getHostname().c_str(), _orb.GetListenPort());
+ return make_string("tcp/%s:%d", _ident.getHostname().c_str(), _orb->GetListenPort());
}
RPCSendAdapter *
RPCNetwork::getSendAdapter(const vespalib::Version &version)
{
- for (SendAdapterMap::iterator it = _sendAdapters.begin();
- it != _sendAdapters.end(); ++it)
- {
- if (it->first.matches(version)) {
- return it->second;
- }
- }
- return NULL;
+ auto lower = _sendAdapters.lower_bound(version);
+ return (lower != _sendAdapters.end()) ? lower->second : nullptr;
}
bool
RPCNetwork::start()
{
- if (!_orb.Listen(_requestedPort)) {
+ if (!_orb->Listen(_requestedPort)) {
return false;
}
- if (!_transport.Start(&_threadPool)) {
+ if (!_transport->Start(_threadPool.get())) {
return false;
}
return true;
@@ -227,13 +226,13 @@ bool
RPCNetwork::waitUntilReady(double seconds) const
{
slobrok::api::SlobrokList brokerList;
- slobrok::Configurator::UP configurator = _slobrokCfgFactory.create(brokerList);
+ slobrok::Configurator::UP configurator = _slobrokCfgFactory->create(brokerList);
bool hasConfig = false;
for (uint32_t i = 0; i < seconds * 100; ++i) {
if (configurator->poll()) {
hasConfig = true;
}
- if (_mirror->ready() && _oosManager.isReady()) {
+ if (_mirror->ready() && _oosManager->isReady()) {
return true;
}
FastOS_Thread::Sleep(10);
@@ -244,7 +243,7 @@ RPCNetwork::waitUntilReady(double seconds) const
std::string brokers = brokerList.logString();
LOG(error, "mirror (of %s) failed to become ready in %d seconds",
brokers.c_str(), (int)seconds);
- } else if (! _oosManager.isReady()) {
+ } else if (! _oosManager->isReady()) {
LOG(error, "OOS manager failed to become ready in %d seconds", (int)seconds);
}
return false;
@@ -293,24 +292,23 @@ RPCNetwork::allocServiceAddress(RoutingNode &recipient)
Error
RPCNetwork::resolveServiceAddress(RoutingNode &recipient, const string &serviceName)
{
- if (_oosManager.isOOS(serviceName)) {
+ if (_oosManager->isOOS(serviceName)) {
return Error(ErrorCode::SERVICE_OOS,
make_string("The service '%s' has been marked as out of service.",
serviceName.c_str()));
}
- RPCServiceAddress::UP ret = _servicePool.resolve(serviceName);
+ RPCServiceAddress::UP ret = _servicePool->resolve(serviceName);
if (ret.get() == NULL) {
return Error(ErrorCode::NO_ADDRESS_FOR_SERVICE,
make_string("The address of service '%s' could not be resolved. It is not currently "
- "registered with the Vespa name server. "
- "The service must be having problems, or the routing configuration is wrong.",
- serviceName.c_str()));
+ "registered with the Vespa name server. "
+ "The service must be having problems, or the routing configuration is wrong.",
+ serviceName.c_str()));
}
- RPCTarget::SP target = _targetPool.getTarget(_orb, *ret);
+ RPCTarget::SP target = _targetPool->getTarget(*_orb, *ret);
if (target.get() == NULL) {
return Error(ErrorCode::CONNECTION_ERROR,
- make_string("Failed to connect to service '%s'.",
- serviceName.c_str()));
+ make_string("Failed to connect to service '%s'.", serviceName.c_str()));
}
ret->setTarget(target); // free by freeServiceAddress()
recipient.setServiceAddress(IServiceAddress::UP(ret.release()));
@@ -343,8 +341,7 @@ void
RPCNetwork::send(RPCNetwork::SendContext &ctx)
{
if (ctx._hasError) {
- replyError(ctx, ErrorCode::HANDSHAKE_FAILED,
- "An error occured while resolving version.");
+ replyError(ctx, ErrorCode::HANDSHAKE_FAILED, "An error occured while resolving version.");
} else {
uint64_t timeRemaining = ctx._msg.getTimeRemainingNow();
Blob payload = _owner->getProtocol(ctx._msg.getProtocol())->encode(ctx._version, ctx._msg);
@@ -378,8 +375,8 @@ RPCNetwork::sync()
void
RPCNetwork::shutdown()
{
- _transport.ShutDown(false);
- _threadPool.Close();
+ _transport->ShutDown(false);
+ _threadPool->Close();
}
void
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
index 856f9d0ef64..3f51f443028 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
@@ -2,27 +2,33 @@
#pragma once
#include "inetwork.h"
-#include "oosmanager.h"
-#include "rpcnetworkparams.h"
-#include "rpcsendv1.h"
-#include "rpcservicepool.h"
-#include "rpctargetpool.h"
+#include "rpcsendadapter.h"
+#include "rpctarget.h"
+#include "identity.h"
#include <vespa/messagebus/blob.h>
#include <vespa/messagebus/blobref.h>
#include <vespa/messagebus/message.h>
#include <vespa/messagebus/reply.h>
#include <vespa/slobrok/imirrorapi.h>
#include <vespa/vespalib/component/versionspecification.h>
-#include <vespa/fnet/transport.h>
-#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/vespalib/util/compressionconfig.h>
+#include <vespa/fnet/frt/invokable.h>
+
+class FNET_Transport;
namespace slobrok {
- namespace api {
- class RegisterAPI;
- }
+ namespace api { class RegisterAPI; }
+ class ConfiguratorFactory;
}
namespace mbus {
+
+class OOSManager;
+class RPCServicePool;
+class RPCTargetPool;
+class RPCNetworkParams;
+class RPCServiceAddress;
+
/**
* Network implementation based on RPC. This class is responsible for
* keeping track of services and for sending messages to services.
@@ -30,6 +36,7 @@ namespace mbus {
class RPCNetwork : public INetwork,
public FRT_Invokable {
private:
+ using CompressionConfig = vespalib::compression::CompressionConfig;
struct SendContext : public RPCTarget::IVersionHandler {
vespalib::Lock _lock;
RPCNetwork &_net;
@@ -51,24 +58,26 @@ private:
void PerformTask() override;
};
- typedef std::map<vespalib::VersionSpecification, RPCSendAdapter*> SendAdapterMap;
+ using SendAdapterMap = std::map<vespalib::Version, RPCSendAdapter*>;
INetworkOwner *_owner;
Identity _ident;
- FastOS_ThreadPool _threadPool;
- FNET_Transport _transport;
- FRT_Supervisor _orb;
+ std::unique_ptr<FastOS_ThreadPool> _threadPool;
+ std::unique_ptr<FNET_Transport> _transport;
+ std::unique_ptr<FRT_Supervisor> _orb;
FNET_Scheduler &_scheduler;
- RPCTargetPool _targetPool;
+ std::unique_ptr<RPCTargetPool> _targetPool;
TargetPoolTask _targetPoolTask;
- RPCServicePool _servicePool;
- slobrok::ConfiguratorFactory _slobrokCfgFactory;
- std::unique_ptr<slobrok::api::IMirrorAPI> _mirror;
- std::unique_ptr<slobrok::api::RegisterAPI> _regAPI;
- OOSManager _oosManager;
+ std::unique_ptr<RPCServicePool> _servicePool;
+ std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory;
+ std::unique_ptr<slobrok::api::IMirrorAPI> _mirror;
+ std::unique_ptr<slobrok::api::RegisterAPI> _regAPI;
+ std::unique_ptr<OOSManager> _oosManager;
int _requestedPort;
- RPCSendV1 _sendV1;
+ std::unique_ptr<RPCSendAdapter> _sendV1;
+ std::unique_ptr<RPCSendAdapter> _sendV2;
SendAdapterMap _sendAdapters;
+ CompressionConfig _compressionConfig;
/**
* Resolves and assigns a service address for the given recipient using the
@@ -159,7 +168,7 @@ public:
*
* @return port number
**/
- int getPort() const { return _orb.GetListenPort(); }
+ int getPort() const;
/**
* Allocate a new rpc request object. The caller of this method gets the
@@ -191,7 +200,7 @@ public:
*
* @return internal OOS manager
**/
- OOSManager &getOOSManager() { return _oosManager; }
+ OOSManager &getOOSManager() { return *_oosManager; }
/**
* Obtain a reference to the internal supervisor. This is used by
@@ -199,7 +208,7 @@ public:
*
* @return The supervisor.
*/
- FRT_Supervisor &getSupervisor() { return _orb; }
+ FRT_Supervisor &getSupervisor() { return *_orb; }
/**
* Deliver an error reply to the recipients of a {@link SendContext} in a
@@ -225,7 +234,7 @@ public:
void shutdown() override;
void postShutdownHook() override;
const slobrok::api::IMirrorAPI &getMirror() const override;
-
+ CompressionConfig getCompressionConfig() { return _compressionConfig; }
void invoke(FRT_RPCRequest *req);
};
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
index e025db0e350..df35d51cb54 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
@@ -11,7 +11,8 @@ RPCNetworkParams::RPCNetworkParams() :
_listenPort(0),
_maxInputBufferSize(256*1024),
_maxOutputBufferSize(256*1024),
- _connectionExpireSecs(30)
+ _connectionExpireSecs(30),
+ _compressionConfig(CompressionConfig::LZ4, 6, 90, 1024)
{ }
RPCNetworkParams::~RPCNetworkParams() {}
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
index a65248f7299..bfc624a6523 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
@@ -3,6 +3,7 @@
#include "identity.h"
#include <vespa/slobrok/cfg.h>
+#include <vespa/vespalib/util/compressionconfig.h>
namespace mbus {
@@ -12,13 +13,15 @@ namespace mbus {
*/
class RPCNetworkParams {
private:
- Identity _identity;
+ using CompressionConfig = vespalib::compression::CompressionConfig;
+ Identity _identity;
config::ConfigUri _slobrokConfig;
- string _oosServerPattern;
- int _listenPort;
- uint32_t _maxInputBufferSize;
- uint32_t _maxOutputBufferSize;
- double _connectionExpireSecs;
+ string _oosServerPattern;
+ int _listenPort;
+ uint32_t _maxInputBufferSize;
+ uint32_t _maxOutputBufferSize;
+ double _connectionExpireSecs;
+ CompressionConfig _compressionConfig;
public:
RPCNetworkParams();
@@ -177,6 +180,12 @@ public:
_maxOutputBufferSize = maxOutputBufferSize;
return *this;
}
+
+ RPCNetworkParams &setCompressionConfig(CompressionConfig compressionConfig) {
+ _compressionConfig = compressionConfig;
+ return *this;
+ }
+ CompressionConfig getCompressionConfig() const { return _compressionConfig; }
};
}
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
new file mode 100644
index 00000000000..588d5a006dd
--- /dev/null
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
@@ -0,0 +1,281 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "rpcsend.h"
+#include "rpcsend_private.h"
+#include "rpcserviceaddress.h"
+#include <vespa/messagebus/network/rpcnetwork.h>
+#include <vespa/messagebus/tracelevel.h>
+#include <vespa/messagebus/emptyreply.h>
+#include <vespa/messagebus/errorcode.h>
+#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/fnet/channel.h>
+#include <vespa/fnet/frt/reflection.h>
+
+#include <vespa/vespalib/data/slime/cursor.h>
+
+using vespalib::make_string;
+
+namespace mbus {
+
+using network::internal::ReplyContext;
+using network::internal::SendContext;
+
+namespace {
+
+class FillByCopy final : public PayLoadFiller
+{
+public:
+ FillByCopy(BlobRef payload) : _payload(payload) { }
+ void fill(FRT_Values & v) const override {
+ v.AddData(_payload.data(), _payload.size());
+ }
+ void fill(const vespalib::Memory & name, vespalib::slime::Cursor & v) const override {
+ v.setData(name, vespalib::Memory(_payload.data(), _payload.size()));
+ }
+private:
+ BlobRef _payload;
+};
+
+class FillByHandover final : public PayLoadFiller
+{
+public:
+ FillByHandover(Blob payload) : _payload(std::move(payload)) { }
+ void fill(FRT_Values & v) const override {
+ v.AddData(std::move(_payload.payload()), _payload.size());
+ }
+ void fill(const vespalib::Memory & name, vespalib::slime::Cursor & v) const override {
+ v.setData(name, vespalib::Memory(_payload.data(), _payload.size()));
+ }
+private:
+ mutable Blob _payload;
+};
+
+}
+
+RPCSend::RPCSend() :
+ _net(NULL),
+ _clientIdent("client"),
+ _serverIdent("server")
+{ }
+
+RPCSend::~RPCSend() {}
+
+void
+RPCSend::attach(RPCNetwork &net)
+{
+ _net = &net;
+ const string &prefix = _net->getIdentity().getServicePrefix();
+ if (!prefix.empty()) {
+ _clientIdent = make_string("'%s'", prefix.c_str());
+ _serverIdent = _clientIdent;
+ }
+
+ FRT_ReflectionBuilder builder(&_net->getSupervisor());
+ build(builder);
+}
+
+void
+RPCSend::replyError(FRT_RPCRequest *req, const vespalib::Version &version, uint32_t traceLevel, const Error &err)
+{
+ Reply::UP reply(new EmptyReply());
+ reply->setContext(Context(new ReplyContext(*req, version)));
+ reply->getTrace().setLevel(traceLevel);
+ reply->addError(err);
+ handleReply(std::move(reply));
+}
+
+void
+RPCSend::handleDiscard(Context ctx)
+{
+ ReplyContext::UP tmp(static_cast<ReplyContext*>(ctx.value.PTR));
+ FRT_RPCRequest &req = tmp->getRequest();
+ FNET_Channel *chn = req.GetContext()._value.CHANNEL;
+ req.SubRef();
+ chn->Free();
+}
+
+void
+RPCSend::sendByHandover(RoutingNode &recipient, const vespalib::Version &version, Blob payload, uint64_t timeRemaining)
+{
+ send(recipient, version, FillByHandover(std::move(payload)), timeRemaining);
+}
+
+void
+RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, BlobRef payload, uint64_t timeRemaining)
+{
+ send(recipient, version, FillByCopy(payload), timeRemaining);
+}
+
+void
+RPCSend::send(RoutingNode &recipient, const vespalib::Version &version,
+ const PayLoadFiller & payload, uint64_t timeRemaining)
+{
+ SendContext::UP ctx(new SendContext(recipient, timeRemaining));
+ RPCServiceAddress &address = static_cast<RPCServiceAddress&>(recipient.getServiceAddress());
+ const Message &msg = recipient.getMessage();
+ Route route = recipient.getRoute();
+ Hop hop = route.removeHop(0);
+
+ FRT_RPCRequest *req = _net->allocRequest();
+ encodeRequest(*req, version, route, address, msg, recipient.getTrace().getLevel(), payload, timeRemaining);
+
+ if (ctx->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ ctx->getTrace().trace(TraceLevel::SEND_RECEIVE,
+ make_string("Sending message (version %s) from %s to '%s' with %.2f seconds timeout.",
+ version.toString().c_str(), _clientIdent.c_str(),
+ address.getServiceName().c_str(), ctx->getTimeout()));
+ }
+
+ if (hop.getIgnoreResult()) {
+ address.getTarget().getFRTTarget().InvokeVoid(req);
+ if (ctx->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ ctx->getTrace().trace(TraceLevel::SEND_RECEIVE,
+ make_string("Not waiting for a reply from '%s'.", address.getServiceName().c_str()));
+ }
+ Reply::UP reply(new EmptyReply());
+ reply->getTrace().swap(ctx->getTrace());
+ _net->getOwner().deliverReply(std::move(reply), recipient);
+ } else {
+ SendContext *ptr = ctx.release();
+ req->SetContext(FNET_Context(ptr));
+ address.getTarget().getFRTTarget().InvokeAsync(req, ptr->getTimeout(), this);
+ }
+}
+
+void
+RPCSend::RequestDone(FRT_RPCRequest *req)
+{
+ SendContext::UP ctx(static_cast<SendContext*>(req->GetContext()._value.VOIDP));
+ const string &serviceName = static_cast<RPCServiceAddress&>(ctx->getRecipient().getServiceAddress()).getServiceName();
+ Reply::UP reply;
+ Error error;
+ Trace & trace = ctx->getTrace();
+ if (!req->CheckReturnTypes(getReturnSpec())) {
+ reply.reset(new EmptyReply());
+ switch (req->GetErrorCode()) {
+ case FRTE_RPC_TIMEOUT:
+ error = Error(ErrorCode::TIMEOUT,
+ make_string("A timeout occured while waiting for '%s' (%g seconds expired); %s",
+ serviceName.c_str(), ctx->getTimeout(), req->GetErrorMessage()));
+ break;
+ case FRTE_RPC_CONNECTION:
+ error = Error(ErrorCode::CONNECTION_ERROR,
+ make_string("A connection error occured for '%s'; %s",
+ serviceName.c_str(), req->GetErrorMessage()));
+ break;
+ default:
+ error = Error(ErrorCode::NETWORK_ERROR,
+ make_string("A network error occured for '%s'; %s",
+ serviceName.c_str(), req->GetErrorMessage()));
+ }
+ } else {
+ FRT_Values &ret = *req->GetReturn();
+ reply = createReply(ret, serviceName, error, trace.getRoot());
+ }
+ if (trace.shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ trace.trace(TraceLevel::SEND_RECEIVE,
+ make_string("Reply (type %d) received at %s.", reply->getType(), _clientIdent.c_str()));
+ }
+ reply->getTrace().swap(trace);
+ if (error.getCode() != ErrorCode::NONE) {
+ reply->addError(error);
+ }
+ _net->getOwner().deliverReply(std::move(reply), ctx->getRecipient());
+ req->SubRef();
+}
+
+std::unique_ptr<Reply>
+RPCSend::decode(vespalib::stringref protocolName, const vespalib::Version & version, BlobRef payload, Error & error) const
+{
+ Reply::UP reply;
+ IProtocol * protocol = _net->getOwner().getProtocol(protocolName);
+ if (protocol != nullptr) {
+ Routable::UP routable = protocol->decode(version, payload);
+ if (routable) {
+ if (routable->isReply()) {
+ reply.reset(static_cast<Reply*>(routable.release()));
+ } else {
+ error = Error(ErrorCode::DECODE_ERROR, "Payload decoded to a message when expecting a reply.");
+ }
+ } else {
+ error = Error(ErrorCode::DECODE_ERROR,
+ make_string("Protocol '%s' failed to decode routable.", protocolName.c_str()));
+ }
+
+ } else {
+ error = Error(ErrorCode::UNKNOWN_PROTOCOL,
+ make_string("Protocol '%s' is not known by %s.", protocolName.c_str(), _serverIdent.c_str()));
+ }
+ return reply;
+}
+
+void
+RPCSend::handleReply(Reply::UP reply)
+{
+ ReplyContext::UP ctx(static_cast<ReplyContext*>(reply->getContext().value.PTR));
+ FRT_RPCRequest &req = ctx->getRequest();
+ string version = ctx->getVersion().toString();
+ if (reply->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ reply->getTrace().trace(TraceLevel::SEND_RECEIVE, make_string("Sending reply (version %s) from %s.",
+ version.c_str(), _serverIdent.c_str()));
+ }
+ Blob payload(0);
+ if (reply->getType() != 0) {
+ payload = _net->getOwner().getProtocol(reply->getProtocol())->encode(ctx->getVersion(), *reply);
+ if (payload.size() == 0) {
+ reply->addError(Error(ErrorCode::ENCODE_ERROR, "An error occured while encoding the reply, see log."));
+ }
+ }
+ FRT_Values &ret = *req.GetReturn();
+ createResponse(ret, version, *reply, std::move(payload));
+ req.Return();
+}
+
+void
+RPCSend::invoke(FRT_RPCRequest *req)
+{
+ req->Detach();
+ FRT_Values &args = *req->GetParams();
+
+ std::unique_ptr<Params> params = toParams(args);
+ IProtocol * protocol = _net->getOwner().getProtocol(params->getProtocol());
+ if (protocol == nullptr) {
+ replyError(req, params->getVersion(), params->getTraceLevel(),
+ Error(ErrorCode::UNKNOWN_PROTOCOL,
+ make_string("Protocol '%s' is not known by %s.", params->getProtocol().c_str(), _serverIdent.c_str())));
+ return;
+ }
+ Routable::UP routable = protocol->decode(params->getVersion(), params->getPayload());
+ req->DiscardBlobs();
+ if ( ! routable ) {
+ replyError(req, params->getVersion(), params->getTraceLevel(),
+ Error(ErrorCode::DECODE_ERROR,
+ make_string("Protocol '%s' failed to decode routable.", params->getProtocol().c_str())));
+ return;
+ }
+ if (routable->isReply()) {
+ replyError(req, params->getVersion(), params->getTraceLevel(),
+ Error(ErrorCode::DECODE_ERROR, "Payload decoded to a reply when expecting a mesage."));
+ return;
+ }
+ Message::UP msg(static_cast<Message*>(routable.release()));
+ vespalib::stringref route = params->getRoute();
+ if (!route.empty()) {
+ msg->setRoute(Route::parse(route));
+ }
+ msg->setContext(Context(new ReplyContext(*req, params->getVersion())));
+ msg->pushHandler(*this, *this);
+ msg->setRetryEnabled(params->useRetry());
+ msg->setRetry(params->getRetries());
+ msg->setTimeReceivedNow();
+ msg->setTimeRemaining(params->getRemainingTime());
+ msg->getTrace().setLevel(params->getTraceLevel());
+ if (msg->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ msg->getTrace().trace(TraceLevel::SEND_RECEIVE,
+ make_string("Message (type %d) received at %s for session '%s'.",
+ msg->getType(), _serverIdent.c_str(), string(params->getSession()).c_str()));
+ }
+ _net->getOwner().deliverMessage(std::move(msg), params->getSession());
+}
+
+} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.h b/messagebus/src/vespa/messagebus/network/rpcsend.h
new file mode 100644
index 00000000000..c707b47f548
--- /dev/null
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.h
@@ -0,0 +1,95 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "rpcsendadapter.h"
+#include <vespa/messagebus/idiscardhandler.h>
+#include <vespa/messagebus/ireplyhandler.h>
+#include <vespa/messagebus/common.h>
+#include <vespa/fnet/frt/invokable.h>
+#include <vespa/fnet/frt/invoker.h>
+
+class FRT_ReflectionBuilder;
+
+namespace vespalib::slime { class Cursor; }
+namespace vespalib { class Memory; }
+namespace vespalib { class TraceNode; }
+namespace mbus {
+
+class Error;
+class Route;
+class Message;
+class RPCServiceAddress;
+
+class PayLoadFiller
+{
+public:
+ virtual ~PayLoadFiller() { }
+ virtual void fill(FRT_Values & v) const = 0;
+ virtual void fill(const vespalib::Memory & name, vespalib::slime::Cursor & v) const = 0;
+};
+
+class RPCSend : public RPCSendAdapter,
+ public FRT_Invokable,
+ public FRT_IRequestWait,
+ public IDiscardHandler,
+ public IReplyHandler
+{
+public:
+ class Params {
+ public:
+ virtual ~Params() {}
+ virtual vespalib::Version getVersion() const = 0;
+ virtual vespalib::stringref getProtocol() const = 0;
+ virtual uint32_t getTraceLevel() const = 0;
+ virtual bool useRetry() const = 0;
+ virtual uint32_t getRetries() const = 0;
+ virtual uint64_t getRemainingTime() const = 0;
+ virtual vespalib::stringref getRoute() const = 0;
+ virtual vespalib::stringref getSession() const = 0;
+ virtual BlobRef getPayload() const = 0;
+ };
+protected:
+ RPCNetwork *_net;
+ string _clientIdent;
+ string _serverIdent;
+
+ virtual void build(FRT_ReflectionBuilder & builder) = 0;
+ virtual std::unique_ptr<Reply> createReply(const FRT_Values & response, const string & serviceName,
+ Error & error, vespalib::TraceNode & rootTrace) const = 0;
+ virtual void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route,
+ const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel,
+ const PayLoadFiller &filler, uint64_t timeRemaining) const = 0;
+ virtual const char * getReturnSpec() const = 0;
+ virtual void createResponse(FRT_Values & ret, const string & version, Reply & reply, Blob payload) const = 0;
+ virtual std::unique_ptr<Params> toParams(const FRT_Values &param) const = 0;
+
+ void send(RoutingNode &recipient, const vespalib::Version &version,
+ const PayLoadFiller & filler, uint64_t timeRemaining);
+ std::unique_ptr<Reply> decode(vespalib::stringref protocol, const vespalib::Version & version,
+ BlobRef payload, Error & error) const;
+ /**
+ * Send an error reply for a given request.
+ *
+ * @param request The FRT request to reply to.
+ * @param version The version to serialize for.
+ * @param traceLevel The trace level to set in the reply.
+ * @param err The error to reply with.
+ */
+ void replyError(FRT_RPCRequest *req, const vespalib::Version &version, uint32_t traceLevel, const Error &err);
+public:
+ RPCSend();
+ ~RPCSend();
+
+ void invoke(FRT_RPCRequest *req);
+private:
+ void attach(RPCNetwork &net) final override;
+ void handleDiscard(Context ctx) final override;
+ void sendByHandover(RoutingNode &recipient, const vespalib::Version &version,
+ Blob payload, uint64_t timeRemaining) final override;
+ void send(RoutingNode &recipient, const vespalib::Version &version,
+ BlobRef payload, uint64_t timeRemaining) final override;
+ void RequestDone(FRT_RPCRequest *req) final override;
+ void handleReply(std::unique_ptr<Reply> reply) final override;
+};
+
+} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend_private.h b/messagebus/src/vespa/messagebus/network/rpcsend_private.h
new file mode 100644
index 00000000000..f5867e79856
--- /dev/null
+++ b/messagebus/src/vespa/messagebus/network/rpcsend_private.h
@@ -0,0 +1,52 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/messagebus/trace.h>
+#include <vespa/messagebus/routing/routingnode.h>
+
+namespace mbus::network::internal {
+/**
+ * Implements a helper class to hold the necessary context to create a reply from
+ * an rpc return value. This object is held as the context of an FRT_RPCRequest.
+ */
+class SendContext {
+private:
+ mbus::RoutingNode &_recipient;
+ mbus::Trace _trace;
+ double _timeout;
+
+public:
+ typedef std::unique_ptr<SendContext> UP;
+ SendContext(const SendContext &) = delete;
+ SendContext & operator = (const SendContext &) = delete;
+ SendContext(mbus::RoutingNode &recipient, uint64_t timeRemaining)
+ : _recipient(recipient),
+ _trace(recipient.getTrace().getLevel()),
+ _timeout(timeRemaining * 0.001) { }
+ mbus::RoutingNode &getRecipient() { return _recipient; }
+ mbus::Trace &getTrace() { return _trace; }
+ double getTimeout() { return _timeout; }
+};
+
+/**
+ * Implements a helper class to hold the necessary context to send a reply as an
+ * rpc return value. This object is held in the callstack of the reply.
+ */
+class ReplyContext {
+private:
+ FRT_RPCRequest &_request;
+ vespalib::Version _version;
+
+public:
+ typedef std::unique_ptr<ReplyContext> UP;
+ ReplyContext(const ReplyContext &) = delete;
+ ReplyContext & operator = (const ReplyContext &) = delete;
+
+ ReplyContext(FRT_RPCRequest &request, const vespalib::Version &version)
+ : _request(request), _version(version) { }
+ FRT_RPCRequest &getRequest() { return _request; }
+ const vespalib::Version &getVersion() { return _version; }
+};
+
+
+}
diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp b/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp
index 4cf45207010..6b89a278b88 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp
@@ -1,87 +1,40 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "rpcsendv1.h"
#include "rpcnetwork.h"
-#include <vespa/messagebus/routing/routingnode.h>
+#include "rpcserviceaddress.h"
#include <vespa/messagebus/emptyreply.h>
-#include <vespa/messagebus/errorcode.h>
#include <vespa/messagebus/tracelevel.h>
#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/fnet/channel.h>
+#include <vespa/fnet/frt/reflection.h>
using vespalib::make_string;
-namespace {
-
-/**
- * Implements a helper class to hold the necessary context to create a reply from
- * an rpc return value. This object is held as the context of an FRT_RPCRequest.
- */
-class SendContext {
-private:
- mbus::RoutingNode &_recipient;
- mbus::Trace _trace;
- double _timeout;
-
-public:
- typedef std::unique_ptr<SendContext> UP;
- SendContext(const SendContext &) = delete;
- SendContext & operator = (const SendContext &) = delete;
- SendContext(mbus::RoutingNode &recipient, uint64_t timeRemaining)
- : _recipient(recipient),
- _trace(recipient.getTrace().getLevel()),
- _timeout(timeRemaining * 0.001) { }
- mbus::RoutingNode &getRecipient() { return _recipient; }
- mbus::Trace &getTrace() { return _trace; }
- double getTimeout() { return _timeout; }
-};
+namespace mbus {
-/**
- * Implements a helper class to hold the necessary context to send a reply as an
- * rpc return value. This object is held in the callstack of the reply.
- */
-class ReplyContext {
-private:
- FRT_RPCRequest &_request;
- vespalib::Version _version;
+namespace {
-public:
- typedef std::unique_ptr<ReplyContext> UP;
- ReplyContext(const ReplyContext &) = delete;
- ReplyContext & operator = (const ReplyContext &) = delete;
-
- ReplyContext(FRT_RPCRequest &request, const vespalib::Version &version)
- : _request(request), _version(version) { }
- FRT_RPCRequest &getRequest() { return _request; }
- const vespalib::Version &getVersion() { return _version; }
-};
+const char *METHOD_NAME = "mbus.send1";
+const char *METHOD_PARAMS = "sssbilsxi";
+const char *METHOD_RETURN = "sdISSsxs";
}
-namespace mbus {
-
-const char *RPCSendV1::METHOD_NAME = "mbus.send1";
-const char *RPCSendV1::METHOD_PARAMS = "sssbilsxi";
-const char *RPCSendV1::METHOD_RETURN = "sdISSsxs";
-
-RPCSendV1::RPCSendV1() :
- _net(NULL),
- _clientIdent("client"),
- _serverIdent("server")
-{ }
+bool RPCSendV1::isCompatible(vespalib::stringref method, vespalib::stringref request, vespalib::stringref respons)
+{
+ return (method == METHOD_NAME) &&
+ (request == METHOD_PARAMS) &&
+ (respons == METHOD_RETURN);
+}
-RPCSendV1::~RPCSendV1() {}
+const char *
+RPCSendV1::getReturnSpec() const {
+ return METHOD_RETURN;
+}
void
-RPCSendV1::attach(RPCNetwork &net)
+RPCSendV1::build(FRT_ReflectionBuilder & builder)
{
- _net = &net;
- const string &prefix = _net->getIdentity().getServicePrefix();
- if (!prefix.empty()) {
- _clientIdent = make_string("'%s'", prefix.c_str());
- _serverIdent = _clientIdent;
- }
-
- FRT_ReflectionBuilder builder(&_net->getSupervisor());
builder.DefineMethod(METHOD_NAME, METHOD_PARAMS, METHOD_RETURN, true, FRT_METHOD(RPCSendV1::invoke), this);
builder.MethodDesc("Send a message bus request and get a reply back.");
builder.ParamDesc("version", "The version of the message.");
@@ -103,59 +56,14 @@ RPCSendV1::attach(RPCNetwork &net)
builder.ReturnDesc("trace", "A string representation of the trace.");
}
-namespace {
-
-class FillByCopy : public PayLoadFiller
-{
-public:
- FillByCopy(BlobRef payload) : _payload(payload) { }
- void fill(FRT_Values & v) const override {
- v.AddData(_payload.data(), _payload.size());
- }
-private:
- BlobRef _payload;
-};
-
-class FillByHandover : public PayLoadFiller
-{
-public:
- FillByHandover(Blob payload) : _payload(std::move(payload)) { }
- void fill(FRT_Values & v) const override {
- v.AddData(std::move(_payload.payload()), _payload.size());
- }
-private:
- mutable Blob _payload;
-};
-
-}
-
void
-RPCSendV1::send(RoutingNode &recipient, const vespalib::Version &version,
- BlobRef payload, uint64_t timeRemaining)
+RPCSendV1::encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route,
+ const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel,
+ const PayLoadFiller &filler, uint64_t timeRemaining) const
{
- send(recipient, version, FillByCopy(payload), timeRemaining);
-}
-
-void
-RPCSendV1::sendByHandover(RoutingNode &recipient, const vespalib::Version &version,
- Blob payload, uint64_t timeRemaining)
-{
- send(recipient, version, FillByHandover(std::move(payload)), timeRemaining);
-}
-void
-RPCSendV1::send(RoutingNode &recipient, const vespalib::Version &version,
- const PayLoadFiller & payload, uint64_t timeRemaining)
-{
- SendContext::UP ctx(new SendContext(recipient, timeRemaining));
- RPCServiceAddress &address = static_cast<RPCServiceAddress&>(recipient.getServiceAddress());
- const Message &msg = recipient.getMessage();
- Route route = recipient.getRoute();
- Hop hop = route.removeHop(0);
-
- FRT_RPCRequest *req = _net->allocRequest();
- FRT_Values &args = *req->GetParams();
- req->SetMethodName(METHOD_NAME);
+ FRT_Values &args = *req.GetParams();
+ req.SetMethodName(METHOD_NAME);
args.AddString(version.toString().c_str());
args.AddString(route.toString().c_str());
args.AddString(address.getSessionName().c_str());
@@ -163,237 +71,103 @@ RPCSendV1::send(RoutingNode &recipient, const vespalib::Version &version,
args.AddInt32(msg.getRetry());
args.AddInt64(timeRemaining);
args.AddString(msg.getProtocol().c_str());
- payload.fill(args);
- args.AddInt32(recipient.getTrace().getLevel());
-
- if (ctx->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
- ctx->getTrace().trace(TraceLevel::SEND_RECEIVE,
- make_string("Sending message (version %s) from %s to '%s' with %.2f seconds timeout.",
- version.toString().c_str(), _clientIdent.c_str(),
- address.getServiceName().c_str(), ctx->getTimeout()));
- }
-
- if (hop.getIgnoreResult()) {
- address.getTarget().getFRTTarget().InvokeVoid(req);
- if (ctx->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
- ctx->getTrace().trace(TraceLevel::SEND_RECEIVE,
- make_string("Not waiting for a reply from '%s'.", address.getServiceName().c_str()));
- }
- Reply::UP reply(new EmptyReply());
- reply->getTrace().swap(ctx->getTrace());
- _net->getOwner().deliverReply(std::move(reply), recipient);
- } else {
- SendContext *ptr = ctx.release();
- req->SetContext(FNET_Context(ptr));
- address.getTarget().getFRTTarget().InvokeAsync(req, ptr->getTimeout(), this);
- }
+ filler.fill(args);
+ args.AddInt32(traceLevel);
}
-void
-RPCSendV1::RequestDone(FRT_RPCRequest *req)
-{
- SendContext::UP ctx(static_cast<SendContext*>(req->GetContext()._value.VOIDP));
- const string &serviceName = static_cast<RPCServiceAddress&>(
- ctx->getRecipient().getServiceAddress()).getServiceName();
- Reply::UP reply;
- Error error;
- if (!req->CheckReturnTypes(METHOD_RETURN)) {
- reply.reset(new EmptyReply());
- switch (req->GetErrorCode()) {
- case FRTE_RPC_TIMEOUT:
- error = Error(ErrorCode::TIMEOUT,
- make_string("A timeout occured while waiting for '%s' (%g seconds expired); %s",
- serviceName.c_str(), ctx->getTimeout(), req->GetErrorMessage()));
- break;
- case FRTE_RPC_CONNECTION:
- error = Error(ErrorCode::CONNECTION_ERROR,
- make_string("A connection error occured for '%s'; %s",
- serviceName.c_str(), req->GetErrorMessage()));
- break;
- default:
- error = Error(ErrorCode::NETWORK_ERROR,
- make_string("A network error occured for '%s'; %s",
- serviceName.c_str(), req->GetErrorMessage()));
- }
- } else {
- FRT_Values &ret = *req->GetReturn();
+namespace {
- vespalib::Version version = vespalib::Version(ret[0]._string._str);
- double retryDelay = ret[1]._double;
- uint32_t *errorCodes = ret[2]._int32_array._pt;
- uint32_t errorCodesLen = ret[2]._int32_array._len;
- FRT_StringValue *errorMessages = ret[3]._string_array._pt;
- uint32_t errorMessagesLen = ret[3]._string_array._len;
- FRT_StringValue *errorServices = ret[4]._string_array._pt;
- uint32_t errorServicesLen = ret[4]._string_array._len;
- const char *protocolName = ret[5]._string._str;
- const char *payload = ret[6]._data._buf;
- uint32_t payloadLen = ret[6]._data._len;
- const char *trace = ret[7]._string._str;
+class ParamsV1 : public RPCSend::Params
+{
+public:
+ ParamsV1(const FRT_Values &args) : _args(args) { }
- if (payloadLen > 0) {
- IProtocol * protocol = _net->getOwner().getProtocol(protocolName);
- if (protocol != nullptr) {
- Routable::UP routable = protocol->decode(version, BlobRef(payload, payloadLen));
- if (routable) {
- if (routable->isReply()) {
- reply.reset(static_cast<Reply*>(routable.release()));
- } else {
- error = Error(ErrorCode::DECODE_ERROR,
- "Payload decoded to a message when expecting a reply.");
- }
- } else {
- error = Error(ErrorCode::DECODE_ERROR,
- make_string("Protocol '%s' failed to decode routable.", protocolName));
- }
+ uint32_t getTraceLevel() const override { return _args[8]._intval32; }
+ bool useRetry() const override { return _args[3]._intval8 != 0; }
+ uint32_t getRetries() const override { return _args[4]._intval32; }
+ uint64_t getRemainingTime() const override { return _args[5]._intval64; }
- } else {
- error = Error(ErrorCode::UNKNOWN_PROTOCOL,
- make_string("Protocol '%s' is not known by %s.", protocolName, _serverIdent.c_str()));
- }
- }
- if ( ! reply ) {
- reply.reset(new EmptyReply());
- }
- reply->setRetryDelay(retryDelay);
- for (uint32_t i = 0; i < errorCodesLen && i < errorMessagesLen && i < errorServicesLen; ++i) {
- reply->addError(Error(errorCodes[i],
- errorMessages[i]._str,
- errorServices[i]._len > 0 ? errorServices[i]._str : serviceName.c_str()));
- }
- ctx->getTrace().getRoot().addChild(TraceNode::decode(trace));
+ vespalib::Version getVersion() const override {
+ return vespalib::Version(vespalib::stringref(_args[0]._string._str, _args[0]._string._len));
+ }
+ vespalib::stringref getRoute() const override {
+ return vespalib::stringref(_args[1]._string._str, _args[1]._string._len);
+ }
+ vespalib::stringref getSession() const override {
+ return vespalib::stringref(_args[2]._string._str, _args[2]._string._len);
}
- if (ctx->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
- ctx->getTrace().trace(TraceLevel::SEND_RECEIVE,
- make_string("Reply (type %d) received at %s.", reply->getType(), _clientIdent.c_str()));
+ vespalib::stringref getProtocol() const override {
+ return vespalib::stringref(_args[6]._string._str, _args[6]._string._len);
}
- reply->getTrace().swap(ctx->getTrace());
- if (error.getCode() != ErrorCode::NONE) {
- reply->addError(error);
+ BlobRef getPayload() const override {
+ return BlobRef(_args[7]._data._buf, _args[7]._data._len);
}
- _net->getOwner().deliverReply(std::move(reply), ctx->getRecipient());
- req->SubRef();
+private:
+ const FRT_Values & _args;
+};
+
}
-void
-RPCSendV1::invoke(FRT_RPCRequest *req)
+std::unique_ptr<RPCSend::Params>
+RPCSendV1::toParams(const FRT_Values &args) const
{
- req->Detach();
+ return std::make_unique<ParamsV1>(args);
+}
- FRT_Values &args = *req->GetParams();
- vespalib::Version version = vespalib::Version(args[0]._string._str);
- const char *route = args[1]._string._str;
- const char *session = args[2]._string._str;
- bool retryEnabled = args[3]._intval8 != 0;
- uint32_t retry = args[4]._intval32;
- uint64_t timeRemaining = args[5]._intval64;
- const char *protocolName = args[6]._string._str;
- const char *payload = args[7]._data._buf;
- uint32_t payloadLen = args[7]._data._len;
- uint32_t traceLevel = args[8]._intval32;
- IProtocol * protocol = _net->getOwner().getProtocol(protocolName);
- if (protocol == nullptr) {
- replyError(req, version, traceLevel,
- Error(ErrorCode::UNKNOWN_PROTOCOL,
- make_string("Protocol '%s' is not known by %s.", protocolName, _serverIdent.c_str())));
- return;
- }
- Routable::UP routable = protocol->decode(version, BlobRef(payload, payloadLen));
- req->DiscardBlobs();
- if ( ! routable ) {
- replyError(req, version, traceLevel,
- Error(ErrorCode::DECODE_ERROR,
- make_string("Protocol '%s' failed to decode routable.", protocolName)));
- return;
- }
- if (routable->isReply()) {
- replyError(req, version, traceLevel,
- Error(ErrorCode::DECODE_ERROR,
- "Payload decoded to a reply when expecting a mesage."));
- return;
+std::unique_ptr<Reply>
+RPCSendV1::createReply(const FRT_Values & ret, const string & serviceName, Error & error, vespalib::TraceNode & rootTrace) const
+{
+ vespalib::Version version = vespalib::Version(ret[0]._string._str);
+ double retryDelay = ret[1]._double;
+ uint32_t *errorCodes = ret[2]._int32_array._pt;
+ uint32_t errorCodesLen = ret[2]._int32_array._len;
+ FRT_StringValue *errorMessages = ret[3]._string_array._pt;
+ uint32_t errorMessagesLen = ret[3]._string_array._len;
+ FRT_StringValue *errorServices = ret[4]._string_array._pt;
+ uint32_t errorServicesLen = ret[4]._string_array._len;
+ const char *protocolName = ret[5]._string._str;
+ BlobRef payload(ret[6]._data._buf, ret[6]._data._len);
+ const char *trace = ret[7]._string._str;
+
+ Reply::UP reply;
+ if (payload.size() > 0) {
+ reply = decode(protocolName, version, payload, error);
}
- Message::UP msg(static_cast<Message*>(routable.release()));
- if (strlen(route) > 0) {
- msg->setRoute(Route::parse(route));
+ if ( ! reply ) {
+ reply.reset(new EmptyReply());
}
- msg->setContext(Context(new ReplyContext(*req, version)));
- msg->pushHandler(*this, *this);
- msg->setRetryEnabled(retryEnabled);
- msg->setRetry(retry);
- msg->setTimeReceivedNow();
- msg->setTimeRemaining(timeRemaining);
- msg->getTrace().setLevel(traceLevel);
- if (msg->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
- msg->getTrace().trace(TraceLevel::SEND_RECEIVE,
- make_string("Message (type %d) received at %s for session '%s'.",
- msg->getType(), _serverIdent.c_str(), session));
+ reply->setRetryDelay(retryDelay);
+ for (uint32_t i = 0; i < errorCodesLen && i < errorMessagesLen && i < errorServicesLen; ++i) {
+ reply->addError(Error(errorCodes[i], errorMessages[i]._str,
+ errorServices[i]._len > 0 ? errorServices[i]._str : serviceName.c_str()));
}
- _net->getOwner().deliverMessage(std::move(msg), session);
+ rootTrace.addChild(TraceNode::decode(trace));
+ return reply;
}
void
-RPCSendV1::handleReply(Reply::UP reply)
-{
- ReplyContext::UP ctx(static_cast<ReplyContext*>(reply->getContext().value.PTR));
- FRT_RPCRequest &req = ctx->getRequest();
- string version = ctx->getVersion().toString();
- if (reply->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
- reply->getTrace().trace(TraceLevel::SEND_RECEIVE, make_string("Sending reply (version %s) from %s.",
- version.c_str(), _serverIdent.c_str()));
- }
- Blob payload(0);
- if (reply->getType() != 0) {
- payload = _net->getOwner().getProtocol(reply->getProtocol())->encode(ctx->getVersion(), *reply);
- if (payload.size() == 0) {
- reply->addError(Error(ErrorCode::ENCODE_ERROR, "An error occured while encoding the reply, see log."));
- }
- }
- FRT_Values &ret = *req.GetReturn();
+RPCSendV1::createResponse(FRT_Values & ret, const string & version, Reply & reply, Blob payload) const {
ret.AddString(version.c_str());
- ret.AddDouble(reply->getRetryDelay());
+ ret.AddDouble(reply.getRetryDelay());
- uint32_t errorCount = reply->getNumErrors();
+ uint32_t errorCount = reply.getNumErrors();
uint32_t *errorCodes = ret.AddInt32Array(errorCount);
FRT_StringValue *errorMessages = ret.AddStringArray(errorCount);
FRT_StringValue *errorServices = ret.AddStringArray(errorCount);
for (uint32_t i = 0; i < errorCount; ++i) {
- errorCodes[i] = reply->getError(i).getCode();
- ret.SetString(errorMessages + i,
- reply->getError(i).getMessage().c_str());
- ret.SetString(errorServices + i,
- reply->getError(i).getService().c_str());
+ errorCodes[i] = reply.getError(i).getCode();
+ ret.SetString(errorMessages + i, reply.getError(i).getMessage().c_str());
+ ret.SetString(errorServices + i, reply.getError(i).getService().c_str());
}
- ret.AddString(reply->getProtocol().c_str());
+ ret.AddString(reply.getProtocol().c_str());
ret.AddData(std::move(payload.payload()), payload.size());
- if (reply->getTrace().getLevel() > 0) {
- ret.AddString(reply->getTrace().getRoot().encode().c_str());
+ if (reply.getTrace().getLevel() > 0) {
+ ret.AddString(reply.getTrace().getRoot().encode().c_str());
} else {
ret.AddString("");
}
- req.Return();
-}
-
-void
-RPCSendV1::handleDiscard(Context ctx)
-{
- ReplyContext::UP tmp(static_cast<ReplyContext*>(ctx.value.PTR));
- FRT_RPCRequest &req = tmp->getRequest();
- FNET_Channel *chn = req.GetContext()._value.CHANNEL;
- req.SubRef();
- chn->Free();
-}
-
-void
-RPCSendV1::replyError(FRT_RPCRequest *req, const vespalib::Version &version,
- uint32_t traceLevel, const Error &err)
-{
- Reply::UP reply(new EmptyReply());
- reply->setContext(Context(new ReplyContext(*req, version)));
- reply->getTrace().setLevel(traceLevel);
- reply->addError(err);
- handleReply(std::move(reply));
}
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv1.h b/messagebus/src/vespa/messagebus/network/rpcsendv1.h
index b634c1ef1b8..37f23335309 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsendv1.h
+++ b/messagebus/src/vespa/messagebus/network/rpcsendv1.h
@@ -1,77 +1,24 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include "rpcsendadapter.h"
-#include <vespa/messagebus/idiscardhandler.h>
-#include <vespa/messagebus/ireplyhandler.h>
-#include <vespa/fnet/frt/invokable.h>
-
+#include "rpcsend.h"
namespace mbus {
-class Error;
-
-class PayLoadFiller
-{
+class RPCSendV1 : public RPCSend {
public:
- virtual ~PayLoadFiller() { }
- virtual void fill(FRT_Values & v) const = 0;
-};
-
-/**
- * Implements the send adapter for method "mbus.send".
- */
-class RPCSendV1 : public RPCSendAdapter,
- public FRT_Invokable,
- public FRT_IRequestWait,
- public IDiscardHandler,
- public IReplyHandler {
+ static bool isCompatible(vespalib::stringref method, vespalib::stringref request, vespalib::stringref respons);
private:
- RPCNetwork *_net;
- string _clientIdent;
- string _serverIdent;
-
- /**
- * Send an error reply for a given request.
- *
- * @param request The FRT request to reply to.
- * @param version The version to serialize for.
- * @param traceLevel The trace level to set in the reply.
- * @param err The error to reply with.
- */
- void replyError(FRT_RPCRequest *req, const vespalib::Version &version,
- uint32_t traceLevel, const Error &err);
-
- void send(RoutingNode &recipient, const vespalib::Version &version,
- const PayLoadFiller & filler, uint64_t timeRemaining);
-public:
- /** The name of the rpc method that this adapter registers. */
- static const char *METHOD_NAME;
-
- /** The parameter string of the rpc method. */
- static const char *METHOD_PARAMS;
-
- /** The return string of the rpc method. */
- static const char *METHOD_RETURN;
-
- /**
- * Constructs a new instance of this adapter. This object is unusable until
- * its attach() method has been called.
- */
- RPCSendV1();
- ~RPCSendV1();
-
- void attach(RPCNetwork &net) override;
-
- void send(RoutingNode &recipient, const vespalib::Version &version,
- BlobRef payload, uint64_t timeRemaining) override;
- void sendByHandover(RoutingNode &recipient, const vespalib::Version &version,
- Blob payload, uint64_t timeRemaining) override;
-
- void handleReply(std::unique_ptr<Reply> reply) override;
- void handleDiscard(Context ctx) override;
- void invoke(FRT_RPCRequest *req);
- void RequestDone(FRT_RPCRequest *req) override;
+ void build(FRT_ReflectionBuilder & builder) override;
+ const char * getReturnSpec() const override;
+ std::unique_ptr<Params> toParams(const FRT_Values &param) const override;
+ void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route,
+ const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel,
+ const PayLoadFiller &filler, uint64_t timeRemaining) const override;
+
+ std::unique_ptr<Reply> createReply(const FRT_Values & response, const string & serviceName,
+ Error & error, vespalib::TraceNode & rootTrace) const override;
+ void createResponse(FRT_Values & ret, const string & version, Reply & reply, Blob payload) const override;
};
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp b/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp
new file mode 100644
index 00000000000..bd58e30c6d1
--- /dev/null
+++ b/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp
@@ -0,0 +1,247 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "rpcsendv2.h"
+#include "rpcnetwork.h"
+#include "rpcserviceaddress.h"
+#include <vespa/messagebus/emptyreply.h>
+#include <vespa/messagebus/tracelevel.h>
+#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/vespalib/data/slime/slime.h>
+#include <vespa/vespalib/data/databuffer.h>
+#include <vespa/vespalib/util/compressor.h>
+#include <vespa/fnet/frt/reflection.h>
+
+using vespalib::make_string;
+using vespalib::compression::CompressionConfig;
+using vespalib::compression::decompress;
+using vespalib::compression::compress;
+using vespalib::DataBuffer;
+using vespalib::ConstBufferRef;
+using vespalib::stringref;
+using vespalib::Memory;
+using vespalib::Slime;
+using vespalib::Version;
+using namespace vespalib::slime;
+
+namespace mbus {
+
+namespace {
+
+const char *METHOD_NAME = "mbus.slime";
+const char *METHOD_PARAMS = "bix";
+const char *METHOD_RETURN = "bix";
+
+Memory VERSION_F("version");
+Memory ROUTE_F("route");
+Memory SESSION_F("session");
+Memory USERETRY_F("useretry");
+Memory RETRYDELAY_F("retrydelay");
+Memory RETRY_F("retry");
+Memory TIMELEFT_F("timeleft");
+Memory PROTOCOL_F("prot");
+Memory TRACELEVEL_F("tracelevel");
+Memory TRACE_F("trace");
+Memory BLOB_F("msg");
+Memory ERRORS_F("errors");
+Memory CODE_F("code");
+Memory MSG_F("msg");
+Memory SERVICE_F("service");
+
+
+}
+
+bool RPCSendV2::isCompatible(stringref method, stringref request, stringref respons)
+{
+ return (method == METHOD_NAME) &&
+ (request == METHOD_PARAMS) &&
+ (respons == METHOD_RETURN);
+}
+
+void
+RPCSendV2::build(FRT_ReflectionBuilder & builder)
+{
+ builder.DefineMethod(METHOD_NAME, METHOD_PARAMS, METHOD_RETURN, true, FRT_METHOD(RPCSendV2::invoke), this);
+ builder.MethodDesc("Send a message bus slime request and get a reply back.");
+ builder.ParamDesc("encoding", "0=raw, 6=lz4");
+ builder.ParamDesc("uncompressedBlobSize", "Uncompressed blob size");
+ builder.ParamDesc("message", "The message blob in slime");
+ builder.ReturnDesc("encoding", "0=raw, 6=lz4");
+ builder.ReturnDesc("uncompressedBlobSize", "Uncompressed blob size");
+ builder.ReturnDesc("reply", "The reply blob in slime.");
+}
+
+const char *
+RPCSendV2::getReturnSpec() const {
+ return METHOD_RETURN;
+}
+
+namespace {
+class OutputBuf : public vespalib::Output {
+public:
+ OutputBuf(size_t estimatedSize) : _buf(estimatedSize) { }
+ DataBuffer & getBuf() { return _buf; }
+private:
+ vespalib::WritableMemory reserve(size_t bytes) override {
+ _buf.ensureFree(bytes);
+ return vespalib::WritableMemory(_buf.getFree(), _buf.getFreeLen());
+ }
+ Output &commit(size_t bytes) override {
+ _buf.moveFreeToData(bytes);
+ return *this;
+ }
+ DataBuffer _buf;
+};
+}
+
+void
+RPCSendV2::encodeRequest(FRT_RPCRequest &req, const Version &version, const Route & route,
+ const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel,
+ const PayLoadFiller &filler, uint64_t timeRemaining) const
+{
+ Slime slime;
+ Cursor & root = slime.setObject();
+
+ root.setString(VERSION_F, version.toString());
+ root.setString(ROUTE_F, route.toString());
+ root.setString(SESSION_F, address.getSessionName());
+ root.setBool(USERETRY_F, msg.getRetryEnabled());
+ root.setLong(RETRY_F, msg.getRetry());
+ root.setLong(TIMELEFT_F, timeRemaining);
+ root.setString(PROTOCOL_F, msg.getProtocol());
+ root.setLong(TRACELEVEL_F, traceLevel);
+ filler.fill(BLOB_F, root);
+
+ OutputBuf rBuf(8192);
+ BinaryFormat::encode(slime, rBuf);
+ ConstBufferRef toCompress(rBuf.getBuf().getData(), rBuf.getBuf().getDataLen());
+ DataBuffer buf(vespalib::roundUp2inN(rBuf.getBuf().getDataLen()));
+ CompressionConfig::Type type = compress(_net->getCompressionConfig(), toCompress, buf, false);
+
+ FRT_Values &args = *req.GetParams();
+ req.SetMethodName(METHOD_NAME);
+ args.AddInt8(type);
+ args.AddInt32(toCompress.size());
+ args.AddData(buf.stealBuffer(), buf.getDataLen());
+}
+
+namespace {
+
+class ParamsV2 : public RPCSend::Params
+{
+public:
+ ParamsV2(const FRT_Values &arg)
+ : _slime()
+ {
+ uint8_t encoding = arg[0]._intval8;
+ uint32_t uncompressedSize = arg[1]._intval32;
+ DataBuffer uncompressed(arg[2]._data._buf, arg[2]._data._len);
+ ConstBufferRef blob(arg[2]._data._buf, arg[2]._data._len);
+ decompress(CompressionConfig::toType(encoding), uncompressedSize, blob, uncompressed, true);
+ assert(uncompressedSize == uncompressed.getDataLen());
+ BinaryFormat::decode(Memory(uncompressed.getData(), uncompressed.getDataLen()), _slime);
+ }
+
+ uint32_t getTraceLevel() const override { return _slime.get()[TRACELEVEL_F].asLong(); }
+ bool useRetry() const override { return _slime.get()[USERETRY_F].asBool(); }
+ uint32_t getRetries() const override { return _slime.get()[RETRY_F].asLong(); }
+ uint64_t getRemainingTime() const override { return _slime.get()[TIMELEFT_F].asLong(); }
+
+ Version getVersion() const override {
+ return Version(_slime.get()[VERSION_F].asString().make_stringref());
+ }
+ stringref getRoute() const override {
+ return _slime.get()[ROUTE_F].asString().make_stringref();
+ }
+ stringref getSession() const override {
+ return _slime.get()[SESSION_F].asString().make_stringref();
+ }
+ stringref getProtocol() const override {
+ return _slime.get()[PROTOCOL_F].asString().make_stringref();
+ }
+ BlobRef getPayload() const override {
+ Memory m = _slime.get()[BLOB_F].asData();
+ return BlobRef(m.data, m.size);
+ }
+private:
+ Slime _slime;
+};
+
+}
+
+std::unique_ptr<RPCSend::Params>
+RPCSendV2::toParams(const FRT_Values &args) const
+{
+ return std::make_unique<ParamsV2>(args);
+}
+
+std::unique_ptr<Reply>
+RPCSendV2::createReply(const FRT_Values & ret, const string & serviceName,
+ Error & error, vespalib::TraceNode & rootTrace) const
+{
+ uint8_t encoding = ret[0]._intval8;
+ uint32_t uncompressedSize = ret[1]._intval32;
+ DataBuffer uncompressed(ret[2]._data._buf, ret[2]._data._len);
+ ConstBufferRef blob(ret[2]._data._buf, ret[2]._data._len);
+ decompress(CompressionConfig::toType(encoding), uncompressedSize, blob, uncompressed, true);
+ assert(uncompressedSize == uncompressed.getDataLen());
+ Slime slime;
+ BinaryFormat::decode(Memory(uncompressed.getData(), uncompressed.getDataLen()), slime);
+ Inspector & root = slime.get();
+ Version version(root[VERSION_F].asString().make_string());
+ Memory payload = root[BLOB_F].asData();
+
+ Reply::UP reply;
+ if (payload.size > 0) {
+ reply = decode(root[PROTOCOL_F].asString().make_stringref(), version, BlobRef(payload.data, payload.size), error);
+ }
+ if ( ! reply ) {
+ reply.reset(new EmptyReply());
+ }
+ reply->setRetryDelay(root[RETRYDELAY_F].asDouble());
+ Inspector & errors = root[ERRORS_F];
+ for (uint32_t i = 0; i < errors.entries(); ++i) {
+ Inspector & e = errors[i];
+ Memory service = e[SERVICE_F].asString();
+ reply->addError(Error(e[CODE_F].asLong(), e[MSG_F].asString().make_string(),
+ (service.size > 0) ? service.make_string() : serviceName));
+ }
+ rootTrace.addChild(TraceNode::decode(root[TRACE_F].asString().make_string()));
+ return reply;
+}
+
+void
+RPCSendV2::createResponse(FRT_Values & ret, const string & version, Reply & reply, Blob payload) const
+{
+ Slime slime;
+ Cursor & root = slime.setObject();
+
+ root.setString(VERSION_F, version);
+ root.setDouble(RETRYDELAY_F, reply.getRetryDelay());
+ root.setString(PROTOCOL_F, reply.getProtocol());
+ root.setData(BLOB_F, vespalib::Memory(payload.data(), payload.size()));
+ if (reply.getTrace().getLevel() > 0) {
+ root.setString(TRACE_F, reply.getTrace().getRoot().encode());
+ }
+
+ if (reply.getNumErrors() > 0) {
+ Cursor & array = root.setArray(ERRORS_F);
+ for (uint32_t i = 0; i < reply.getNumErrors(); ++i) {
+ Cursor & error = array.addObject();
+ error.setLong(CODE_F, reply.getError(i).getCode());
+ error.setString(MSG_F, reply.getError(i).getMessage());
+ error.setString(SERVICE_F, reply.getError(i).getService().c_str());
+ }
+ }
+
+ OutputBuf rBuf(8192);
+ BinaryFormat::encode(slime, rBuf);
+ ConstBufferRef toCompress(rBuf.getBuf().getData(), rBuf.getBuf().getDataLen());
+ DataBuffer buf(vespalib::roundUp2inN(rBuf.getBuf().getDataLen()));
+ CompressionConfig::Type type = compress(_net->getCompressionConfig(), toCompress, buf, false);
+
+ ret.AddInt8(type);
+ ret.AddInt32(toCompress.size());
+ ret.AddData(buf.stealBuffer(), buf.getDataLen());
+}
+
+} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv2.h b/messagebus/src/vespa/messagebus/network/rpcsendv2.h
new file mode 100644
index 00000000000..c720270c89e
--- /dev/null
+++ b/messagebus/src/vespa/messagebus/network/rpcsendv2.h
@@ -0,0 +1,24 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "rpcsend.h"
+
+namespace mbus {
+
+class RPCSendV2 : public RPCSend {
+public:
+ static bool isCompatible(vespalib::stringref method, vespalib::stringref request, vespalib::stringref respons);
+private:
+ void build(FRT_ReflectionBuilder & builder) override;
+ const char * getReturnSpec() const override;
+ std::unique_ptr<Params> toParams(const FRT_Values &param) const override;
+ void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route,
+ const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel,
+ const PayLoadFiller &filler, uint64_t timeRemaining) const override;
+
+ std::unique_ptr<Reply> createReply(const FRT_Values & response, const string & serviceName,
+ Error & error, vespalib::TraceNode & rootTrace) const override;
+ void createResponse(FRT_Values & ret, const string & version, Reply & reply, Blob payload) const override;
+};
+
+} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpctargetpool.h b/messagebus/src/vespa/messagebus/network/rpctargetpool.h
index 683982de080..5f858f66993 100644
--- a/messagebus/src/vespa/messagebus/network/rpctargetpool.h
+++ b/messagebus/src/vespa/messagebus/network/rpctargetpool.h
@@ -1,11 +1,11 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include <map>
-#include <vespa/messagebus/itimer.h>
-#include <vespa/vespalib/util/sync.h>
#include "rpcserviceaddress.h"
#include "rpctarget.h"
+#include <vespa/messagebus/itimer.h>
+#include <vespa/vespalib/util/sync.h>
+#include <map>
class FRT_Supervisor;
diff --git a/messagebus/src/vespa/messagebus/routing/route.cpp b/messagebus/src/vespa/messagebus/routing/route.cpp
index af7b5113ac2..b705847c3a5 100644
--- a/messagebus/src/vespa/messagebus/routing/route.cpp
+++ b/messagebus/src/vespa/messagebus/routing/route.cpp
@@ -69,7 +69,7 @@ Route::toDebugString() const {
}
Route
-Route::parse(const string &route)
+Route::parse(vespalib::stringref route)
{
return RouteParser::createRoute(route);
}
diff --git a/messagebus/src/vespa/messagebus/routing/route.h b/messagebus/src/vespa/messagebus/routing/route.h
index d9932e17d26..a2a01648cfe 100644
--- a/messagebus/src/vespa/messagebus/routing/route.h
+++ b/messagebus/src/vespa/messagebus/routing/route.h
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include <vector>
#include "hop.h"
namespace mbus {
@@ -33,7 +32,7 @@ public:
* @param route The string to parse.
* @return A route that corresponds to the string.
*/
- static Route parse(const string &route);
+ static Route parse(vespalib::stringref route);
/**
* Create a Route that contains no hops
diff --git a/messagebus/src/vespa/messagebus/routing/routeparser.cpp b/messagebus/src/vespa/messagebus/routing/routeparser.cpp
index ac52ae3b598..131ecbe3b33 100644
--- a/messagebus/src/vespa/messagebus/routing/routeparser.cpp
+++ b/messagebus/src/vespa/messagebus/routing/routeparser.cpp
@@ -71,7 +71,7 @@ RouteParser::createDirective(const stringref &str)
}
Hop
-RouteParser::createHop(const stringref &str)
+RouteParser::createHop(stringref str)
{
if (str.empty()) {
return Hop().addDirective(createErrorDirective("Failed to parse empty string."));
@@ -119,7 +119,7 @@ RouteParser::createHop(const stringref &str)
}
Route
-RouteParser::createRoute(const stringref &str)
+RouteParser::createRoute(stringref str)
{
Route ret;
for (size_t from = 0, at = 0, depth = 0; at <= str.size(); ++at) {
diff --git a/messagebus/src/vespa/messagebus/routing/routeparser.h b/messagebus/src/vespa/messagebus/routing/routeparser.h
index a3f16e49307..8ffba3f6e11 100644
--- a/messagebus/src/vespa/messagebus/routing/routeparser.h
+++ b/messagebus/src/vespa/messagebus/routing/routeparser.h
@@ -29,7 +29,7 @@ public:
* @param str The string to parse as a hop.
* @return The created hop.
*/
- static Hop createHop(const vespalib::stringref &str);
+ static Hop createHop(vespalib::stringref str);
/**
* Creates a route from a string representation.
@@ -37,7 +37,7 @@ public:
* @param str The string to parse as a route.
* @return The created route.
*/
- static Route createRoute(const vespalib::stringref &str);
+ static Route createRoute(vespalib::stringref str);
};
} // mbus
diff --git a/messagebus/src/vespa/messagebus/routing/routingnode.h b/messagebus/src/vespa/messagebus/routing/routingnode.h
index 22ff07c26e5..28c836a6dec 100644
--- a/messagebus/src/vespa/messagebus/routing/routingnode.h
+++ b/messagebus/src/vespa/messagebus/routing/routingnode.h
@@ -348,6 +348,7 @@ public:
* @return The message being routed.
*/
Message &getMessage() { return _msg; }
+ const Message & getMessage() const { return _msg; }
/**
* Returns the trace object for this node. Each node has a separate trace
@@ -356,6 +357,7 @@ public:
* @return The trace object.
*/
Trace &getTrace() { return _trace; }
+ const Trace &getTrace() const { return _trace; }
/**
* Returns the route object as it exists at this point of the tree.
@@ -429,6 +431,7 @@ public:
* @return The recipient address.
*/
IServiceAddress &getServiceAddress() { return *_serviceAddress; }
+ const IServiceAddress &getServiceAddress() const { return *_serviceAddress; }
/**
* Sets the service address of this node. This is called by the network
diff --git a/messagebus/src/vespa/messagebus/rpcmessagebus.cpp b/messagebus/src/vespa/messagebus/rpcmessagebus.cpp
index c093efd1106..103c21ee3aa 100644
--- a/messagebus/src/vespa/messagebus/rpcmessagebus.cpp
+++ b/messagebus/src/vespa/messagebus/rpcmessagebus.cpp
@@ -1,5 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "rpcmessagebus.h"
+#include <vespa/config/subscription/configuri.h>
namespace mbus {
@@ -15,6 +16,10 @@ RPCMessageBus::RPCMessageBus(const MessageBusParams &mbusParams,
_subscriber.start();
}
+RPCMessageBus::RPCMessageBus(const MessageBusParams &mbusParams, const RPCNetworkParams &rpcParams)
+ : RPCMessageBus(mbusParams, rpcParams, config::ConfigUri("client"))
+{}
+
RPCMessageBus::RPCMessageBus(const ProtocolSet &protocols,
const RPCNetworkParams &rpcParams,
const config::ConfigUri &routingCfgUri) :
diff --git a/messagebus/src/vespa/messagebus/rpcmessagebus.h b/messagebus/src/vespa/messagebus/rpcmessagebus.h
index 3f249965fec..e9cfed32c44 100644
--- a/messagebus/src/vespa/messagebus/rpcmessagebus.h
+++ b/messagebus/src/vespa/messagebus/rpcmessagebus.h
@@ -7,6 +7,8 @@
#include <vespa/messagebus/network/rpcnetwork.h>
#include <vespa/config/helper/legacysubscriber.h>
+namespace config {class ConfigUri; }
+
namespace mbus {
/**
@@ -40,8 +42,10 @@ public:
* @param routingCfgId The config id for message bus routing specs.
*/
RPCMessageBus(const MessageBusParams &mbusParams,
- const RPCNetworkParams &rpcParams = RPCNetworkParams(),
- const config::ConfigUri & routingCfgId = config::ConfigUri("client"));
+ const RPCNetworkParams &rpcParams,
+ const config::ConfigUri & routingCfgId);
+ RPCMessageBus(const MessageBusParams &mbusParams,
+ const RPCNetworkParams &rpcParams);
/**
@@ -55,8 +59,8 @@ public:
* @param routingCfgId The config id for messagebus routing specs.
*/
RPCMessageBus(const ProtocolSet &protocols,
- const RPCNetworkParams &rpcParams = RPCNetworkParams(),
- const config::ConfigUri & routingCfgId = config::ConfigUri("client"));
+ const RPCNetworkParams &rpcParams,
+ const config::ConfigUri & routingCfgId);
/**
* Destruct. This will destruct the internal MessageBus and RPCNetwork
diff --git a/messagebus/src/vespa/messagebus/testlib/testserver.cpp b/messagebus/src/vespa/messagebus/testlib/testserver.cpp
index e7f3646c72c..dbc741f2dd4 100644
--- a/messagebus/src/vespa/messagebus/testlib/testserver.cpp
+++ b/messagebus/src/vespa/messagebus/testlib/testserver.cpp
@@ -4,6 +4,7 @@
#include "simpleprotocol.h"
#include "slobrok.h"
#include "slobrokstate.h"
+#include <vespa/messagebus/network/oosmanager.h>
#include <vespa/vespalib/component/vtag.h>
namespace mbus {
@@ -11,9 +12,7 @@ namespace mbus {
VersionedRPCNetwork::VersionedRPCNetwork(const RPCNetworkParams &params) :
RPCNetwork(params),
_version(vespalib::Vtag::currentVersion)
-{
- // empty
-}
+{}
void
VersionedRPCNetwork::setVersion(const vespalib::Version &version)
@@ -97,4 +96,4 @@ TestServer::waitState(const OOSState &oosState)
return false;
}
-} // namespace mbus
+}
diff --git a/messagebus/src/vespa/messagebus/testlib/testserver.h b/messagebus/src/vespa/messagebus/testlib/testserver.h
index 1e2de3c4607..400e2b274c5 100644
--- a/messagebus/src/vespa/messagebus/testlib/testserver.h
+++ b/messagebus/src/vespa/messagebus/testlib/testserver.h
@@ -2,9 +2,10 @@
#pragma once
-#include <memory>
#include <vespa/messagebus/messagebus.h>
#include <vespa/messagebus/network/rpcnetwork.h>
+#include <vespa/messagebus/network/rpcnetworkparams.h>
+#include <vespa/fnet/frt/supervisor.h>
namespace mbus {
diff --git a/messagebus/src/vespa/messagebus/tracenode.h b/messagebus/src/vespa/messagebus/tracenode.h
index 95de9e70f55..f582a70a151 100644
--- a/messagebus/src/vespa/messagebus/tracenode.h
+++ b/messagebus/src/vespa/messagebus/tracenode.h
@@ -5,7 +5,7 @@
namespace mbus {
- typedef vespalib::TraceNode TraceNode;
+ using TraceNode = vespalib::TraceNode;
} // namespace mbus
diff --git a/messagebus_test/src/tests/error/cpp-client.cpp b/messagebus_test/src/tests/error/cpp-client.cpp
index 001b0b14674..44580400e26 100644
--- a/messagebus_test/src/tests/error/cpp-client.cpp
+++ b/messagebus_test/src/tests/error/cpp-client.cpp
@@ -1,13 +1,11 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/messagebus/messagebus.h>
-#include <vespa/messagebus/sourcesession.h>
#include <vespa/messagebus/errorcode.h>
#include <vespa/messagebus/testlib/simplemessage.h>
-#include <vespa/messagebus/testlib/simplereply.h>
#include <vespa/messagebus/testlib/simpleprotocol.h>
#include <vespa/messagebus/rpcmessagebus.h>
-#include <vespa/messagebus/sourcesessionparams.h>
+#include <vespa/messagebus/network/rpcnetworkparams.h>
#include <vespa/messagebus/testlib/receptor.h>
#include <vespa/fastos/app.h>
diff --git a/messagebus_test/src/tests/error/cpp-server.cpp b/messagebus_test/src/tests/error/cpp-server.cpp
index 68a1d0afc27..af0d6e30132 100644
--- a/messagebus_test/src/tests/error/cpp-server.cpp
+++ b/messagebus_test/src/tests/error/cpp-server.cpp
@@ -3,6 +3,7 @@
#include <vespa/messagebus/messagebus.h>
#include <vespa/messagebus/testlib/simpleprotocol.h>
#include <vespa/messagebus/rpcmessagebus.h>
+#include <vespa/messagebus/network/rpcnetworkparams.h>
#include <vespa/messagebus/emptyreply.h>
#include <vespa/messagebus/errorcode.h>
#include <vespa/fastos/app.h>
diff --git a/messagebus_test/src/tests/speed/cpp-client.cpp b/messagebus_test/src/tests/speed/cpp-client.cpp
index d510054c0ab..437cc3ce354 100644
--- a/messagebus_test/src/tests/speed/cpp-client.cpp
+++ b/messagebus_test/src/tests/speed/cpp-client.cpp
@@ -3,6 +3,7 @@
#include <vespa/messagebus/messagebus.h>
#include <vespa/messagebus/routing/retrytransienterrorspolicy.h>
#include <vespa/messagebus/rpcmessagebus.h>
+#include <vespa/messagebus/network/rpcnetworkparams.h>
#include <vespa/messagebus/testlib/simplemessage.h>
#include <vespa/messagebus/testlib/simpleprotocol.h>
#include <vespa/messagebus/testlib/simplereply.h>
diff --git a/messagebus_test/src/tests/speed/cpp-server.cpp b/messagebus_test/src/tests/speed/cpp-server.cpp
index ba821f19b02..ddbeac62ce2 100644
--- a/messagebus_test/src/tests/speed/cpp-server.cpp
+++ b/messagebus_test/src/tests/speed/cpp-server.cpp
@@ -5,6 +5,7 @@
#include <vespa/messagebus/testlib/simplereply.h>
#include <vespa/messagebus/testlib/simpleprotocol.h>
#include <vespa/messagebus/rpcmessagebus.h>
+#include <vespa/messagebus/network/rpcnetworkparams.h>
#include <vespa/fastos/app.h>
using namespace mbus;
diff --git a/messagebus_test/src/tests/trace/cpp-server.cpp b/messagebus_test/src/tests/trace/cpp-server.cpp
index 60bd55270a7..c9086ff1758 100644
--- a/messagebus_test/src/tests/trace/cpp-server.cpp
+++ b/messagebus_test/src/tests/trace/cpp-server.cpp
@@ -3,6 +3,7 @@
#include <vespa/messagebus/messagebus.h>
#include <vespa/messagebus/testlib/simpleprotocol.h>
#include <vespa/messagebus/rpcmessagebus.h>
+#include <vespa/messagebus/network/rpcnetworkparams.h>
#include <vespa/messagebus/emptyreply.h>
#include <vespa/fastos/app.h>
diff --git a/messagebus_test/src/tests/trace/trace.cpp b/messagebus_test/src/tests/trace/trace.cpp
index 96759f286b8..a653c4934b3 100644
--- a/messagebus_test/src/tests/trace/trace.cpp
+++ b/messagebus_test/src/tests/trace/trace.cpp
@@ -1,6 +1,5 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/log/log.h>
-LOG_SETUP("trace_test");
+
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/vespalib/testkit/test_kit.h>
#include <vespa/messagebus/testlib/slobrok.h>
@@ -21,6 +20,9 @@ LOG_SETUP("trace_test");
#include <vespa/messagebus/testlib/simpleprotocol.h>
#include <iostream>
+#include <vespa/log/log.h>
+LOG_SETUP("trace_test");
+
using namespace mbus;
using vespalib::make_string;
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index ae8f3290fef..c90b18038c2 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -404,6 +404,11 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig>
params.setListenPort(config->mbusport);
}
+ using CompressionConfig = vespalib::compression::CompressionConfig;
+ CompressionConfig::Type compressionType = CompressionConfig::toType(
+ CommunicationManagerConfig::Mbus::Compress::getTypeName(config->mbus.compress.type).c_str());
+ params.setCompressionConfig(CompressionConfig(compressionType, config->mbus.compress.level,
+ 90, config->mbus.compress.limit));
// Configure messagebus here as we for legacy reasons have
// config here.
_mbus = std::make_unique<mbus::RPCMessageBus>(
diff --git a/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp b/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp
index 1a518b4c819..fe4600aea58 100644
--- a/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp
+++ b/vespaclient/src/vespa/vespaclient/vesparoute/application.cpp
@@ -4,16 +4,18 @@
#include <vespa/document/config/config-documenttypes.h>
#include <vespa/document/repo/documenttyperepo.h>
-#include <vespa/document/util/stringutil.h>
#include <vespa/documentapi/messagebus/documentprotocol.h>
#include <vespa/messagebus/configagent.h>
#include <vespa/messagebus/routing/routingtable.h>
#include <vespa/messagebus/routing/routedirective.h>
#include <vespa/messagebus/rpcmessagebus.h>
+#include <vespa/messagebus/network/rpcsendv1.h>
+#include <vespa/messagebus/network/rpcsendv2.h>
#include <vespa/slobrok/sbmirror.h>
#include <vespa/config/common/exceptions.h>
#include <vespa/config/helper/configgetter.hpp>
#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/fnet/frt/supervisor.h>
using config::ConfigGetter;
using document::DocumenttypesConfig;
@@ -528,9 +530,8 @@ Application::isService(FRT_Supervisor &frt, const std::string &spec) const
FRT_StringValue *retList = req->GetReturn()->GetValue(2)._string_array._pt;
for (uint32_t i = 0; i < numMethods; ++i) {
- if (strcmp(methods[i]._str, mbus::RPCSendV1::METHOD_NAME) == 0 &&
- strcmp(argList[i]._str, mbus::RPCSendV1::METHOD_PARAMS) == 0 &&
- strcmp(retList[i]._str, mbus::RPCSendV1::METHOD_RETURN) == 0) {
+ if (mbus::RPCSendV1::isCompatible(methods[i]._str,argList[i]._str, retList[i]._str) ||
+ mbus::RPCSendV2::isCompatible(methods[i]._str,argList[i]._str, retList[i]._str)) {
ret = true;
break;
}