summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-09-19 20:37:23 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2017-09-19 20:40:43 +0200
commit6708c5f296d0c80bb64ad7ba7a2e949969d35753 (patch)
treece309859b08f92dda4f9bce8846e4c324945725d /messagebus
parentc536a7dedb8257d9a56fb350bf24b796e26712e8 (diff)
This add slime encoding of headers for future TLS usage,
and slime encoded body containing normal mbus payload. Both supporting compression.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java54
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java269
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java332
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java209
-rw-r--r--messagebus/src/tests/advancedrouting/advancedrouting.cpp12
-rw-r--r--messagebus/src/tests/bucketsequence/bucketsequence.cpp1
-rw-r--r--messagebus/src/tests/choke/choke.cpp1
-rw-r--r--messagebus/src/tests/error/error.cpp4
-rw-r--r--messagebus/src/tests/routing/routing.cpp1
-rw-r--r--messagebus/src/tests/sendadapter/sendadapter.cpp1
-rw-r--r--messagebus/src/tests/serviceaddress/serviceaddress.cpp13
-rw-r--r--messagebus/src/tests/servicepool/servicepool.cpp2
-rw-r--r--messagebus/src/tests/shutdown/shutdown.cpp3
-rw-r--r--messagebus/src/tests/slobrok/slobrok.cpp4
-rw-r--r--messagebus/src/vespa/messagebus/blob.h5
-rw-r--r--messagebus/src/vespa/messagebus/callstack.cpp2
-rw-r--r--messagebus/src/vespa/messagebus/callstack.h2
-rw-r--r--messagebus/src/vespa/messagebus/destinationsessionparams.cpp2
-rw-r--r--messagebus/src/vespa/messagebus/messagebus.cpp10
-rw-r--r--messagebus/src/vespa/messagebus/messenger.cpp10
-rw-r--r--messagebus/src/vespa/messagebus/network/CMakeLists.txt2
-rw-r--r--messagebus/src/vespa/messagebus/network/oosmanager.cpp2
-rw-r--r--messagebus/src/vespa/messagebus/network/oosmanager.h13
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp155
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.h77
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp3
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.h21
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.cpp282
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.h95
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend_private.h52
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsendv1.cpp406
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsendv1.h79
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsendv2.cpp263
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsendv2.h24
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcserviceaddress.h2
-rw-r--r--messagebus/src/vespa/messagebus/network/rpctargetpool.cpp2
-rw-r--r--messagebus/src/vespa/messagebus/network/rpctargetpool.h6
-rw-r--r--messagebus/src/vespa/messagebus/protocolrepository.cpp2
-rw-r--r--messagebus/src/vespa/messagebus/routing/route.cpp2
-rw-r--r--messagebus/src/vespa/messagebus/routing/route.h3
-rw-r--r--messagebus/src/vespa/messagebus/routing/routeparser.cpp6
-rw-r--r--messagebus/src/vespa/messagebus/routing/routeparser.h4
-rw-r--r--messagebus/src/vespa/messagebus/routing/routingnode.cpp56
-rw-r--r--messagebus/src/vespa/messagebus/routing/routingnode.h9
-rw-r--r--messagebus/src/vespa/messagebus/routing/routingtable.cpp4
-rw-r--r--messagebus/src/vespa/messagebus/rpcmessagebus.cpp5
-rw-r--r--messagebus/src/vespa/messagebus/rpcmessagebus.h12
-rw-r--r--messagebus/src/vespa/messagebus/sequencer.cpp14
-rw-r--r--messagebus/src/vespa/messagebus/sourcesession.cpp10
-rw-r--r--messagebus/src/vespa/messagebus/sourcesessionparams.cpp4
-rw-r--r--messagebus/src/vespa/messagebus/testlib/testserver.cpp7
-rw-r--r--messagebus/src/vespa/messagebus/testlib/testserver.h3
-rw-r--r--messagebus/src/vespa/messagebus/tracenode.h2
53 files changed, 1664 insertions, 900 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
index 096c0c0b485..d42e396452a 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
@@ -2,17 +2,28 @@
package com.yahoo.messagebus.network.rpc;
import com.yahoo.component.Version;
-import com.yahoo.component.VersionSpecification;
import com.yahoo.component.Vtag;
import com.yahoo.concurrent.ThreadFactoryFactory;
-import com.yahoo.jrt.*;
+import com.yahoo.jrt.Acceptor;
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.Method;
+import com.yahoo.jrt.MethodHandler;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.Spec;
+import com.yahoo.jrt.StringValue;
+import com.yahoo.jrt.Supervisor;
+import com.yahoo.jrt.Task;
+import com.yahoo.jrt.Transport;
import com.yahoo.jrt.slobrok.api.IMirror;
import com.yahoo.jrt.slobrok.api.Mirror;
import com.yahoo.jrt.slobrok.api.Register;
import com.yahoo.log.LogLevel;
-import com.yahoo.messagebus.*;
+import com.yahoo.messagebus.EmptyReply;
import com.yahoo.messagebus.Error;
import com.yahoo.messagebus.ErrorCode;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Protocol;
+import com.yahoo.messagebus.Reply;
import com.yahoo.messagebus.network.Identity;
import com.yahoo.messagebus.network.Network;
import com.yahoo.messagebus.network.NetworkOwner;
@@ -22,8 +33,16 @@ import com.yahoo.messagebus.routing.RoutingNode;
import java.io.PrintWriter;
import java.io.StringWriter;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
@@ -44,7 +63,7 @@ public class RPCNetwork implements Network, MethodHandler {
private final Acceptor listener;
private final Mirror mirror;
private final Register register;
- private final Map<VersionSpecification, RPCSendAdapter> sendAdapters = new HashMap<>();
+ private final TreeMap<Version, RPCSendAdapter> sendAdapters = new TreeMap<>();
private NetworkOwner owner;
private final SlobrokConfigSubscriber slobroksConfig;
private final LinkedHashMap<String, Route> lruRouteMap = new LinkedHashMap<>(10000, 0.5f, true);
@@ -162,9 +181,10 @@ public class RPCNetwork implements Network, MethodHandler {
}
this.owner = owner;
- RPCSendAdapter adapter = new RPCSendV1();
- addSendAdapter(new VersionSpecification(5), adapter);
- addSendAdapter(new VersionSpecification(6), adapter);
+ RPCSendAdapter adapter1 = new RPCSendV1();
+ RPCSendAdapter adapter2 = new RPCSendV2();
+ addSendAdapter(new Version(5), adapter1);
+ addSendAdapter(new Version(6,142), adapter2);
}
@Override
@@ -234,11 +254,9 @@ public class RPCNetwork implements Network, MethodHandler {
*/
private void send(SendContext ctx) {
if (destroyed.get()) {
- replyError(ctx, ErrorCode.NETWORK_SHUTDOWN,
- "Network layer has performed shutdown.");
+ replyError(ctx, ErrorCode.NETWORK_SHUTDOWN, "Network layer has performed shutdown.");
} else if (ctx.hasError) {
- replyError(ctx, ErrorCode.HANDSHAKE_FAILED,
- "An error occured while resolving version.");
+ replyError(ctx, ErrorCode.HANDSHAKE_FAILED, "An error occured while resolving version.");
} else {
sendService.execute(new SendTask(owner.getProtocol(ctx.msg.getProtocol()), ctx));
}
@@ -315,7 +333,7 @@ public class RPCNetwork implements Network, MethodHandler {
* @param version The version for which to register an adapter.
* @param adapter The adapter to register.
*/
- private void addSendAdapter(VersionSpecification version, RPCSendAdapter adapter) {
+ private void addSendAdapter(Version version, RPCSendAdapter adapter) {
adapter.attach(this);
sendAdapters.put(version, adapter);
}
@@ -328,12 +346,8 @@ public class RPCNetwork implements Network, MethodHandler {
* @return The compatible adapter.
*/
private RPCSendAdapter getSendAdapter(Version version) {
- for (Map.Entry<VersionSpecification, RPCSendAdapter> entry : sendAdapters.entrySet()) {
- if (entry.getKey().matches(version)) {
- return entry.getValue();
- }
- }
- return null;
+ Map.Entry<Version, RPCSendAdapter> lower = sendAdapters.floorEntry(version);
+ return (lower != null) ? lower.getValue() : null;
}
/**
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java
new file mode 100644
index 00000000000..d7b4887bd36
--- /dev/null
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java
@@ -0,0 +1,269 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.network.rpc;
+
+import com.yahoo.component.Version;
+
+import com.yahoo.jrt.Method;
+import com.yahoo.jrt.MethodHandler;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.RequestWaiter;
+import com.yahoo.jrt.Values;
+import com.yahoo.messagebus.EmptyReply;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.ErrorCode;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Protocol;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.ReplyHandler;
+import com.yahoo.messagebus.Routable;
+import com.yahoo.messagebus.Trace;
+import com.yahoo.messagebus.TraceLevel;
+import com.yahoo.messagebus.routing.Hop;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.routing.RoutingNode;
+import com.yahoo.text.Utf8Array;
+
+/**
+ * Implements the request adapter for method "mbus.send1/mbus.slime".
+ *
+ * @author baldersheim
+ */
+public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWaiter, RPCSendAdapter {
+
+ private RPCNetwork net = null;
+ private String clientIdent = "client";
+ private String serverIdent = "server";
+
+ protected abstract Method buildMethod();
+ protected abstract String getReturnSpec();
+ protected abstract Request encodeRequest(Version version, Route route, RPCServiceAddress address, Message msg,
+ long timeRemaining, byte[] payload, int traceLevel);
+ protected abstract Reply createReply(Values ret, String serviceName, Trace trace);
+ protected abstract Params toParams(Values req);
+ protected abstract void createResponse(Values ret, Reply reply, Version version, byte [] payload);
+ @Override
+ public final void attach(RPCNetwork net) {
+ this.net = net;
+ String prefix = net.getIdentity().getServicePrefix();
+ if (prefix != null && prefix.length() > 0) {
+ clientIdent = "'" + prefix + "'";
+ serverIdent = clientIdent;
+ }
+ net.getSupervisor().addMethod(buildMethod());
+ }
+
+ @Override
+ public final void send(RoutingNode recipient, Version version, byte[] payload, long timeRemaining) {
+ SendContext ctx = new SendContext(recipient, timeRemaining);
+ RPCServiceAddress address = (RPCServiceAddress)recipient.getServiceAddress();
+ Message msg = recipient.getMessage();
+ Route route = new Route(recipient.getRoute());
+ Hop hop = route.removeHop(0);
+
+ Request req = encodeRequest(version, route, address,msg, timeRemaining, payload, ctx.trace.getLevel());
+
+ if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) {
+ ctx.trace.trace(TraceLevel.SEND_RECEIVE,
+ "Sending message (version " + version + ") from " + clientIdent + " to '" +
+ address.getServiceName() + "' with " + ctx.timeout + " seconds timeout.");
+ }
+
+ if (hop.getIgnoreResult()) {
+ address.getTarget().getJRTTarget().invokeVoid(req);
+ if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) {
+ ctx.trace.trace(TraceLevel.SEND_RECEIVE,
+ "Not waiting for a reply from '" + address.getServiceName() + "'.");
+ }
+ Reply reply = new EmptyReply();
+ reply.getTrace().swap(ctx.trace);
+ net.getOwner().deliverReply(reply, recipient);
+ } else {
+ req.setContext(ctx);
+ address.getTarget().getJRTTarget().invokeAsync(req, ctx.timeout, this);
+ }
+ req.discardParameters(); // allow garbage collection of request parameters
+ }
+
+ protected final Object decode(Utf8Array protocolName, Version version, byte [] payload) {
+ Protocol protocol = net.getOwner().getProtocol(protocolName);
+ if (protocol != null) {
+ Routable routable = protocol.decode(version, payload);
+ if (routable != null) {
+ if (routable instanceof Reply) {
+ return routable;
+ } else {
+ return new Error(ErrorCode.DECODE_ERROR,
+ "Payload decoded to a reply when expecting a message.");
+ }
+ } else {
+ return new Error(ErrorCode.DECODE_ERROR,
+ "Protocol '" + protocol.getName() + "' failed to decode routable.");
+ }
+ } else {
+ return new Error(ErrorCode.UNKNOWN_PROTOCOL,
+ "Protocol '" + protocolName + "' is not known by " + serverIdent + ".");
+ }
+ }
+
+ @Override
+ public final void handleRequestDone(Request req) {
+ SendContext ctx = (SendContext)req.getContext();
+ String serviceName = ((RPCServiceAddress)ctx.recipient.getServiceAddress()).getServiceName();
+ Reply reply = null;
+ Error error = null;
+ if (!req.checkReturnTypes(getReturnSpec())) {
+ // Map all known JRT errors to the appropriate message bus error.
+ reply = new EmptyReply();
+ switch (req.errorCode()) {
+ case com.yahoo.jrt.ErrorCode.TIMEOUT:
+ error = new Error(ErrorCode.TIMEOUT,
+ "A timeout occured while waiting for '" + serviceName + "' (" +
+ ctx.timeout + " seconds expired); " + req.errorMessage());
+ break;
+ case com.yahoo.jrt.ErrorCode.CONNECTION:
+ error = new Error(ErrorCode.CONNECTION_ERROR,
+ "A connection error occured for '" + serviceName + "'; " + req.errorMessage());
+ break;
+ default:
+ error = new Error(ErrorCode.NETWORK_ERROR,
+ "A network error occured for '" + serviceName + "'; " + req.errorMessage());
+ }
+ } else {
+ reply = createReply(req.returnValues(), serviceName, ctx.trace);
+ }
+ if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) {
+ ctx.trace.trace(TraceLevel.SEND_RECEIVE,
+ "Reply (type " + reply.getType() + ") received at " + clientIdent + ".");
+ }
+ reply.getTrace().swap(ctx.trace);
+ if (error != null) {
+ reply.addError(error);
+ }
+ net.getOwner().deliverReply(reply, ctx.recipient);
+ }
+
+ protected final class Params {
+ Version version;
+ String route;
+ String session;
+ boolean retryEnabled;
+ int retry;
+ long timeRemaining;
+ Utf8Array protocolName;
+ byte [] payload;
+ int traceLevel;
+ }
+
+ @Override
+ public final void invoke(Request request) {
+ request.detach();
+ Params p = toParams(request.parameters());
+
+ request.discardParameters(); // allow garbage collection of request parameters
+
+ // Make sure that the owner understands the protocol.
+ Protocol protocol = net.getOwner().getProtocol(p.protocolName);
+ if (protocol == null) {
+ replyError(request, p.version, p.traceLevel,
+ new Error(ErrorCode.UNKNOWN_PROTOCOL,
+ "Protocol '" + p.protocolName + "' is not known by " + serverIdent + "."));
+ return;
+ }
+ Routable routable = protocol.decode(p.version, p.payload);
+ if (routable == null) {
+ replyError(request, p.version, p.traceLevel,
+ new Error(ErrorCode.DECODE_ERROR,
+ "Protocol '" + protocol.getName() + "' failed to decode routable."));
+ return;
+ }
+ if (routable instanceof Reply) {
+ replyError(request, p.version, p.traceLevel,
+ new Error(ErrorCode.DECODE_ERROR,
+ "Payload decoded to a reply when expecting a message."));
+ return;
+ }
+ Message msg = (Message)routable;
+ if (p.route != null && p.route.length() > 0) {
+ msg.setRoute(net.getRoute(p.route));
+ }
+ msg.setContext(new ReplyContext(request, p.version));
+ msg.pushHandler(this);
+ msg.setRetryEnabled(p.retryEnabled);
+ msg.setRetry(p.retry);
+ msg.setTimeReceivedNow();
+ msg.setTimeRemaining(p.timeRemaining);
+ msg.getTrace().setLevel(p.traceLevel);
+ if (msg.getTrace().shouldTrace(TraceLevel.SEND_RECEIVE)) {
+ msg.getTrace().trace(TraceLevel.SEND_RECEIVE,
+ "Message (type " + msg.getType() + ") received at " + serverIdent + " for session '" + p.session + "'.");
+ }
+ net.getOwner().deliverMessage(msg, p.session);
+ }
+
+ @Override
+ public final void handleReply(Reply reply) {
+ ReplyContext ctx = (ReplyContext)reply.getContext();
+ reply.setContext(null);
+
+ // Add trace information.
+ if (reply.getTrace().shouldTrace(TraceLevel.SEND_RECEIVE)) {
+ reply.getTrace().trace(TraceLevel.SEND_RECEIVE,
+ "Sending reply (version " + ctx.version + ") from " + serverIdent + ".");
+ }
+
+ // Encode and return the reply through the RPC request.
+ byte[] payload = new byte[0];
+ if (reply.getType() != 0) {
+ Protocol protocol = net.getOwner().getProtocol(reply.getProtocol());
+ if (protocol != null) {
+ payload = protocol.encode(ctx.version, reply);
+ }
+ if (payload == null || payload.length == 0) {
+ reply.addError(new Error(ErrorCode.ENCODE_ERROR,
+ "An error occured while encoding the reply."));
+ }
+ }
+ createResponse(ctx.request.returnValues(), reply, ctx.version, payload);
+ ctx.request.returnRequest();
+ }
+
+ /**
+ * Send an error reply for a given request.
+ *
+ * @param request The JRT request to reply to.
+ * @param version The version to serialize for.
+ * @param traceLevel The trace level to set in the reply.
+ * @param err The error to reply with.
+ */
+ private void replyError(Request request, Version version, int traceLevel, Error err) {
+ Reply reply = new EmptyReply();
+ reply.setContext(new ReplyContext(request, version));
+ reply.getTrace().setLevel(traceLevel);
+ reply.addError(err);
+ handleReply(reply);
+ }
+
+ private static class SendContext {
+
+ final RoutingNode recipient;
+ final Trace trace;
+ final double timeout;
+
+ SendContext(RoutingNode recipient, long timeRemaining) {
+ this.recipient = recipient;
+ trace = new Trace(recipient.getTrace().getLevel());
+ timeout = timeRemaining * 0.001;
+ }
+ }
+
+ private static class ReplyContext {
+
+ final Request request;
+ final Version version;
+
+ public ReplyContext(Request request, Version version) {
+ this.request = request;
+ this.version = version;
+ }
+ }
+}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java
index 40cb7fb9ee9..480a716e382 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java
@@ -2,40 +2,41 @@
package com.yahoo.messagebus.network.rpc;
import com.yahoo.component.Version;
-import com.yahoo.jrt.*;
+import com.yahoo.jrt.DataValue;
+import com.yahoo.jrt.DoubleValue;
+import com.yahoo.jrt.Int32Array;
+import com.yahoo.jrt.Int32Value;
+import com.yahoo.jrt.Int64Value;
+import com.yahoo.jrt.Int8Value;
+import com.yahoo.jrt.Method;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.StringArray;
import com.yahoo.jrt.StringValue;
-import com.yahoo.messagebus.*;
+import com.yahoo.jrt.Values;
+import com.yahoo.messagebus.EmptyReply;
import com.yahoo.messagebus.Error;
-import com.yahoo.messagebus.ErrorCode;
-import com.yahoo.messagebus.ReplyHandler;
-import com.yahoo.messagebus.routing.Hop;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.Trace;
+import com.yahoo.messagebus.TraceNode;
import com.yahoo.messagebus.routing.Route;
-import com.yahoo.messagebus.routing.RoutingNode;
import com.yahoo.text.Utf8Array;
-import com.yahoo.text.Utf8String;
/**
* Implements the request adapter for method "mbus.send1".
*
* @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
*/
-public class RPCSendV1 implements MethodHandler, ReplyHandler, RequestWaiter, RPCSendAdapter {
+public class RPCSendV1 extends RPCSend {
private final String METHOD_NAME = "mbus.send1";
private final String METHOD_PARAMS = "sssbilsxi";
private final String METHOD_RETURN = "sdISSsxs";
- private RPCNetwork net = null;
- private String clientIdent = "client";
- private String serverIdent = "server";
@Override
- public void attach(RPCNetwork net) {
- this.net = net;
- String prefix = net.getIdentity().getServicePrefix();
- if (prefix != null && prefix.length() > 0) {
- clientIdent = "'" + prefix + "'";
- serverIdent = clientIdent;
- }
+ protected String getReturnSpec() { return METHOD_RETURN; }
+ @Override
+ protected Method buildMethod() {
Method method = new Method(METHOD_NAME, METHOD_PARAMS, METHOD_RETURN, this);
method.methodDesc("Send a message bus request and get a reply back.");
@@ -56,206 +57,80 @@ public class RPCSendV1 implements MethodHandler, ReplyHandler, RequestWaiter, RP
.returnDesc(5, "protocol", "The name of the protocol that knows how to decode this reply.")
.returnDesc(6, "payload", "The protocol specific reply payload.")
.returnDesc(7, "trace", "A string representation of the trace.");
- net.getSupervisor().addMethod(method);
+ return method;
}
-
@Override
- public void send(RoutingNode recipient, Version version, byte[] payload, long timeRemaining) {
- SendContext ctx = new SendContext(recipient, timeRemaining);
- RPCServiceAddress address = (RPCServiceAddress)recipient.getServiceAddress();
- Message msg = recipient.getMessage();
- Route route = new Route(recipient.getRoute());
- Hop hop = route.removeHop(0);
-
+ protected Request encodeRequest(Version version, Route route, RPCServiceAddress address, Message msg,
+ long timeRemaining, byte[] payload, int traceLevel) {
Request req = new Request(METHOD_NAME);
- req.parameters().add(new StringValue(version.toString()));
- req.parameters().add(new StringValue(route.toString()));
- req.parameters().add(new StringValue(address.getSessionName()));
- req.parameters().add(new Int8Value(msg.getRetryEnabled() ? (byte)1 : (byte)0));
- req.parameters().add(new Int32Value(msg.getRetry()));
- req.parameters().add(new Int64Value(timeRemaining));
- req.parameters().add(new StringValue(msg.getProtocol()));
- req.parameters().add(new DataValue(payload));
- req.parameters().add(new Int32Value(ctx.trace.getLevel()));
-
- if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) {
- ctx.trace.trace(TraceLevel.SEND_RECEIVE,
- "Sending message (version " + version + ") from " + clientIdent + " to '" +
- address.getServiceName() + "' with " + ctx.timeout + " seconds timeout.");
- }
-
- if (hop.getIgnoreResult()) {
- address.getTarget().getJRTTarget().invokeVoid(req);
- if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) {
- ctx.trace.trace(TraceLevel.SEND_RECEIVE,
- "Not waiting for a reply from '" + address.getServiceName() + "'.");
- }
- Reply reply = new EmptyReply();
- reply.getTrace().swap(ctx.trace);
- net.getOwner().deliverReply(reply, recipient);
- } else {
- req.setContext(ctx);
- address.getTarget().getJRTTarget().invokeAsync(req, ctx.timeout, this);
- }
- req.discardParameters(); // allow garbage collection of request parameters
+ Values v = req.parameters();
+ v.add(new StringValue(version.toString()));
+ v.add(new StringValue(route.toString()));
+ v.add(new StringValue(address.getSessionName()));
+ v.add(new Int8Value(msg.getRetryEnabled() ? (byte)1 : (byte)0));
+ v.add(new Int32Value(msg.getRetry()));
+ v.add(new Int64Value(timeRemaining));
+ v.add(new StringValue(msg.getProtocol()));
+ v.add(new DataValue(payload));
+ v.add(new Int32Value(traceLevel));
+ return req;
}
@Override
- public void handleRequestDone(Request req) {
- SendContext ctx = (SendContext)req.getContext();
- String serviceName = ((RPCServiceAddress)ctx.recipient.getServiceAddress()).getServiceName();
+ protected Reply createReply(Values ret, String serviceName, Trace trace) {
+ Version version = new Version(ret.get(0).asUtf8Array());
+ double retryDelay = ret.get(1).asDouble();
+ int[] errorCodes = ret.get(2).asInt32Array();
+ String[] errorMessages = ret.get(3).asStringArray();
+ String[] errorServices = ret.get(4).asStringArray();
+ Utf8Array protocolName = ret.get(5).asUtf8Array();
+ byte[] payload = ret.get(6).asData();
+ String replyTrace = ret.get(7).asString();
+
+ // Make sure that the owner understands the protocol.
Reply reply = null;
Error error = null;
- if (!req.checkReturnTypes(METHOD_RETURN)) {
- // Map all known JRT errors to the appropriate message bus error.
- reply = new EmptyReply();
- switch (req.errorCode()) {
- case com.yahoo.jrt.ErrorCode.TIMEOUT:
- error = new Error(com.yahoo.messagebus.ErrorCode.TIMEOUT,
- "A timeout occured while waiting for '" + serviceName + "' (" +
- ctx.timeout + " seconds expired); " + req.errorMessage());
- break;
- case com.yahoo.jrt.ErrorCode.CONNECTION:
- error = new Error(com.yahoo.messagebus.ErrorCode.CONNECTION_ERROR,
- "A connection error occured for '" + serviceName + "'; " + req.errorMessage());
- break;
- default:
- error = new Error(com.yahoo.messagebus.ErrorCode.NETWORK_ERROR,
- "A network error occured for '" + serviceName + "'; " + req.errorMessage());
- }
- } else {
- // Retrieve all reply components from JRT request object.
- Version version = new Version(req.returnValues().get(0).asUtf8Array());
- double retryDelay = req.returnValues().get(1).asDouble();
- int[] errorCodes = req.returnValues().get(2).asInt32Array();
- String[] errorMessages = req.returnValues().get(3).asStringArray();
- String[] errorServices = req.returnValues().get(4).asStringArray();
- Utf8Array protocolName = req.returnValues().get(5).asUtf8Array();
- byte[] payload = req.returnValues().get(6).asData();
- String replyTrace = req.returnValues().get(7).asString();
-
- // Make sure that the owner understands the protocol.
- if (payload.length > 0) {
- Protocol protocol = net.getOwner().getProtocol(protocolName);
- if (protocol != null) {
- Routable routable = protocol.decode(version, payload);
- if (routable != null) {
- if (routable instanceof Reply) {
- reply = (Reply)routable;
- } else {
- error = new Error(com.yahoo.messagebus.ErrorCode.DECODE_ERROR,
- "Payload decoded to a reply when expecting a message.");
- }
- } else {
- error = new Error(com.yahoo.messagebus.ErrorCode.DECODE_ERROR,
- "Protocol '" + protocol.getName() + "' failed to decode routable.");
- }
- } else {
- error = new Error(com.yahoo.messagebus.ErrorCode.UNKNOWN_PROTOCOL,
- "Protocol '" + protocolName + "' is not known by " + serverIdent + ".");
- }
- }
- if (reply == null) {
- reply = new EmptyReply();
- }
- reply.setRetryDelay(retryDelay);
- for (int i = 0; i < errorCodes.length && i < errorMessages.length; i++) {
- reply.addError(new Error(errorCodes[i],
- errorMessages[i],
- errorServices[i].length() > 0 ? errorServices[i] : serviceName));
- }
- if (ctx.trace.getLevel() > 0) {
- ctx.trace.getRoot().addChild(TraceNode.decode(replyTrace));
+ if (payload.length > 0) {
+ Object retval = decode(protocolName, version, payload);
+ if (retval instanceof Reply) {
+ reply = (Reply) retval;
+ } else {
+ error = (Error) retval;
}
}
- if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) {
- ctx.trace.trace(TraceLevel.SEND_RECEIVE,
- "Reply (type " + reply.getType() + ") received at " + clientIdent + ".");
+ if (reply == null) {
+ reply = new EmptyReply();
}
- reply.getTrace().swap(ctx.trace);
if (error != null) {
reply.addError(error);
}
- net.getOwner().deliverReply(reply, ctx.recipient);
- }
-
- @Override
- public void invoke(Request request) {
- request.detach();
- Version version = new Version(request.parameters().get(0).asUtf8Array());
- String route = request.parameters().get(1).asString();
- String session = request.parameters().get(2).asString();
- boolean retryEnabled = (request.parameters().get(3).asInt8() != 0);
- int retry = request.parameters().get(4).asInt32();
- long timeRemaining = request.parameters().get(5).asInt64();
- Utf8Array protocolName = request.parameters().get(6).asUtf8Array();
- byte[] payload = request.parameters().get(7).asData();
- int traceLevel = request.parameters().get(8).asInt32();
-
- request.discardParameters(); // allow garbage collection of request parameters
-
- // Make sure that the owner understands the protocol.
- Protocol protocol = net.getOwner().getProtocol(protocolName);
- if (protocol == null) {
- replyError(request, version, traceLevel,
- new com.yahoo.messagebus.Error(ErrorCode.UNKNOWN_PROTOCOL,
- "Protocol '" + protocolName + "' is not known by " + serverIdent + "."));
- return;
- }
- Routable routable = protocol.decode(version, payload);
- if (routable == null) {
- replyError(request, version, traceLevel,
- new Error(ErrorCode.DECODE_ERROR,
- "Protocol '" + protocol.getName() + "' failed to decode routable."));
- return;
- }
- if (routable instanceof Reply) {
- replyError(request, version, traceLevel,
- new Error(ErrorCode.DECODE_ERROR,
- "Payload decoded to a reply when expecting a message."));
- return;
+ reply.setRetryDelay(retryDelay);
+ for (int i = 0; i < errorCodes.length && i < errorMessages.length; i++) {
+ reply.addError(new Error(errorCodes[i], errorMessages[i],
+ errorServices[i].length() > 0 ? errorServices[i] : serviceName));
}
- Message msg = (Message)routable;
- if (route != null && route.length() > 0) {
- msg.setRoute(net.getRoute(route));
+ if (trace.getLevel() > 0) {
+ trace.getRoot().addChild(TraceNode.decode(replyTrace));
}
- msg.setContext(new ReplyContext(request, version));
- msg.pushHandler(this);
- msg.setRetryEnabled(retryEnabled);
- msg.setRetry(retry);
- msg.setTimeReceivedNow();
- msg.setTimeRemaining(timeRemaining);
- msg.getTrace().setLevel(traceLevel);
- if (msg.getTrace().shouldTrace(TraceLevel.SEND_RECEIVE)) {
- msg.getTrace().trace(TraceLevel.SEND_RECEIVE,
- "Message (type " + msg.getType() + ") received at " + serverIdent + " for session '" + session + "'.");
- }
- net.getOwner().deliverMessage(msg, session);
+ return reply;
}
- @Override
- public void handleReply(Reply reply) {
- ReplyContext ctx = (ReplyContext)reply.getContext();
- reply.setContext(null);
-
- // Add trace information.
- if (reply.getTrace().shouldTrace(TraceLevel.SEND_RECEIVE)) {
- reply.getTrace().trace(TraceLevel.SEND_RECEIVE,
- "Sending reply (version " + ctx.version + ") from " + serverIdent + ".");
- }
+ protected Params toParams(Values args) {
+ Params p = new Params();
+ p.version = new Version(args.get(0).asUtf8Array());
+ p.route = args.get(1).asString();
+ p.session = args.get(2).asString();
+ p.retryEnabled = (args.get(3).asInt8() != 0);
+ p.retry = args.get(4).asInt32();
+ p.timeRemaining = args.get(5).asInt64();
+ p.protocolName = args.get(6).asUtf8Array();
+ p.payload = args.get(7).asData();
+ p.traceLevel = args.get(8).asInt32();
+ return p;
+ }
- // Encode and return the reply through the RPC request.
- byte[] payload = new byte[0];
- if (reply.getType() != 0) {
- Protocol protocol = net.getOwner().getProtocol(reply.getProtocol());
- if (protocol != null) {
- payload = protocol.encode(ctx.version, reply);
- }
- if (payload == null || payload.length == 0) {
- reply.addError(new Error(ErrorCode.ENCODE_ERROR,
- "An error occured while encoding the reply."));
- }
- }
+ @Override
+ protected void createResponse(Values ret, Reply reply, Version version, byte [] payload) {
int[] eCodes = new int[reply.getNumErrors()];
String[] eMessages = new String[reply.getNumErrors()];
String[] eServices = new String[reply.getNumErrors()];
@@ -265,57 +140,14 @@ public class RPCSendV1 implements MethodHandler, ReplyHandler, RequestWaiter, RP
eMessages[i] = error.getMessage();
eServices[i] = error.getService() != null ? error.getService() : "";
}
- ctx.request.returnValues().add(new StringValue(ctx.version.toString()));
- ctx.request.returnValues().add(new DoubleValue(reply.getRetryDelay()));
- ctx.request.returnValues().add(new Int32Array(eCodes));
- ctx.request.returnValues().add(new StringArray(eMessages));
- ctx.request.returnValues().add(new StringArray(eServices));
- ctx.request.returnValues().add(new StringValue(reply.getProtocol()));
- ctx.request.returnValues().add(new DataValue(payload));
- ctx.request.returnValues().add(new StringValue(
- reply.getTrace().getRoot() != null ?
- reply.getTrace().getRoot().encode() :
- ""));
- ctx.request.returnRequest();
+ ret.add(new StringValue(version.toString()));
+ ret.add(new DoubleValue(reply.getRetryDelay()));
+ ret.add(new Int32Array(eCodes));
+ ret.add(new StringArray(eMessages));
+ ret.add(new StringArray(eServices));
+ ret.add(new StringValue(reply.getProtocol()));
+ ret.add(new DataValue(payload));
+ ret.add(new StringValue(reply.getTrace().getRoot() != null ? reply.getTrace().getRoot().encode() : ""));
}
- /**
- * Send an error reply for a given request.
- *
- * @param request The JRT request to reply to.
- * @param version The version to serialize for.
- * @param traceLevel The trace level to set in the reply.
- * @param err The error to reply with.
- */
- private void replyError(Request request, Version version, int traceLevel, Error err) {
- Reply reply = new EmptyReply();
- reply.setContext(new ReplyContext(request, version));
- reply.getTrace().setLevel(traceLevel);
- reply.addError(err);
- handleReply(reply);
- }
-
- private static class SendContext {
-
- final RoutingNode recipient;
- final Trace trace;
- final double timeout;
-
- SendContext(RoutingNode recipient, long timeRemaining) {
- this.recipient = recipient;
- trace = new Trace(recipient.getTrace().getLevel());
- timeout = timeRemaining * 0.001;
- }
- }
-
- private static class ReplyContext {
-
- final Request request;
- final Version version;
-
- public ReplyContext(Request request, Version version) {
- this.request = request;
- this.version = version;
- }
- }
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java
new file mode 100644
index 00000000000..8cc0b73ae30
--- /dev/null
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java
@@ -0,0 +1,209 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.messagebus.network.rpc;
+
+import com.yahoo.component.Version;
+import com.yahoo.compress.CompressionType;
+import com.yahoo.compress.Compressor;
+import com.yahoo.jrt.DataValue;
+import com.yahoo.jrt.Int32Value;
+import com.yahoo.jrt.Int8Value;
+import com.yahoo.jrt.Method;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.Values;
+import com.yahoo.messagebus.EmptyReply;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.Trace;
+import com.yahoo.messagebus.TraceNode;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.slime.BinaryFormat;
+import com.yahoo.slime.Cursor;
+import com.yahoo.slime.Inspector;
+import com.yahoo.slime.Slime;
+import com.yahoo.text.Utf8;
+import com.yahoo.text.Utf8Array;
+
+/**
+ * Implements the request adapter for method "mbus.slime".
+ *
+ * @author baldersheim
+ */
+public class RPCSendV2 extends RPCSend {
+
+ private final static String METHOD_NAME = "mbus.slime";
+ private final static String METHOD_PARAMS = "bixbix";
+ private final static String METHOD_RETURN = "bixbix";
+ private final Compressor compressor = new Compressor(CompressionType.LZ4, 3, 90, 1024);
+
+ @Override
+ protected String getReturnSpec() { return METHOD_RETURN; }
+ @Override
+ protected Method buildMethod() {
+
+ Method method = new Method(METHOD_NAME, METHOD_PARAMS, METHOD_RETURN, this);
+ method.methodDesc("Send a message bus request and get a reply back.");
+ method.paramDesc(0, "header_encoding", "Encoding type of header.")
+ .paramDesc(1, "header_decodedSize", "Number of bytes after header decoding.")
+ .paramDesc(2, "header_payload", "Slime encoded header payload.")
+ .paramDesc(3, "body_encoding", "Encoding type of body.")
+ .paramDesc(4, "body_decoded_ize", "Number of bytes after body decoding.")
+ .paramDesc(5, "body_payload", "Slime encoded body payload.");
+ method.returnDesc(0, "header_encoding", "Encoding type of header.")
+ .returnDesc(1, "header_decoded_size", "Number of bytes after header decoding.")
+ .returnDesc(2, "header_payload", "Slime encoded header payload.")
+ .returnDesc(3, "body_encoding", "Encoding type of body.")
+ .returnDesc(4, "body_encoded_size", "Number of bytes after body decoding.")
+ .returnDesc(5, "body_payload", "Slime encoded body payload.");
+ return method;
+ }
+ private static final String VERSION_F = new String("version");
+ private static final String ROUTE_F = new String("route");
+ private static final String SESSION_F = new String("session");
+ private static final String PROTOCOL_F = new String("prot");
+ private static final String TRACELEVEL_F = new String("tracelevel");
+ private static final String TRACE_F = new String("trace");
+ private static final String USERETRY_F = new String("useretry");
+ private static final String RETRY_F = new String("retry");
+ private static final String RETRYDELAY_F = new String("retrydelay");
+ private static final String TIMEREMAINING_F = new String("timeleft");
+ private static final String ERRORS_F = new String("errors");
+ private static final String SERVICE_F = new String("service");
+ private static final String CODE_F = new String("code");
+ private static final String BLOB_F = new String("msg");
+ private static final String MSG_F = new String("msg");
+
+ @Override
+ protected Request encodeRequest(Version version, Route route, RPCServiceAddress address, Message msg,
+ long timeRemaining, byte[] payload, int traceLevel)
+ {
+
+ Request req = new Request(METHOD_NAME);
+ Values v = req.parameters();
+
+ v.add(new Int8Value(CompressionType.NONE.getCode()));
+ v.add(new Int32Value(0));
+ v.add(new DataValue(new byte[0]));
+
+ Slime slime = new Slime();
+ Cursor root = slime.setObject();
+
+ root.setString(VERSION_F, version.toString());
+ root.setString(ROUTE_F, route.toString());
+ root.setString(SESSION_F, address.getSessionName());
+ root.setString(PROTOCOL_F, msg.getProtocol().toString());
+ root.setBool(USERETRY_F, msg.getRetryEnabled());
+ root.setLong(RETRY_F, msg.getRetry());
+ root.setLong(TIMEREMAINING_F, msg.getTimeRemaining());
+ root.setLong(TRACELEVEL_F, traceLevel);
+ root.setData(BLOB_F, payload);
+
+ byte[] serializedSlime = BinaryFormat.encode(slime);
+ Compressor.Compression compressionResult = compressor.compress(serializedSlime);
+
+ v.add(new Int8Value(compressionResult.type().getCode()));
+ v.add(new Int32Value(compressionResult.uncompressedSize()));
+ v.add(new DataValue(compressionResult.data()));
+
+ return req;
+ }
+
+ @Override
+ protected Reply createReply(Values ret, String serviceName, Trace trace) {
+ CompressionType compression = CompressionType.valueOf(ret.get(3).asInt8());
+ byte[] slimeBytes = compressor.decompress(ret.get(5).asData(), compression, ret.get(4).asInt32());
+ Slime slime = BinaryFormat.decode(slimeBytes);
+ Inspector root = slime.get();
+
+ Version version = new Version(root.field(VERSION_F).asString());
+ byte[] payload = root.field(BLOB_F).asData();
+
+ // Make sure that the owner understands the protocol.
+ Reply reply = null;
+ Error error = null;
+ if (payload.length > 0) {
+ Object retval = decode(new Utf8Array(root.field(PROTOCOL_F).asUtf8()), version, payload);
+ if (retval instanceof Reply) {
+ reply = (Reply) retval;
+ } else {
+ error = (Error) retval;
+ }
+ }
+ if (reply == null) {
+ reply = new EmptyReply();
+ }
+ if (error != null) {
+ reply.addError(error);
+ }
+ reply.setRetryDelay(root.field(RETRYDELAY_F).asDouble());
+
+ Inspector errors = root.field(ERRORS_F);
+ for (int i = 0; i < errors.entries(); i++) {
+ Inspector e = errors.entry(i);
+ String service = e.field(SERVICE_F).asString();
+ reply.addError(new Error((int)e.field(CODE_F).asLong(), e.field(MSG_F).asString(),
+ (service != null && service.length() > 0) ? service : serviceName));
+ }
+ if (trace.getLevel() > 0) {
+ trace.getRoot().addChild(TraceNode.decode(root.field(TRACE_F).asString()));
+ }
+ return reply;
+ }
+
+ protected Params toParams(Values args) {
+ CompressionType compression = CompressionType.valueOf(args.get(3).asInt8());
+ byte[] slimeBytes = compressor.decompress(args.get(5).asData(), compression, args.get(4).asInt32());
+ Slime slime = BinaryFormat.decode(slimeBytes);
+ Inspector root = slime.get();
+ Params p = new Params();
+ p.version = new Version(root.field(VERSION_F).asString());
+ p.route = root.field(ROUTE_F).asString();
+ p.session = root.field(SESSION_F).asString();
+ p.retryEnabled = root.field(USERETRY_F).asBool();
+ p.retry = (int)root.field(RETRY_F).asLong();
+ p.timeRemaining = root.field(TIMEREMAINING_F).asLong();
+ p.protocolName = new Utf8Array(Utf8.toBytes(root.field(PROTOCOL_F).asString()));
+ p.payload = root.field(BLOB_F).asData();
+ p.traceLevel = (int)root.field(TRACELEVEL_F).asLong();
+ return p;
+ }
+
+ @Override
+ protected void createResponse(Values ret, Reply reply, Version version, byte [] payload) {
+ ret.add(new Int8Value(CompressionType.NONE.getCode()));
+ ret.add(new Int32Value(0));
+ ret.add(new DataValue(new byte[0]));
+
+ Slime slime = new Slime();
+ Cursor root = slime.setObject();
+
+ root.setString(VERSION_F, version.toString());
+ root.setDouble(RETRYDELAY_F, reply.getRetryDelay());
+ root.setString(PROTOCOL_F, reply.getProtocol().toString());
+ root.setData(BLOB_F, payload);
+ if (reply.getTrace().getLevel() > 0) {
+ root.setString(TRACE_F, reply.getTrace().getRoot().encode());
+ }
+
+ if (reply.getNumErrors() > 0) {
+ Cursor array = root.setArray(ERRORS_F);
+ for (int i = 0; i < reply.getNumErrors(); i++) {
+ Cursor e = array.addObject();
+ Error mbusE = reply.getError(i);
+ e.setLong(CODE_F, mbusE.getCode());
+ e.setString(MSG_F, mbusE.getMessage());
+ if (mbusE.getService() != null) {
+ e.setString(SERVICE_F, mbusE.getService());
+ }
+ }
+ }
+
+ byte[] serializedSlime = BinaryFormat.encode(slime);
+ Compressor.Compression compressionResult = compressor.compress(serializedSlime);
+
+ ret.add(new Int8Value(compressionResult.type().getCode()));
+ ret.add(new Int32Value(compressionResult.uncompressedSize()));
+ ret.add(new DataValue(compressionResult.data()));
+ }
+
+}
diff --git a/messagebus/src/tests/advancedrouting/advancedrouting.cpp b/messagebus/src/tests/advancedrouting/advancedrouting.cpp
index e847fdf3222..8d6b8cd3875 100644
--- a/messagebus/src/tests/advancedrouting/advancedrouting.cpp
+++ b/messagebus/src/tests/advancedrouting/advancedrouting.cpp
@@ -1,17 +1,15 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/messagebus/emptyreply.h>
-#include <vespa/messagebus/errorcode.h>
-#include <vespa/messagebus/messagebus.h>
-#include <vespa/messagebus/routing/errordirective.h>
-#include <vespa/messagebus/routing/retrytransienterrorspolicy.h>
+
#include <vespa/messagebus/testlib/custompolicy.h>
#include <vespa/messagebus/testlib/receptor.h>
#include <vespa/messagebus/testlib/simplemessage.h>
-#include <vespa/messagebus/testlib/simpleprotocol.h>
#include <vespa/messagebus/testlib/slobrok.h>
#include <vespa/messagebus/testlib/testserver.h>
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/messagebus/emptyreply.h>
+#include <vespa/messagebus/errorcode.h>
+#include <vespa/messagebus/routing/retrytransienterrorspolicy.h>
using namespace mbus;
@@ -117,7 +115,7 @@ Test::Main()
void
Test::testAdvanced(TestData &data)
{
- const double TIMEOUT=60;
+ const double TIMEOUT = 60;
IProtocol::SP protocol(new SimpleProtocol());
SimpleProtocol &simple = static_cast<SimpleProtocol&>(*protocol);
simple.addPolicyFactory("Custom", SimpleProtocol::IPolicyFactory::SP(new CustomPolicyFactory(false, ErrorCode::NO_ADDRESS_FOR_SERVICE)));
diff --git a/messagebus/src/tests/bucketsequence/bucketsequence.cpp b/messagebus/src/tests/bucketsequence/bucketsequence.cpp
index 4073469c253..d98acc4f191 100644
--- a/messagebus/src/tests/bucketsequence/bucketsequence.cpp
+++ b/messagebus/src/tests/bucketsequence/bucketsequence.cpp
@@ -5,7 +5,6 @@
#include <vespa/messagebus/testlib/receptor.h>
#include <vespa/messagebus/testlib/simplemessage.h>
#include <vespa/messagebus/testlib/simpleprotocol.h>
-#include <vespa/messagebus/testlib/simplereply.h>
#include <vespa/messagebus/testlib/slobrok.h>
#include <vespa/messagebus/testlib/testserver.h>
#include <vespa/vespalib/testkit/testapp.h>
diff --git a/messagebus/src/tests/choke/choke.cpp b/messagebus/src/tests/choke/choke.cpp
index b8e20eb9074..91f91c84f45 100644
--- a/messagebus/src/tests/choke/choke.cpp
+++ b/messagebus/src/tests/choke/choke.cpp
@@ -2,7 +2,6 @@
#include <vespa/messagebus/emptyreply.h>
#include <vespa/messagebus/errorcode.h>
-#include <vespa/messagebus/reply.h>
#include <vespa/messagebus/testlib/receptor.h>
#include <vespa/messagebus/testlib/simplemessage.h>
#include <vespa/messagebus/testlib/simpleprotocol.h>
diff --git a/messagebus/src/tests/error/error.cpp b/messagebus/src/tests/error/error.cpp
index dbaa869507b..1d8a489a5ed 100644
--- a/messagebus/src/tests/error/error.cpp
+++ b/messagebus/src/tests/error/error.cpp
@@ -65,14 +65,14 @@ Test::Main()
reply = pxy.getReply();
ASSERT_TRUE(reply.get() != 0);
- EXPECT_EQUAL(reply->getNumErrors(), 1u);
+ ASSERT_EQUAL(reply->getNumErrors(), 1u);
EXPECT_EQUAL(reply->getError(0).getService(), "test/dst/session");
reply->addError(Error(ErrorCode::APP_FATAL_ERROR, "fatality"));
is->forward(std::move(reply));
reply = src.getReply();
ASSERT_TRUE(reply.get() != 0);
- EXPECT_EQUAL(reply->getNumErrors(), 2u);
+ ASSERT_EQUAL(reply->getNumErrors(), 2u);
EXPECT_EQUAL(reply->getError(0).getService(), "test/dst/session");
EXPECT_EQUAL(reply->getError(1).getService(), "test/pxy/session");
}
diff --git a/messagebus/src/tests/routing/routing.cpp b/messagebus/src/tests/routing/routing.cpp
index e907e0d1163..515cbd99fde 100644
--- a/messagebus/src/tests/routing/routing.cpp
+++ b/messagebus/src/tests/routing/routing.cpp
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/messagebus/emptyreply.h>
#include <vespa/messagebus/errorcode.h>
-#include <vespa/messagebus/messagebus.h>
#include <vespa/messagebus/routing/errordirective.h>
#include <vespa/messagebus/routing/retrytransienterrorspolicy.h>
#include <vespa/messagebus/routing/routingcontext.h>
diff --git a/messagebus/src/tests/sendadapter/sendadapter.cpp b/messagebus/src/tests/sendadapter/sendadapter.cpp
index 8ae1d0993cb..e3fa3278300 100644
--- a/messagebus/src/tests/sendadapter/sendadapter.cpp
+++ b/messagebus/src/tests/sendadapter/sendadapter.cpp
@@ -1,6 +1,5 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/messagebus/messagebus.h>
#include <vespa/messagebus/testlib/receptor.h>
#include <vespa/messagebus/testlib/simplemessage.h>
#include <vespa/messagebus/testlib/simpleprotocol.h>
diff --git a/messagebus/src/tests/serviceaddress/serviceaddress.cpp b/messagebus/src/tests/serviceaddress/serviceaddress.cpp
index 15a2e9fdac8..89edb981716 100644
--- a/messagebus/src/tests/serviceaddress/serviceaddress.cpp
+++ b/messagebus/src/tests/serviceaddress/serviceaddress.cpp
@@ -3,20 +3,7 @@
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/messagebus/testlib/slobrok.h>
#include <vespa/messagebus/testlib/testserver.h>
-#include <vespa/messagebus/testlib/receptor.h>
-#include <vespa/messagebus/testlib/simplemessage.h>
-#include <vespa/messagebus/testlib/simplereply.h>
-#include <vespa/messagebus/testlib/simpleprotocol.h>
-#include <vespa/messagebus/messagebus.h>
-#include <vespa/messagebus/sourcesession.h>
-#include <vespa/messagebus/intermediatesession.h>
-#include <vespa/messagebus/destinationsession.h>
-#include <vespa/messagebus/emptyreply.h>
-#include <vespa/messagebus/error.h>
-#include <vespa/messagebus/errorcode.h>
-#include <vespa/messagebus/routing/routingspec.h>
#include <vespa/messagebus/network/rpcservice.h>
-#include <vespa/messagebus/sourcesessionparams.h>
using namespace mbus;
diff --git a/messagebus/src/tests/servicepool/servicepool.cpp b/messagebus/src/tests/servicepool/servicepool.cpp
index c98b342b2f3..86a800b600a 100644
--- a/messagebus/src/tests/servicepool/servicepool.cpp
+++ b/messagebus/src/tests/servicepool/servicepool.cpp
@@ -1,6 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/messagebus/network/rpcnetwork.h>
+#include <vespa/messagebus/network/rpcnetworkparams.h>
+#include <vespa/messagebus/network/rpcservicepool.h>
#include <vespa/messagebus/testlib/slobrok.h>
#include <vespa/vespalib/testkit/testapp.h>
diff --git a/messagebus/src/tests/shutdown/shutdown.cpp b/messagebus/src/tests/shutdown/shutdown.cpp
index b5e6cac970e..070b51bbbc2 100644
--- a/messagebus/src/tests/shutdown/shutdown.cpp
+++ b/messagebus/src/tests/shutdown/shutdown.cpp
@@ -1,9 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/messagebus/emptyreply.h>
-#include <vespa/messagebus/errorcode.h>
-#include <vespa/messagebus/messagebus.h>
-#include <vespa/messagebus/routing/errordirective.h>
#include <vespa/messagebus/routing/retrytransienterrorspolicy.h>
#include <vespa/messagebus/testlib/receptor.h>
#include <vespa/messagebus/testlib/simplemessage.h>
diff --git a/messagebus/src/tests/slobrok/slobrok.cpp b/messagebus/src/tests/slobrok/slobrok.cpp
index 360705e7eae..6c389c25e70 100644
--- a/messagebus/src/tests/slobrok/slobrok.cpp
+++ b/messagebus/src/tests/slobrok/slobrok.cpp
@@ -2,10 +2,10 @@
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/messagebus/testlib/slobrok.h>
-#include <string>
-#include <sstream>
#include <vespa/slobrok/sbmirror.h>
#include <vespa/messagebus/network/rpcnetwork.h>
+#include <vespa/messagebus/network/rpcnetworkparams.h>
+
#include <vespa/vespalib/util/host_name.h>
using slobrok::api::IMirrorAPI;
diff --git a/messagebus/src/vespa/messagebus/blob.h b/messagebus/src/vespa/messagebus/blob.h
index 1e84222064a..0509d71d77c 100644
--- a/messagebus/src/vespa/messagebus/blob.h
+++ b/messagebus/src/vespa/messagebus/blob.h
@@ -26,13 +26,13 @@ public:
_payload(Alloc::alloc(s)),
_sz(s)
{ }
- Blob(Blob && rhs) :
+ Blob(Blob && rhs) noexcept :
_payload(std::move(rhs._payload)),
_sz(rhs._sz)
{
rhs._sz = 0;
}
- Blob & operator = (Blob && rhs) {
+ Blob & operator = (Blob && rhs) noexcept {
swap(rhs);
return *this;
}
@@ -65,4 +65,3 @@ private:
};
} // namespace mbus
-
diff --git a/messagebus/src/vespa/messagebus/callstack.cpp b/messagebus/src/vespa/messagebus/callstack.cpp
index 0ab8658d53f..b7179e14cad 100644
--- a/messagebus/src/vespa/messagebus/callstack.cpp
+++ b/messagebus/src/vespa/messagebus/callstack.cpp
@@ -13,7 +13,7 @@ CallStack::discard()
{
while (!_stack.empty()) {
const Frame &frame = _stack.back();
- if (frame.discardHandler != NULL) {
+ if (frame.discardHandler != nullptr) {
frame.discardHandler->handleDiscard(frame.ctx);
}
_stack.pop_back();
diff --git a/messagebus/src/vespa/messagebus/callstack.h b/messagebus/src/vespa/messagebus/callstack.h
index 68da598e796..0f9d8c93b29 100644
--- a/messagebus/src/vespa/messagebus/callstack.h
+++ b/messagebus/src/vespa/messagebus/callstack.h
@@ -72,7 +72,7 @@ public:
* @param ctx The context to store.
* @param discardHandler The handler for discarded messages.
**/
- void push(IReplyHandler &replyHandler, Context ctx, IDiscardHandler *discardHandler = NULL) {
+ void push(IReplyHandler &replyHandler, Context ctx, IDiscardHandler *discardHandler = nullptr) {
_stack.emplace_back(&replyHandler, discardHandler, ctx);
}
diff --git a/messagebus/src/vespa/messagebus/destinationsessionparams.cpp b/messagebus/src/vespa/messagebus/destinationsessionparams.cpp
index 3959666e718..ecbc036ffed 100644
--- a/messagebus/src/vespa/messagebus/destinationsessionparams.cpp
+++ b/messagebus/src/vespa/messagebus/destinationsessionparams.cpp
@@ -6,7 +6,7 @@ namespace mbus {
DestinationSessionParams::DestinationSessionParams() :
_name("destination"),
_broadcastName(true),
- _handler(NULL)
+ _handler(nullptr)
{ }
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/messagebus.cpp b/messagebus/src/vespa/messagebus/messagebus.cpp
index 4cd19b62419..5a8f510ddcf 100644
--- a/messagebus/src/vespa/messagebus/messagebus.cpp
+++ b/messagebus/src/vespa/messagebus/messagebus.cpp
@@ -97,7 +97,7 @@ MessageBus::MessageBus(INetwork &net, ProtocolSet protocols) :
MessageBusParams params;
while (!protocols.empty()) {
IProtocol::SP protocol = protocols.extract();
- if (protocol.get() != NULL) {
+ if (protocol.get() != nullptr) {
params.addProtocol(protocol);
}
}
@@ -155,7 +155,7 @@ MessageBus::setup(const MessageBusParams &params)
// Start messenger.
IRetryPolicy::SP retryPolicy = params.getRetryPolicy();
- if (retryPolicy.get() != NULL) {
+ if (retryPolicy.get() != nullptr) {
_resender.reset(new Resender(retryPolicy));
Messenger::ITask::UP task(new ResenderTask(*_resender));
@@ -271,7 +271,7 @@ MessageBus::sync()
void
MessageBus::handleMessage(Message::UP msg)
{
- if (_resender.get() != NULL && msg->hasBucketSequence()) {
+ if (_resender.get() != nullptr && msg->hasBucketSequence()) {
deliverError(std::move(msg), ErrorCode::SEQUENCE_ERROR,
"Bucket sequences not supported when resender is enabled.");
return;
@@ -359,7 +359,7 @@ MessageBus::handleDiscard(Context ctx)
void
MessageBus::deliverMessage(Message::UP msg, const string &session)
{
- IMessageHandler *msgHandler = NULL;
+ IMessageHandler *msgHandler = nullptr;
{
LockGuard guard(_lock);
std::map<string, IMessageHandler*>::iterator it = _sessions.find(session);
@@ -367,7 +367,7 @@ MessageBus::deliverMessage(Message::UP msg, const string &session)
msgHandler = it->second;
}
}
- if (msgHandler == NULL) {
+ if (msgHandler == nullptr) {
deliverError(std::move(msg), ErrorCode::UNKNOWN_SESSION,
make_string("Session '%s' does not exist.", session.c_str()));
} else if (!checkPending(*msg)) {
diff --git a/messagebus/src/vespa/messagebus/messenger.cpp b/messagebus/src/vespa/messagebus/messenger.cpp
index 2d1204ee7b6..4b612b66c31 100644
--- a/messagebus/src/vespa/messagebus/messenger.cpp
+++ b/messagebus/src/vespa/messagebus/messenger.cpp
@@ -32,7 +32,7 @@ public:
}
~MessageTask() {
- if (_msg.get() != NULL) {
+ if (_msg.get() != nullptr) {
_msg->discard();
}
}
@@ -42,7 +42,7 @@ public:
}
uint8_t priority() const override {
- if (_msg.get() != NULL) {
+ if (_msg.get() != nullptr) {
return _msg->priority();
}
@@ -64,7 +64,7 @@ public:
}
~ReplyTask() {
- if (_reply.get() != NULL) {
+ if (_reply.get() != nullptr) {
_reply->discard();
}
}
@@ -74,7 +74,7 @@ public:
}
uint8_t priority() const override {
- if (_reply.get() != NULL) {
+ if (_reply.get() != nullptr) {
return _reply->priority();
}
@@ -205,7 +205,7 @@ Messenger::Run(FastOS_ThreadInterface *thread, void *arg)
_queue.pop();
}
}
- if (task.get() != NULL) {
+ if (task.get() != nullptr) {
try {
task->run();
} catch (const std::exception &e) {
diff --git a/messagebus/src/vespa/messagebus/network/CMakeLists.txt b/messagebus/src/vespa/messagebus/network/CMakeLists.txt
index e8992034622..750ff20240f 100644
--- a/messagebus/src/vespa/messagebus/network/CMakeLists.txt
+++ b/messagebus/src/vespa/messagebus/network/CMakeLists.txt
@@ -6,7 +6,9 @@ vespa_add_library(messagebus_network OBJECT
oosmanager.cpp
rpcnetwork.cpp
rpcnetworkparams.cpp
+ rpcsend.cpp
rpcsendv1.cpp
+ rpcsendv2.cpp
rpcservice.cpp
rpcserviceaddress.cpp
rpcservicepool.cpp
diff --git a/messagebus/src/vespa/messagebus/network/oosmanager.cpp b/messagebus/src/vespa/messagebus/network/oosmanager.cpp
index eecfe1da447..250df147675 100644
--- a/messagebus/src/vespa/messagebus/network/oosmanager.cpp
+++ b/messagebus/src/vespa/messagebus/network/oosmanager.cpp
@@ -89,7 +89,7 @@ OOSManager::isOOS(const string &service)
return false;
}
vespalib::LockGuard guard(_lock);
- if (_oosSet.get() == NULL) {
+ if (_oosSet.get() == nullptr) {
return false;
}
if (_oosSet->find(service) == _oosSet->end()) {
diff --git a/messagebus/src/vespa/messagebus/network/oosmanager.h b/messagebus/src/vespa/messagebus/network/oosmanager.h
index b521520d2b3..eac00b93896 100644
--- a/messagebus/src/vespa/messagebus/network/oosmanager.h
+++ b/messagebus/src/vespa/messagebus/network/oosmanager.h
@@ -20,14 +20,11 @@ class RPCNetwork;
*/
class OOSManager : public FNET_Task {
public:
- /**
- * Convenience typedefs.
- */
- typedef slobrok::api::IMirrorAPI IMirrorAPI;
- typedef IMirrorAPI::SpecList SpecList;
- typedef std::vector<OOSClient::SP> ClientList;
- typedef std::set<string> StringSet;
- typedef std::shared_ptr<StringSet> OOSSet;
+ using IMirrorAPI = slobrok::api::IMirrorAPI;
+ using SpecList = IMirrorAPI::SpecList;
+ using ClientList = std::vector<OOSClient::SP>;
+ using StringSet = std::set<string>;
+ using OOSSet = std::shared_ptr<StringSet>;
private:
FRT_Supervisor &_orb;
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index 75c5fc3f6c5..4b4b23a75db 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -1,6 +1,11 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "inetworkowner.h"
#include "rpcnetwork.h"
+#include "rpcservicepool.h"
+#include "oosmanager.h"
+#include "rpcsendv1.h"
+#include "rpcsendv2.h"
+#include "rpctargetpool.h"
+#include "rpcnetworkparams.h"
#include <vespa/messagebus/errorcode.h>
#include <vespa/messagebus/iprotocol.h>
#include <vespa/messagebus/tracelevel.h>
@@ -11,6 +16,8 @@
#include <vespa/vespalib/component/vtag.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/fnet/scheduler.h>
+#include <vespa/fnet/transport.h>
+#include <vespa/fnet/frt/supervisor.h>
#include <vespa/log/log.h>
LOG_SETUP(".rpcnetwork");
@@ -52,17 +59,15 @@ public:
namespace mbus {
RPCNetwork::SendContext::SendContext(RPCNetwork &net, const Message &msg,
- const std::vector<RoutingNode*> &recipients) :
- _net(net),
- _msg(msg),
- _traceLevel(msg.getTrace().getLevel()),
- _recipients(recipients),
- _hasError(false),
- _pending(_recipients.size()),
- _version(_net.getVersion())
-{
- // empty
-}
+ const std::vector<RoutingNode*> &recipients)
+ : _net(net),
+ _msg(msg),
+ _traceLevel(msg.getTrace().getLevel()),
+ _recipients(recipients),
+ _hasError(false),
+ _pending(_recipients.size()),
+ _version(_net.getVersion())
+{ }
void
RPCNetwork::SendContext::handleVersion(const vespalib::Version *version)
@@ -70,7 +75,7 @@ RPCNetwork::SendContext::handleVersion(const vespalib::Version *version)
bool shouldSend = false;
{
vespalib::LockGuard guard(_lock);
- if (version == NULL) {
+ if (version == nullptr) {
_hasError = true;
} else if (*version < _version) {
_version = *version;
@@ -85,11 +90,9 @@ RPCNetwork::SendContext::handleVersion(const vespalib::Version *version)
}
}
-RPCNetwork::TargetPoolTask::TargetPoolTask(
- FNET_Scheduler &scheduler,
- RPCTargetPool &pool) :
- FNET_Task(&scheduler),
- _pool(pool)
+RPCNetwork::TargetPoolTask::TargetPoolTask(FNET_Scheduler &scheduler, RPCTargetPool &pool)
+ : FNET_Task(&scheduler),
+ _pool(pool)
{
ScheduleNow();
}
@@ -104,24 +107,26 @@ RPCNetwork::TargetPoolTask::PerformTask()
RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_owner(0),
_ident(params.getIdentity()),
- _threadPool(128000, 0),
- _transport(),
- _orb(&_transport, NULL),
- _scheduler(*_transport.GetScheduler()),
- _targetPool(params.getConnectionExpireSecs()),
- _targetPoolTask(_scheduler, _targetPool),
- _servicePool(*this, 4096),
- _slobrokCfgFactory(params.getSlobrokConfig()),
- _mirror(std::make_unique<slobrok::api::MirrorAPI>(_orb, _slobrokCfgFactory)),
- _regAPI(std::make_unique<slobrok::api::RegisterAPI>(_orb, _slobrokCfgFactory)),
- _oosManager(_orb, *_mirror, params.getOOSServerPattern()),
+ _threadPool(std::make_unique<FastOS_ThreadPool>(128000, 0)),
+ _transport(std::make_unique<FNET_Transport>()),
+ _orb(std::make_unique<FRT_Supervisor>(_transport.get(), nullptr)),
+ _scheduler(*_transport->GetScheduler()),
+ _targetPool(std::make_unique<RPCTargetPool>(params.getConnectionExpireSecs())),
+ _targetPoolTask(_scheduler, *_targetPool),
+ _servicePool(std::make_unique<RPCServicePool>(*this, 4096)),
+ _slobrokCfgFactory(std::make_unique<slobrok::ConfiguratorFactory>(params.getSlobrokConfig())),
+ _mirror(std::make_unique<slobrok::api::MirrorAPI>(*_orb, *_slobrokCfgFactory)),
+ _regAPI(std::make_unique<slobrok::api::RegisterAPI>(*_orb, *_slobrokCfgFactory)),
+ _oosManager(std::make_unique<OOSManager>(*_orb, *_mirror, params.getOOSServerPattern())),
_requestedPort(params.getListenPort()),
- _sendV1(),
- _sendAdapters()
+ _sendV1(std::make_unique<RPCSendV1>()),
+ _sendV2(std::make_unique<RPCSendV2>()),
+ _sendAdapters(),
+ _compressionConfig(params.getCompressionConfig())
{
- _transport.SetDirectWrite(false);
- _transport.SetMaxInputBufferSize(params.getMaxInputBufferSize());
- _transport.SetMaxOutputBufferSize(params.getMaxOutputBufferSize());
+ _transport->SetDirectWrite(false);
+ _transport->SetMaxInputBufferSize(params.getMaxInputBufferSize());
+ _transport->SetMaxOutputBufferSize(params.getMaxOutputBufferSize());
}
RPCNetwork::~RPCNetwork()
@@ -132,33 +137,33 @@ RPCNetwork::~RPCNetwork()
FRT_RPCRequest *
RPCNetwork::allocRequest()
{
- return _orb.AllocRPCRequest();
+ return _orb->AllocRPCRequest();
}
RPCTarget::SP
RPCNetwork::getTarget(const RPCServiceAddress &address)
{
- return _targetPool.getTarget(_orb, address);
+ return _targetPool->getTarget(*_orb, address);
}
void
-RPCNetwork::replyError(const SendContext &ctx, uint32_t errCode,
- const string &errMsg)
+RPCNetwork::replyError(const SendContext &ctx, uint32_t errCode, const string &errMsg)
{
- for (std::vector<RoutingNode*>::const_iterator it = ctx._recipients.begin();
- it != ctx._recipients.end(); ++it)
- {
+ for (RoutingNode * rnode : ctx._recipients) {
Reply::UP reply(new EmptyReply());
reply->setTrace(Trace(ctx._traceLevel));
reply->addError(Error(errCode, errMsg));
- _owner->deliverReply(std::move(reply), **it);
+ _owner->deliverReply(std::move(reply), *rnode);
}
}
+int RPCNetwork::getPort() const { return _orb->GetListenPort(); }
+
+
void
RPCNetwork::flushTargetPool()
{
- _targetPool.flushTargets(true);
+ _targetPool->flushTargets(true);
}
const vespalib::Version &
@@ -173,13 +178,13 @@ RPCNetwork::attach(INetworkOwner &owner)
LOG_ASSERT(_owner == 0);
_owner = &owner;
- _sendV1.attach(*this);
- _sendAdapters.insert(SendAdapterMap::value_type(vespalib::VersionSpecification(5), &_sendV1));
- _sendAdapters.insert(SendAdapterMap::value_type(vespalib::VersionSpecification(6), &_sendV1));
+ _sendV1->attach(*this);
+ _sendV2->attach(*this);
+ _sendAdapters[vespalib::Version(5)] = _sendV1.get();
+ _sendAdapters[vespalib::Version(6, 142)] = _sendV2.get();
- FRT_ReflectionBuilder builder(&_orb);
- builder.DefineMethod("mbus.getVersion", "", "s", true,
- FRT_METHOD(RPCNetwork::invoke), this);
+ FRT_ReflectionBuilder builder(_orb.get());
+ builder.DefineMethod("mbus.getVersion", "", "s", true, FRT_METHOD(RPCNetwork::invoke), this);
builder.MethodDesc("Retrieves the message bus version.");
builder.ReturnDesc("version", "The message bus version.");
}
@@ -193,29 +198,23 @@ RPCNetwork::invoke(FRT_RPCRequest *req)
const string
RPCNetwork::getConnectionSpec() const
{
- return make_string("tcp/%s:%d", _ident.getHostname().c_str(), _orb.GetListenPort());
+ return make_string("tcp/%s:%d", _ident.getHostname().c_str(), _orb->GetListenPort());
}
RPCSendAdapter *
RPCNetwork::getSendAdapter(const vespalib::Version &version)
{
- for (SendAdapterMap::iterator it = _sendAdapters.begin();
- it != _sendAdapters.end(); ++it)
- {
- if (it->first.matches(version)) {
- return it->second;
- }
- }
- return NULL;
+ auto lower = _sendAdapters.lower_bound(version);
+ return (lower != _sendAdapters.end()) ? lower->second : nullptr;
}
bool
RPCNetwork::start()
{
- if (!_orb.Listen(_requestedPort)) {
+ if (!_orb->Listen(_requestedPort)) {
return false;
}
- if (!_transport.Start(&_threadPool)) {
+ if (!_transport->Start(_threadPool.get())) {
return false;
}
return true;
@@ -227,13 +226,13 @@ bool
RPCNetwork::waitUntilReady(double seconds) const
{
slobrok::api::SlobrokList brokerList;
- slobrok::Configurator::UP configurator = _slobrokCfgFactory.create(brokerList);
+ slobrok::Configurator::UP configurator = _slobrokCfgFactory->create(brokerList);
bool hasConfig = false;
for (uint32_t i = 0; i < seconds * 100; ++i) {
if (configurator->poll()) {
hasConfig = true;
}
- if (_mirror->ready() && _oosManager.isReady()) {
+ if (_mirror->ready() && _oosManager->isReady()) {
return true;
}
FastOS_Thread::Sleep(10);
@@ -244,7 +243,7 @@ RPCNetwork::waitUntilReady(double seconds) const
std::string brokers = brokerList.logString();
LOG(error, "mirror (of %s) failed to become ready in %d seconds",
brokers.c_str(), (int)seconds);
- } else if (! _oosManager.isReady()) {
+ } else if (! _oosManager->isReady()) {
LOG(error, "OOS manager failed to become ready in %d seconds", (int)seconds);
}
return false;
@@ -293,24 +292,23 @@ RPCNetwork::allocServiceAddress(RoutingNode &recipient)
Error
RPCNetwork::resolveServiceAddress(RoutingNode &recipient, const string &serviceName)
{
- if (_oosManager.isOOS(serviceName)) {
+ if (_oosManager->isOOS(serviceName)) {
return Error(ErrorCode::SERVICE_OOS,
make_string("The service '%s' has been marked as out of service.",
serviceName.c_str()));
}
- RPCServiceAddress::UP ret = _servicePool.resolve(serviceName);
- if (ret.get() == NULL) {
+ RPCServiceAddress::UP ret = _servicePool->resolve(serviceName);
+ if (ret.get() == nullptr) {
return Error(ErrorCode::NO_ADDRESS_FOR_SERVICE,
make_string("The address of service '%s' could not be resolved. It is not currently "
- "registered with the Vespa name server. "
- "The service must be having problems, or the routing configuration is wrong.",
- serviceName.c_str()));
+ "registered with the Vespa name server. "
+ "The service must be having problems, or the routing configuration is wrong.",
+ serviceName.c_str()));
}
- RPCTarget::SP target = _targetPool.getTarget(_orb, *ret);
- if (target.get() == NULL) {
+ RPCTarget::SP target = _targetPool->getTarget(*_orb, *ret);
+ if (target.get() == nullptr) {
return Error(ErrorCode::CONNECTION_ERROR,
- make_string("Failed to connect to service '%s'.",
- serviceName.c_str()));
+ make_string("Failed to connect to service '%s'.", serviceName.c_str()));
}
ret->setTarget(target); // free by freeServiceAddress()
recipient.setServiceAddress(IServiceAddress::UP(ret.release()));
@@ -330,7 +328,7 @@ RPCNetwork::send(const Message &msg, const std::vector<RoutingNode*> &recipients
double timeout = ctx._msg.getTimeRemainingNow() / 1000.0;
for (uint32_t i = 0, len = ctx._recipients.size(); i < len; ++i) {
RoutingNode *&recipient = ctx._recipients[i];
- LOG_ASSERT(recipient != NULL);
+ LOG_ASSERT(recipient != nullptr);
RPCServiceAddress &address = static_cast<RPCServiceAddress&>(recipient->getServiceAddress());
LOG_ASSERT(address.hasTarget());
@@ -343,13 +341,12 @@ void
RPCNetwork::send(RPCNetwork::SendContext &ctx)
{
if (ctx._hasError) {
- replyError(ctx, ErrorCode::HANDSHAKE_FAILED,
- "An error occured while resolving version.");
+ replyError(ctx, ErrorCode::HANDSHAKE_FAILED, "An error occured while resolving version.");
} else {
uint64_t timeRemaining = ctx._msg.getTimeRemainingNow();
Blob payload = _owner->getProtocol(ctx._msg.getProtocol())->encode(ctx._version, ctx._msg);
RPCSendAdapter *adapter = getSendAdapter(ctx._version);
- if (adapter == NULL) {
+ if (adapter == nullptr) {
replyError(ctx, ErrorCode::INCOMPATIBLE_VERSION,
make_string("Can not send to version '%s' recipient.", ctx._version.toString().c_str()));
} else if (timeRemaining == 0) {
@@ -378,8 +375,8 @@ RPCNetwork::sync()
void
RPCNetwork::shutdown()
{
- _transport.ShutDown(false);
- _threadPool.Close();
+ _transport->ShutDown(false);
+ _threadPool->Close();
}
void
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
index 856f9d0ef64..0cda6ffecb7 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
@@ -2,27 +2,33 @@
#pragma once
#include "inetwork.h"
-#include "oosmanager.h"
-#include "rpcnetworkparams.h"
-#include "rpcsendv1.h"
-#include "rpcservicepool.h"
-#include "rpctargetpool.h"
+#include "rpcsendadapter.h"
+#include "rpctarget.h"
+#include "identity.h"
#include <vespa/messagebus/blob.h>
#include <vespa/messagebus/blobref.h>
#include <vespa/messagebus/message.h>
#include <vespa/messagebus/reply.h>
#include <vespa/slobrok/imirrorapi.h>
#include <vespa/vespalib/component/versionspecification.h>
-#include <vespa/fnet/transport.h>
-#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/vespalib/util/compressionconfig.h>
+#include <vespa/fnet/frt/invokable.h>
+
+class FNET_Transport;
namespace slobrok {
- namespace api {
- class RegisterAPI;
- }
+ namespace api { class RegisterAPI; }
+ class ConfiguratorFactory;
}
namespace mbus {
+
+class OOSManager;
+class RPCServicePool;
+class RPCTargetPool;
+class RPCNetworkParams;
+class RPCServiceAddress;
+
/**
* Network implementation based on RPC. This class is responsible for
* keeping track of services and for sending messages to services.
@@ -30,6 +36,7 @@ namespace mbus {
class RPCNetwork : public INetwork,
public FRT_Invokable {
private:
+ using CompressionConfig = vespalib::compression::CompressionConfig;
struct SendContext : public RPCTarget::IVersionHandler {
vespalib::Lock _lock;
RPCNetwork &_net;
@@ -51,24 +58,26 @@ private:
void PerformTask() override;
};
- typedef std::map<vespalib::VersionSpecification, RPCSendAdapter*> SendAdapterMap;
-
- INetworkOwner *_owner;
- Identity _ident;
- FastOS_ThreadPool _threadPool;
- FNET_Transport _transport;
- FRT_Supervisor _orb;
- FNET_Scheduler &_scheduler;
- RPCTargetPool _targetPool;
- TargetPoolTask _targetPoolTask;
- RPCServicePool _servicePool;
- slobrok::ConfiguratorFactory _slobrokCfgFactory;
- std::unique_ptr<slobrok::api::IMirrorAPI> _mirror;
- std::unique_ptr<slobrok::api::RegisterAPI> _regAPI;
- OOSManager _oosManager;
- int _requestedPort;
- RPCSendV1 _sendV1;
- SendAdapterMap _sendAdapters;
+ using SendAdapterMap = std::map<vespalib::Version, RPCSendAdapter*>;
+
+ INetworkOwner *_owner;
+ Identity _ident;
+ std::unique_ptr<FastOS_ThreadPool> _threadPool;
+ std::unique_ptr<FNET_Transport> _transport;
+ std::unique_ptr<FRT_Supervisor> _orb;
+ FNET_Scheduler &_scheduler;
+ std::unique_ptr<RPCTargetPool> _targetPool;
+ TargetPoolTask _targetPoolTask;
+ std::unique_ptr<RPCServicePool> _servicePool;
+ std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory;
+ std::unique_ptr<slobrok::api::IMirrorAPI> _mirror;
+ std::unique_ptr<slobrok::api::RegisterAPI> _regAPI;
+ std::unique_ptr<OOSManager> _oosManager;
+ int _requestedPort;
+ std::unique_ptr<RPCSendAdapter> _sendV1;
+ std::unique_ptr<RPCSendAdapter> _sendV2;
+ SendAdapterMap _sendAdapters;
+ CompressionConfig _compressionConfig;
/**
* Resolves and assigns a service address for the given recipient using the
@@ -159,7 +168,7 @@ public:
*
* @return port number
**/
- int getPort() const { return _orb.GetListenPort(); }
+ int getPort() const;
/**
* Allocate a new rpc request object. The caller of this method gets the
@@ -191,7 +200,7 @@ public:
*
* @return internal OOS manager
**/
- OOSManager &getOOSManager() { return _oosManager; }
+ OOSManager &getOOSManager() { return *_oosManager; }
/**
* Obtain a reference to the internal supervisor. This is used by
@@ -199,7 +208,7 @@ public:
*
* @return The supervisor.
*/
- FRT_Supervisor &getSupervisor() { return _orb; }
+ FRT_Supervisor &getSupervisor() { return *_orb; }
/**
* Deliver an error reply to the recipients of a {@link SendContext} in a
@@ -209,8 +218,7 @@ public:
* @param errCode The error code to return.
* @param errMsg The error string to return.
*/
- void replyError(const SendContext &ctx, uint32_t errCode,
- const string &errMsg);
+ void replyError(const SendContext &ctx, uint32_t errCode, const string &errMsg);
void attach(INetworkOwner &owner) override;
const string getConnectionSpec() const override;
@@ -225,9 +233,8 @@ public:
void shutdown() override;
void postShutdownHook() override;
const slobrok::api::IMirrorAPI &getMirror() const override;
-
+ CompressionConfig getCompressionConfig() { return _compressionConfig; }
void invoke(FRT_RPCRequest *req);
};
} // namespace mbus
-
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
index e025db0e350..df35d51cb54 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
@@ -11,7 +11,8 @@ RPCNetworkParams::RPCNetworkParams() :
_listenPort(0),
_maxInputBufferSize(256*1024),
_maxOutputBufferSize(256*1024),
- _connectionExpireSecs(30)
+ _connectionExpireSecs(30),
+ _compressionConfig(CompressionConfig::LZ4, 6, 90, 1024)
{ }
RPCNetworkParams::~RPCNetworkParams() {}
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
index a65248f7299..bfc624a6523 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
@@ -3,6 +3,7 @@
#include "identity.h"
#include <vespa/slobrok/cfg.h>
+#include <vespa/vespalib/util/compressionconfig.h>
namespace mbus {
@@ -12,13 +13,15 @@ namespace mbus {
*/
class RPCNetworkParams {
private:
- Identity _identity;
+ using CompressionConfig = vespalib::compression::CompressionConfig;
+ Identity _identity;
config::ConfigUri _slobrokConfig;
- string _oosServerPattern;
- int _listenPort;
- uint32_t _maxInputBufferSize;
- uint32_t _maxOutputBufferSize;
- double _connectionExpireSecs;
+ string _oosServerPattern;
+ int _listenPort;
+ uint32_t _maxInputBufferSize;
+ uint32_t _maxOutputBufferSize;
+ double _connectionExpireSecs;
+ CompressionConfig _compressionConfig;
public:
RPCNetworkParams();
@@ -177,6 +180,12 @@ public:
_maxOutputBufferSize = maxOutputBufferSize;
return *this;
}
+
+ RPCNetworkParams &setCompressionConfig(CompressionConfig compressionConfig) {
+ _compressionConfig = compressionConfig;
+ return *this;
+ }
+ CompressionConfig getCompressionConfig() const { return _compressionConfig; }
};
}
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
new file mode 100644
index 00000000000..705b8648442
--- /dev/null
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
@@ -0,0 +1,282 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "rpcsend.h"
+#include "rpcsend_private.h"
+#include "rpcserviceaddress.h"
+#include <vespa/messagebus/network/rpcnetwork.h>
+#include <vespa/messagebus/tracelevel.h>
+#include <vespa/messagebus/emptyreply.h>
+#include <vespa/messagebus/errorcode.h>
+#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/fnet/channel.h>
+#include <vespa/fnet/frt/reflection.h>
+
+#include <vespa/vespalib/data/slime/cursor.h>
+
+using vespalib::make_string;
+
+namespace mbus {
+
+using network::internal::ReplyContext;
+using network::internal::SendContext;
+
+namespace {
+
+class FillByCopy final : public PayLoadFiller
+{
+public:
+ FillByCopy(BlobRef payload) : _payload(payload) { }
+ void fill(FRT_Values & v) const override {
+ v.AddData(_payload.data(), _payload.size());
+ }
+ void fill(const vespalib::Memory & name, vespalib::slime::Cursor & v) const override {
+ v.setData(name, vespalib::Memory(_payload.data(), _payload.size()));
+ }
+private:
+ BlobRef _payload;
+};
+
+class FillByHandover final : public PayLoadFiller
+{
+public:
+ FillByHandover(Blob payload) : _payload(std::move(payload)) { }
+ void fill(FRT_Values & v) const override {
+ size_t sz = _payload.size();
+ v.AddData(std::move(_payload.payload()), sz);
+ }
+ void fill(const vespalib::Memory & name, vespalib::slime::Cursor & v) const override {
+ v.setData(name, vespalib::Memory(_payload.data(), _payload.size()));
+ }
+private:
+ mutable Blob _payload;
+};
+
+}
+
+RPCSend::RPCSend() :
+ _net(nullptr),
+ _clientIdent("client"),
+ _serverIdent("server")
+{ }
+
+RPCSend::~RPCSend() {}
+
+void
+RPCSend::attach(RPCNetwork &net)
+{
+ _net = &net;
+ const string &prefix = _net->getIdentity().getServicePrefix();
+ if (!prefix.empty()) {
+ _clientIdent = make_string("'%s'", prefix.c_str());
+ _serverIdent = _clientIdent;
+ }
+
+ FRT_ReflectionBuilder builder(&_net->getSupervisor());
+ build(builder);
+}
+
+void
+RPCSend::replyError(FRT_RPCRequest *req, const vespalib::Version &version, uint32_t traceLevel, const Error &err)
+{
+ Reply::UP reply(new EmptyReply());
+ reply->setContext(Context(new ReplyContext(*req, version)));
+ reply->getTrace().setLevel(traceLevel);
+ reply->addError(err);
+ handleReply(std::move(reply));
+}
+
+void
+RPCSend::handleDiscard(Context ctx)
+{
+ ReplyContext::UP tmp(static_cast<ReplyContext*>(ctx.value.PTR));
+ FRT_RPCRequest &req = tmp->getRequest();
+ FNET_Channel *chn = req.GetContext()._value.CHANNEL;
+ req.SubRef();
+ chn->Free();
+}
+
+void
+RPCSend::sendByHandover(RoutingNode &recipient, const vespalib::Version &version, Blob payload, uint64_t timeRemaining)
+{
+ send(recipient, version, FillByHandover(std::move(payload)), timeRemaining);
+}
+
+void
+RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, BlobRef payload, uint64_t timeRemaining)
+{
+ send(recipient, version, FillByCopy(payload), timeRemaining);
+}
+
+void
+RPCSend::send(RoutingNode &recipient, const vespalib::Version &version,
+ const PayLoadFiller & payload, uint64_t timeRemaining)
+{
+ SendContext::UP ctx(new SendContext(recipient, timeRemaining));
+ RPCServiceAddress &address = static_cast<RPCServiceAddress&>(recipient.getServiceAddress());
+ const Message &msg = recipient.getMessage();
+ Route route = recipient.getRoute();
+ Hop hop = route.removeHop(0);
+
+ FRT_RPCRequest *req = _net->allocRequest();
+ encodeRequest(*req, version, route, address, msg, recipient.getTrace().getLevel(), payload, timeRemaining);
+
+ if (ctx->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ ctx->getTrace().trace(TraceLevel::SEND_RECEIVE,
+ make_string("Sending message (version %s) from %s to '%s' with %.2f seconds timeout.",
+ version.toString().c_str(), _clientIdent.c_str(),
+ address.getServiceName().c_str(), ctx->getTimeout()));
+ }
+
+ if (hop.getIgnoreResult()) {
+ address.getTarget().getFRTTarget().InvokeVoid(req);
+ if (ctx->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ ctx->getTrace().trace(TraceLevel::SEND_RECEIVE,
+ make_string("Not waiting for a reply from '%s'.", address.getServiceName().c_str()));
+ }
+ Reply::UP reply(new EmptyReply());
+ reply->getTrace().swap(ctx->getTrace());
+ _net->getOwner().deliverReply(std::move(reply), recipient);
+ } else {
+ SendContext *ptr = ctx.release();
+ req->SetContext(FNET_Context(ptr));
+ address.getTarget().getFRTTarget().InvokeAsync(req, ptr->getTimeout(), this);
+ }
+}
+
+void
+RPCSend::RequestDone(FRT_RPCRequest *req)
+{
+ SendContext::UP ctx(static_cast<SendContext*>(req->GetContext()._value.VOIDP));
+ const string &serviceName = static_cast<RPCServiceAddress&>(ctx->getRecipient().getServiceAddress()).getServiceName();
+ Reply::UP reply;
+ Error error;
+ Trace & trace = ctx->getTrace();
+ if (!req->CheckReturnTypes(getReturnSpec())) {
+ reply.reset(new EmptyReply());
+ switch (req->GetErrorCode()) {
+ case FRTE_RPC_TIMEOUT:
+ error = Error(ErrorCode::TIMEOUT,
+ make_string("A timeout occured while waiting for '%s' (%g seconds expired); %s",
+ serviceName.c_str(), ctx->getTimeout(), req->GetErrorMessage()));
+ break;
+ case FRTE_RPC_CONNECTION:
+ error = Error(ErrorCode::CONNECTION_ERROR,
+ make_string("A connection error occured for '%s'; %s",
+ serviceName.c_str(), req->GetErrorMessage()));
+ break;
+ default:
+ error = Error(ErrorCode::NETWORK_ERROR,
+ make_string("A network error occured for '%s'; %s",
+ serviceName.c_str(), req->GetErrorMessage()));
+ }
+ } else {
+ FRT_Values &ret = *req->GetReturn();
+ reply = createReply(ret, serviceName, error, trace.getRoot());
+ }
+ if (trace.shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ trace.trace(TraceLevel::SEND_RECEIVE,
+ make_string("Reply (type %d) received at %s.", reply->getType(), _clientIdent.c_str()));
+ }
+ reply->getTrace().swap(trace);
+ if (error.getCode() != ErrorCode::NONE) {
+ reply->addError(error);
+ }
+ _net->getOwner().deliverReply(std::move(reply), ctx->getRecipient());
+ req->SubRef();
+}
+
+std::unique_ptr<Reply>
+RPCSend::decode(vespalib::stringref protocolName, const vespalib::Version & version, BlobRef payload, Error & error) const
+{
+ Reply::UP reply;
+ IProtocol * protocol = _net->getOwner().getProtocol(protocolName);
+ if (protocol != nullptr) {
+ Routable::UP routable = protocol->decode(version, payload);
+ if (routable) {
+ if (routable->isReply()) {
+ reply.reset(static_cast<Reply*>(routable.release()));
+ } else {
+ error = Error(ErrorCode::DECODE_ERROR, "Payload decoded to a message when expecting a reply.");
+ }
+ } else {
+ error = Error(ErrorCode::DECODE_ERROR,
+ make_string("Protocol '%s' failed to decode routable.", protocolName.c_str()));
+ }
+
+ } else {
+ error = Error(ErrorCode::UNKNOWN_PROTOCOL,
+ make_string("Protocol '%s' is not known by %s.", protocolName.c_str(), _serverIdent.c_str()));
+ }
+ return reply;
+}
+
+void
+RPCSend::handleReply(Reply::UP reply)
+{
+ ReplyContext::UP ctx(static_cast<ReplyContext*>(reply->getContext().value.PTR));
+ FRT_RPCRequest &req = ctx->getRequest();
+ string version = ctx->getVersion().toString();
+ if (reply->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ reply->getTrace().trace(TraceLevel::SEND_RECEIVE, make_string("Sending reply (version %s) from %s.",
+ version.c_str(), _serverIdent.c_str()));
+ }
+ Blob payload(0);
+ if (reply->getType() != 0) {
+ payload = _net->getOwner().getProtocol(reply->getProtocol())->encode(ctx->getVersion(), *reply);
+ if (payload.size() == 0) {
+ reply->addError(Error(ErrorCode::ENCODE_ERROR, "An error occured while encoding the reply, see log."));
+ }
+ }
+ FRT_Values &ret = *req.GetReturn();
+ createResponse(ret, version, *reply, std::move(payload));
+ req.Return();
+}
+
+void
+RPCSend::invoke(FRT_RPCRequest *req)
+{
+ req->Detach();
+ FRT_Values &args = *req->GetParams();
+
+ std::unique_ptr<Params> params = toParams(args);
+ IProtocol * protocol = _net->getOwner().getProtocol(params->getProtocol());
+ if (protocol == nullptr) {
+ replyError(req, params->getVersion(), params->getTraceLevel(),
+ Error(ErrorCode::UNKNOWN_PROTOCOL,
+ make_string("Protocol '%s' is not known by %s.", params->getProtocol().c_str(), _serverIdent.c_str())));
+ return;
+ }
+ Routable::UP routable = protocol->decode(params->getVersion(), params->getPayload());
+ req->DiscardBlobs();
+ if ( ! routable ) {
+ replyError(req, params->getVersion(), params->getTraceLevel(),
+ Error(ErrorCode::DECODE_ERROR,
+ make_string("Protocol '%s' failed to decode routable.", params->getProtocol().c_str())));
+ return;
+ }
+ if (routable->isReply()) {
+ replyError(req, params->getVersion(), params->getTraceLevel(),
+ Error(ErrorCode::DECODE_ERROR, "Payload decoded to a reply when expecting a mesage."));
+ return;
+ }
+ Message::UP msg(static_cast<Message*>(routable.release()));
+ vespalib::stringref route = params->getRoute();
+ if (!route.empty()) {
+ msg->setRoute(Route::parse(route));
+ }
+ msg->setContext(Context(new ReplyContext(*req, params->getVersion())));
+ msg->pushHandler(*this, *this);
+ msg->setRetryEnabled(params->useRetry());
+ msg->setRetry(params->getRetries());
+ msg->setTimeReceivedNow();
+ msg->setTimeRemaining(params->getRemainingTime());
+ msg->getTrace().setLevel(params->getTraceLevel());
+ if (msg->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ msg->getTrace().trace(TraceLevel::SEND_RECEIVE,
+ make_string("Message (type %d) received at %s for session '%s'.",
+ msg->getType(), _serverIdent.c_str(), string(params->getSession()).c_str()));
+ }
+ _net->getOwner().deliverMessage(std::move(msg), params->getSession());
+}
+
+} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.h b/messagebus/src/vespa/messagebus/network/rpcsend.h
new file mode 100644
index 00000000000..c707b47f548
--- /dev/null
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.h
@@ -0,0 +1,95 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "rpcsendadapter.h"
+#include <vespa/messagebus/idiscardhandler.h>
+#include <vespa/messagebus/ireplyhandler.h>
+#include <vespa/messagebus/common.h>
+#include <vespa/fnet/frt/invokable.h>
+#include <vespa/fnet/frt/invoker.h>
+
+class FRT_ReflectionBuilder;
+
+namespace vespalib::slime { class Cursor; }
+namespace vespalib { class Memory; }
+namespace vespalib { class TraceNode; }
+namespace mbus {
+
+class Error;
+class Route;
+class Message;
+class RPCServiceAddress;
+
+class PayLoadFiller
+{
+public:
+ virtual ~PayLoadFiller() { }
+ virtual void fill(FRT_Values & v) const = 0;
+ virtual void fill(const vespalib::Memory & name, vespalib::slime::Cursor & v) const = 0;
+};
+
+class RPCSend : public RPCSendAdapter,
+ public FRT_Invokable,
+ public FRT_IRequestWait,
+ public IDiscardHandler,
+ public IReplyHandler
+{
+public:
+ class Params {
+ public:
+ virtual ~Params() {}
+ virtual vespalib::Version getVersion() const = 0;
+ virtual vespalib::stringref getProtocol() const = 0;
+ virtual uint32_t getTraceLevel() const = 0;
+ virtual bool useRetry() const = 0;
+ virtual uint32_t getRetries() const = 0;
+ virtual uint64_t getRemainingTime() const = 0;
+ virtual vespalib::stringref getRoute() const = 0;
+ virtual vespalib::stringref getSession() const = 0;
+ virtual BlobRef getPayload() const = 0;
+ };
+protected:
+ RPCNetwork *_net;
+ string _clientIdent;
+ string _serverIdent;
+
+ virtual void build(FRT_ReflectionBuilder & builder) = 0;
+ virtual std::unique_ptr<Reply> createReply(const FRT_Values & response, const string & serviceName,
+ Error & error, vespalib::TraceNode & rootTrace) const = 0;
+ virtual void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route,
+ const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel,
+ const PayLoadFiller &filler, uint64_t timeRemaining) const = 0;
+ virtual const char * getReturnSpec() const = 0;
+ virtual void createResponse(FRT_Values & ret, const string & version, Reply & reply, Blob payload) const = 0;
+ virtual std::unique_ptr<Params> toParams(const FRT_Values &param) const = 0;
+
+ void send(RoutingNode &recipient, const vespalib::Version &version,
+ const PayLoadFiller & filler, uint64_t timeRemaining);
+ std::unique_ptr<Reply> decode(vespalib::stringref protocol, const vespalib::Version & version,
+ BlobRef payload, Error & error) const;
+ /**
+ * Send an error reply for a given request.
+ *
+ * @param request The FRT request to reply to.
+ * @param version The version to serialize for.
+ * @param traceLevel The trace level to set in the reply.
+ * @param err The error to reply with.
+ */
+ void replyError(FRT_RPCRequest *req, const vespalib::Version &version, uint32_t traceLevel, const Error &err);
+public:
+ RPCSend();
+ ~RPCSend();
+
+ void invoke(FRT_RPCRequest *req);
+private:
+ void attach(RPCNetwork &net) final override;
+ void handleDiscard(Context ctx) final override;
+ void sendByHandover(RoutingNode &recipient, const vespalib::Version &version,
+ Blob payload, uint64_t timeRemaining) final override;
+ void send(RoutingNode &recipient, const vespalib::Version &version,
+ BlobRef payload, uint64_t timeRemaining) final override;
+ void RequestDone(FRT_RPCRequest *req) final override;
+ void handleReply(std::unique_ptr<Reply> reply) final override;
+};
+
+} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend_private.h b/messagebus/src/vespa/messagebus/network/rpcsend_private.h
new file mode 100644
index 00000000000..f5867e79856
--- /dev/null
+++ b/messagebus/src/vespa/messagebus/network/rpcsend_private.h
@@ -0,0 +1,52 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/messagebus/trace.h>
+#include <vespa/messagebus/routing/routingnode.h>
+
+namespace mbus::network::internal {
+/**
+ * Implements a helper class to hold the necessary context to create a reply from
+ * an rpc return value. This object is held as the context of an FRT_RPCRequest.
+ */
+class SendContext {
+private:
+ mbus::RoutingNode &_recipient;
+ mbus::Trace _trace;
+ double _timeout;
+
+public:
+ typedef std::unique_ptr<SendContext> UP;
+ SendContext(const SendContext &) = delete;
+ SendContext & operator = (const SendContext &) = delete;
+ SendContext(mbus::RoutingNode &recipient, uint64_t timeRemaining)
+ : _recipient(recipient),
+ _trace(recipient.getTrace().getLevel()),
+ _timeout(timeRemaining * 0.001) { }
+ mbus::RoutingNode &getRecipient() { return _recipient; }
+ mbus::Trace &getTrace() { return _trace; }
+ double getTimeout() { return _timeout; }
+};
+
+/**
+ * Implements a helper class to hold the necessary context to send a reply as an
+ * rpc return value. This object is held in the callstack of the reply.
+ */
+class ReplyContext {
+private:
+ FRT_RPCRequest &_request;
+ vespalib::Version _version;
+
+public:
+ typedef std::unique_ptr<ReplyContext> UP;
+ ReplyContext(const ReplyContext &) = delete;
+ ReplyContext & operator = (const ReplyContext &) = delete;
+
+ ReplyContext(FRT_RPCRequest &request, const vespalib::Version &version)
+ : _request(request), _version(version) { }
+ FRT_RPCRequest &getRequest() { return _request; }
+ const vespalib::Version &getVersion() { return _version; }
+};
+
+
+}
diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp b/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp
index 4cf45207010..6b89a278b88 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp
@@ -1,87 +1,40 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "rpcsendv1.h"
#include "rpcnetwork.h"
-#include <vespa/messagebus/routing/routingnode.h>
+#include "rpcserviceaddress.h"
#include <vespa/messagebus/emptyreply.h>
-#include <vespa/messagebus/errorcode.h>
#include <vespa/messagebus/tracelevel.h>
#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/fnet/channel.h>
+#include <vespa/fnet/frt/reflection.h>
using vespalib::make_string;
-namespace {
-
-/**
- * Implements a helper class to hold the necessary context to create a reply from
- * an rpc return value. This object is held as the context of an FRT_RPCRequest.
- */
-class SendContext {
-private:
- mbus::RoutingNode &_recipient;
- mbus::Trace _trace;
- double _timeout;
-
-public:
- typedef std::unique_ptr<SendContext> UP;
- SendContext(const SendContext &) = delete;
- SendContext & operator = (const SendContext &) = delete;
- SendContext(mbus::RoutingNode &recipient, uint64_t timeRemaining)
- : _recipient(recipient),
- _trace(recipient.getTrace().getLevel()),
- _timeout(timeRemaining * 0.001) { }
- mbus::RoutingNode &getRecipient() { return _recipient; }
- mbus::Trace &getTrace() { return _trace; }
- double getTimeout() { return _timeout; }
-};
+namespace mbus {
-/**
- * Implements a helper class to hold the necessary context to send a reply as an
- * rpc return value. This object is held in the callstack of the reply.
- */
-class ReplyContext {
-private:
- FRT_RPCRequest &_request;
- vespalib::Version _version;
+namespace {
-public:
- typedef std::unique_ptr<ReplyContext> UP;
- ReplyContext(const ReplyContext &) = delete;
- ReplyContext & operator = (const ReplyContext &) = delete;
-
- ReplyContext(FRT_RPCRequest &request, const vespalib::Version &version)
- : _request(request), _version(version) { }
- FRT_RPCRequest &getRequest() { return _request; }
- const vespalib::Version &getVersion() { return _version; }
-};
+const char *METHOD_NAME = "mbus.send1";
+const char *METHOD_PARAMS = "sssbilsxi";
+const char *METHOD_RETURN = "sdISSsxs";
}
-namespace mbus {
-
-const char *RPCSendV1::METHOD_NAME = "mbus.send1";
-const char *RPCSendV1::METHOD_PARAMS = "sssbilsxi";
-const char *RPCSendV1::METHOD_RETURN = "sdISSsxs";
-
-RPCSendV1::RPCSendV1() :
- _net(NULL),
- _clientIdent("client"),
- _serverIdent("server")
-{ }
+bool RPCSendV1::isCompatible(vespalib::stringref method, vespalib::stringref request, vespalib::stringref respons)
+{
+ return (method == METHOD_NAME) &&
+ (request == METHOD_PARAMS) &&
+ (respons == METHOD_RETURN);
+}
-RPCSendV1::~RPCSendV1() {}
+const char *
+RPCSendV1::getReturnSpec() const {
+ return METHOD_RETURN;
+}
void
-RPCSendV1::attach(RPCNetwork &net)
+RPCSendV1::build(FRT_ReflectionBuilder & builder)
{
- _net = &net;
- const string &prefix = _net->getIdentity().getServicePrefix();
- if (!prefix.empty()) {
- _clientIdent = make_string("'%s'", prefix.c_str());
- _serverIdent = _clientIdent;
- }
-
- FRT_ReflectionBuilder builder(&_net->getSupervisor());
builder.DefineMethod(METHOD_NAME, METHOD_PARAMS, METHOD_RETURN, true, FRT_METHOD(RPCSendV1::invoke), this);
builder.MethodDesc("Send a message bus request and get a reply back.");
builder.ParamDesc("version", "The version of the message.");
@@ -103,59 +56,14 @@ RPCSendV1::attach(RPCNetwork &net)
builder.ReturnDesc("trace", "A string representation of the trace.");
}
-namespace {
-
-class FillByCopy : public PayLoadFiller
-{
-public:
- FillByCopy(BlobRef payload) : _payload(payload) { }
- void fill(FRT_Values & v) const override {
- v.AddData(_payload.data(), _payload.size());
- }
-private:
- BlobRef _payload;
-};
-
-class FillByHandover : public PayLoadFiller
-{
-public:
- FillByHandover(Blob payload) : _payload(std::move(payload)) { }
- void fill(FRT_Values & v) const override {
- v.AddData(std::move(_payload.payload()), _payload.size());
- }
-private:
- mutable Blob _payload;
-};
-
-}
-
void
-RPCSendV1::send(RoutingNode &recipient, const vespalib::Version &version,
- BlobRef payload, uint64_t timeRemaining)
+RPCSendV1::encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route,
+ const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel,
+ const PayLoadFiller &filler, uint64_t timeRemaining) const
{
- send(recipient, version, FillByCopy(payload), timeRemaining);
-}
-
-void
-RPCSendV1::sendByHandover(RoutingNode &recipient, const vespalib::Version &version,
- Blob payload, uint64_t timeRemaining)
-{
- send(recipient, version, FillByHandover(std::move(payload)), timeRemaining);
-}
-void
-RPCSendV1::send(RoutingNode &recipient, const vespalib::Version &version,
- const PayLoadFiller & payload, uint64_t timeRemaining)
-{
- SendContext::UP ctx(new SendContext(recipient, timeRemaining));
- RPCServiceAddress &address = static_cast<RPCServiceAddress&>(recipient.getServiceAddress());
- const Message &msg = recipient.getMessage();
- Route route = recipient.getRoute();
- Hop hop = route.removeHop(0);
-
- FRT_RPCRequest *req = _net->allocRequest();
- FRT_Values &args = *req->GetParams();
- req->SetMethodName(METHOD_NAME);
+ FRT_Values &args = *req.GetParams();
+ req.SetMethodName(METHOD_NAME);
args.AddString(version.toString().c_str());
args.AddString(route.toString().c_str());
args.AddString(address.getSessionName().c_str());
@@ -163,237 +71,103 @@ RPCSendV1::send(RoutingNode &recipient, const vespalib::Version &version,
args.AddInt32(msg.getRetry());
args.AddInt64(timeRemaining);
args.AddString(msg.getProtocol().c_str());
- payload.fill(args);
- args.AddInt32(recipient.getTrace().getLevel());
-
- if (ctx->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
- ctx->getTrace().trace(TraceLevel::SEND_RECEIVE,
- make_string("Sending message (version %s) from %s to '%s' with %.2f seconds timeout.",
- version.toString().c_str(), _clientIdent.c_str(),
- address.getServiceName().c_str(), ctx->getTimeout()));
- }
-
- if (hop.getIgnoreResult()) {
- address.getTarget().getFRTTarget().InvokeVoid(req);
- if (ctx->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
- ctx->getTrace().trace(TraceLevel::SEND_RECEIVE,
- make_string("Not waiting for a reply from '%s'.", address.getServiceName().c_str()));
- }
- Reply::UP reply(new EmptyReply());
- reply->getTrace().swap(ctx->getTrace());
- _net->getOwner().deliverReply(std::move(reply), recipient);
- } else {
- SendContext *ptr = ctx.release();
- req->SetContext(FNET_Context(ptr));
- address.getTarget().getFRTTarget().InvokeAsync(req, ptr->getTimeout(), this);
- }
+ filler.fill(args);
+ args.AddInt32(traceLevel);
}
-void
-RPCSendV1::RequestDone(FRT_RPCRequest *req)
-{
- SendContext::UP ctx(static_cast<SendContext*>(req->GetContext()._value.VOIDP));
- const string &serviceName = static_cast<RPCServiceAddress&>(
- ctx->getRecipient().getServiceAddress()).getServiceName();
- Reply::UP reply;
- Error error;
- if (!req->CheckReturnTypes(METHOD_RETURN)) {
- reply.reset(new EmptyReply());
- switch (req->GetErrorCode()) {
- case FRTE_RPC_TIMEOUT:
- error = Error(ErrorCode::TIMEOUT,
- make_string("A timeout occured while waiting for '%s' (%g seconds expired); %s",
- serviceName.c_str(), ctx->getTimeout(), req->GetErrorMessage()));
- break;
- case FRTE_RPC_CONNECTION:
- error = Error(ErrorCode::CONNECTION_ERROR,
- make_string("A connection error occured for '%s'; %s",
- serviceName.c_str(), req->GetErrorMessage()));
- break;
- default:
- error = Error(ErrorCode::NETWORK_ERROR,
- make_string("A network error occured for '%s'; %s",
- serviceName.c_str(), req->GetErrorMessage()));
- }
- } else {
- FRT_Values &ret = *req->GetReturn();
+namespace {
- vespalib::Version version = vespalib::Version(ret[0]._string._str);
- double retryDelay = ret[1]._double;
- uint32_t *errorCodes = ret[2]._int32_array._pt;
- uint32_t errorCodesLen = ret[2]._int32_array._len;
- FRT_StringValue *errorMessages = ret[3]._string_array._pt;
- uint32_t errorMessagesLen = ret[3]._string_array._len;
- FRT_StringValue *errorServices = ret[4]._string_array._pt;
- uint32_t errorServicesLen = ret[4]._string_array._len;
- const char *protocolName = ret[5]._string._str;
- const char *payload = ret[6]._data._buf;
- uint32_t payloadLen = ret[6]._data._len;
- const char *trace = ret[7]._string._str;
+class ParamsV1 : public RPCSend::Params
+{
+public:
+ ParamsV1(const FRT_Values &args) : _args(args) { }
- if (payloadLen > 0) {
- IProtocol * protocol = _net->getOwner().getProtocol(protocolName);
- if (protocol != nullptr) {
- Routable::UP routable = protocol->decode(version, BlobRef(payload, payloadLen));
- if (routable) {
- if (routable->isReply()) {
- reply.reset(static_cast<Reply*>(routable.release()));
- } else {
- error = Error(ErrorCode::DECODE_ERROR,
- "Payload decoded to a message when expecting a reply.");
- }
- } else {
- error = Error(ErrorCode::DECODE_ERROR,
- make_string("Protocol '%s' failed to decode routable.", protocolName));
- }
+ uint32_t getTraceLevel() const override { return _args[8]._intval32; }
+ bool useRetry() const override { return _args[3]._intval8 != 0; }
+ uint32_t getRetries() const override { return _args[4]._intval32; }
+ uint64_t getRemainingTime() const override { return _args[5]._intval64; }
- } else {
- error = Error(ErrorCode::UNKNOWN_PROTOCOL,
- make_string("Protocol '%s' is not known by %s.", protocolName, _serverIdent.c_str()));
- }
- }
- if ( ! reply ) {
- reply.reset(new EmptyReply());
- }
- reply->setRetryDelay(retryDelay);
- for (uint32_t i = 0; i < errorCodesLen && i < errorMessagesLen && i < errorServicesLen; ++i) {
- reply->addError(Error(errorCodes[i],
- errorMessages[i]._str,
- errorServices[i]._len > 0 ? errorServices[i]._str : serviceName.c_str()));
- }
- ctx->getTrace().getRoot().addChild(TraceNode::decode(trace));
+ vespalib::Version getVersion() const override {
+ return vespalib::Version(vespalib::stringref(_args[0]._string._str, _args[0]._string._len));
+ }
+ vespalib::stringref getRoute() const override {
+ return vespalib::stringref(_args[1]._string._str, _args[1]._string._len);
+ }
+ vespalib::stringref getSession() const override {
+ return vespalib::stringref(_args[2]._string._str, _args[2]._string._len);
}
- if (ctx->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
- ctx->getTrace().trace(TraceLevel::SEND_RECEIVE,
- make_string("Reply (type %d) received at %s.", reply->getType(), _clientIdent.c_str()));
+ vespalib::stringref getProtocol() const override {
+ return vespalib::stringref(_args[6]._string._str, _args[6]._string._len);
}
- reply->getTrace().swap(ctx->getTrace());
- if (error.getCode() != ErrorCode::NONE) {
- reply->addError(error);
+ BlobRef getPayload() const override {
+ return BlobRef(_args[7]._data._buf, _args[7]._data._len);
}
- _net->getOwner().deliverReply(std::move(reply), ctx->getRecipient());
- req->SubRef();
+private:
+ const FRT_Values & _args;
+};
+
}
-void
-RPCSendV1::invoke(FRT_RPCRequest *req)
+std::unique_ptr<RPCSend::Params>
+RPCSendV1::toParams(const FRT_Values &args) const
{
- req->Detach();
+ return std::make_unique<ParamsV1>(args);
+}
- FRT_Values &args = *req->GetParams();
- vespalib::Version version = vespalib::Version(args[0]._string._str);
- const char *route = args[1]._string._str;
- const char *session = args[2]._string._str;
- bool retryEnabled = args[3]._intval8 != 0;
- uint32_t retry = args[4]._intval32;
- uint64_t timeRemaining = args[5]._intval64;
- const char *protocolName = args[6]._string._str;
- const char *payload = args[7]._data._buf;
- uint32_t payloadLen = args[7]._data._len;
- uint32_t traceLevel = args[8]._intval32;
- IProtocol * protocol = _net->getOwner().getProtocol(protocolName);
- if (protocol == nullptr) {
- replyError(req, version, traceLevel,
- Error(ErrorCode::UNKNOWN_PROTOCOL,
- make_string("Protocol '%s' is not known by %s.", protocolName, _serverIdent.c_str())));
- return;
- }
- Routable::UP routable = protocol->decode(version, BlobRef(payload, payloadLen));
- req->DiscardBlobs();
- if ( ! routable ) {
- replyError(req, version, traceLevel,
- Error(ErrorCode::DECODE_ERROR,
- make_string("Protocol '%s' failed to decode routable.", protocolName)));
- return;
- }
- if (routable->isReply()) {
- replyError(req, version, traceLevel,
- Error(ErrorCode::DECODE_ERROR,
- "Payload decoded to a reply when expecting a mesage."));
- return;
+std::unique_ptr<Reply>
+RPCSendV1::createReply(const FRT_Values & ret, const string & serviceName, Error & error, vespalib::TraceNode & rootTrace) const
+{
+ vespalib::Version version = vespalib::Version(ret[0]._string._str);
+ double retryDelay = ret[1]._double;
+ uint32_t *errorCodes = ret[2]._int32_array._pt;
+ uint32_t errorCodesLen = ret[2]._int32_array._len;
+ FRT_StringValue *errorMessages = ret[3]._string_array._pt;
+ uint32_t errorMessagesLen = ret[3]._string_array._len;
+ FRT_StringValue *errorServices = ret[4]._string_array._pt;
+ uint32_t errorServicesLen = ret[4]._string_array._len;
+ const char *protocolName = ret[5]._string._str;
+ BlobRef payload(ret[6]._data._buf, ret[6]._data._len);
+ const char *trace = ret[7]._string._str;
+
+ Reply::UP reply;
+ if (payload.size() > 0) {
+ reply = decode(protocolName, version, payload, error);
}
- Message::UP msg(static_cast<Message*>(routable.release()));
- if (strlen(route) > 0) {
- msg->setRoute(Route::parse(route));
+ if ( ! reply ) {
+ reply.reset(new EmptyReply());
}
- msg->setContext(Context(new ReplyContext(*req, version)));
- msg->pushHandler(*this, *this);
- msg->setRetryEnabled(retryEnabled);
- msg->setRetry(retry);
- msg->setTimeReceivedNow();
- msg->setTimeRemaining(timeRemaining);
- msg->getTrace().setLevel(traceLevel);
- if (msg->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
- msg->getTrace().trace(TraceLevel::SEND_RECEIVE,
- make_string("Message (type %d) received at %s for session '%s'.",
- msg->getType(), _serverIdent.c_str(), session));
+ reply->setRetryDelay(retryDelay);
+ for (uint32_t i = 0; i < errorCodesLen && i < errorMessagesLen && i < errorServicesLen; ++i) {
+ reply->addError(Error(errorCodes[i], errorMessages[i]._str,
+ errorServices[i]._len > 0 ? errorServices[i]._str : serviceName.c_str()));
}
- _net->getOwner().deliverMessage(std::move(msg), session);
+ rootTrace.addChild(TraceNode::decode(trace));
+ return reply;
}
void
-RPCSendV1::handleReply(Reply::UP reply)
-{
- ReplyContext::UP ctx(static_cast<ReplyContext*>(reply->getContext().value.PTR));
- FRT_RPCRequest &req = ctx->getRequest();
- string version = ctx->getVersion().toString();
- if (reply->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
- reply->getTrace().trace(TraceLevel::SEND_RECEIVE, make_string("Sending reply (version %s) from %s.",
- version.c_str(), _serverIdent.c_str()));
- }
- Blob payload(0);
- if (reply->getType() != 0) {
- payload = _net->getOwner().getProtocol(reply->getProtocol())->encode(ctx->getVersion(), *reply);
- if (payload.size() == 0) {
- reply->addError(Error(ErrorCode::ENCODE_ERROR, "An error occured while encoding the reply, see log."));
- }
- }
- FRT_Values &ret = *req.GetReturn();
+RPCSendV1::createResponse(FRT_Values & ret, const string & version, Reply & reply, Blob payload) const {
ret.AddString(version.c_str());
- ret.AddDouble(reply->getRetryDelay());
+ ret.AddDouble(reply.getRetryDelay());
- uint32_t errorCount = reply->getNumErrors();
+ uint32_t errorCount = reply.getNumErrors();
uint32_t *errorCodes = ret.AddInt32Array(errorCount);
FRT_StringValue *errorMessages = ret.AddStringArray(errorCount);
FRT_StringValue *errorServices = ret.AddStringArray(errorCount);
for (uint32_t i = 0; i < errorCount; ++i) {
- errorCodes[i] = reply->getError(i).getCode();
- ret.SetString(errorMessages + i,
- reply->getError(i).getMessage().c_str());
- ret.SetString(errorServices + i,
- reply->getError(i).getService().c_str());
+ errorCodes[i] = reply.getError(i).getCode();
+ ret.SetString(errorMessages + i, reply.getError(i).getMessage().c_str());
+ ret.SetString(errorServices + i, reply.getError(i).getService().c_str());
}
- ret.AddString(reply->getProtocol().c_str());
+ ret.AddString(reply.getProtocol().c_str());
ret.AddData(std::move(payload.payload()), payload.size());
- if (reply->getTrace().getLevel() > 0) {
- ret.AddString(reply->getTrace().getRoot().encode().c_str());
+ if (reply.getTrace().getLevel() > 0) {
+ ret.AddString(reply.getTrace().getRoot().encode().c_str());
} else {
ret.AddString("");
}
- req.Return();
-}
-
-void
-RPCSendV1::handleDiscard(Context ctx)
-{
- ReplyContext::UP tmp(static_cast<ReplyContext*>(ctx.value.PTR));
- FRT_RPCRequest &req = tmp->getRequest();
- FNET_Channel *chn = req.GetContext()._value.CHANNEL;
- req.SubRef();
- chn->Free();
-}
-
-void
-RPCSendV1::replyError(FRT_RPCRequest *req, const vespalib::Version &version,
- uint32_t traceLevel, const Error &err)
-{
- Reply::UP reply(new EmptyReply());
- reply->setContext(Context(new ReplyContext(*req, version)));
- reply->getTrace().setLevel(traceLevel);
- reply->addError(err);
- handleReply(std::move(reply));
}
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv1.h b/messagebus/src/vespa/messagebus/network/rpcsendv1.h
index b634c1ef1b8..37f23335309 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsendv1.h
+++ b/messagebus/src/vespa/messagebus/network/rpcsendv1.h
@@ -1,77 +1,24 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include "rpcsendadapter.h"
-#include <vespa/messagebus/idiscardhandler.h>
-#include <vespa/messagebus/ireplyhandler.h>
-#include <vespa/fnet/frt/invokable.h>
-
+#include "rpcsend.h"
namespace mbus {
-class Error;
-
-class PayLoadFiller
-{
+class RPCSendV1 : public RPCSend {
public:
- virtual ~PayLoadFiller() { }
- virtual void fill(FRT_Values & v) const = 0;
-};
-
-/**
- * Implements the send adapter for method "mbus.send".
- */
-class RPCSendV1 : public RPCSendAdapter,
- public FRT_Invokable,
- public FRT_IRequestWait,
- public IDiscardHandler,
- public IReplyHandler {
+ static bool isCompatible(vespalib::stringref method, vespalib::stringref request, vespalib::stringref respons);
private:
- RPCNetwork *_net;
- string _clientIdent;
- string _serverIdent;
-
- /**
- * Send an error reply for a given request.
- *
- * @param request The FRT request to reply to.
- * @param version The version to serialize for.
- * @param traceLevel The trace level to set in the reply.
- * @param err The error to reply with.
- */
- void replyError(FRT_RPCRequest *req, const vespalib::Version &version,
- uint32_t traceLevel, const Error &err);
-
- void send(RoutingNode &recipient, const vespalib::Version &version,
- const PayLoadFiller & filler, uint64_t timeRemaining);
-public:
- /** The name of the rpc method that this adapter registers. */
- static const char *METHOD_NAME;
-
- /** The parameter string of the rpc method. */
- static const char *METHOD_PARAMS;
-
- /** The return string of the rpc method. */
- static const char *METHOD_RETURN;
-
- /**
- * Constructs a new instance of this adapter. This object is unusable until
- * its attach() method has been called.
- */
- RPCSendV1();
- ~RPCSendV1();
-
- void attach(RPCNetwork &net) override;
-
- void send(RoutingNode &recipient, const vespalib::Version &version,
- BlobRef payload, uint64_t timeRemaining) override;
- void sendByHandover(RoutingNode &recipient, const vespalib::Version &version,
- Blob payload, uint64_t timeRemaining) override;
-
- void handleReply(std::unique_ptr<Reply> reply) override;
- void handleDiscard(Context ctx) override;
- void invoke(FRT_RPCRequest *req);
- void RequestDone(FRT_RPCRequest *req) override;
+ void build(FRT_ReflectionBuilder & builder) override;
+ const char * getReturnSpec() const override;
+ std::unique_ptr<Params> toParams(const FRT_Values &param) const override;
+ void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route,
+ const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel,
+ const PayLoadFiller &filler, uint64_t timeRemaining) const override;
+
+ std::unique_ptr<Reply> createReply(const FRT_Values & response, const string & serviceName,
+ Error & error, vespalib::TraceNode & rootTrace) const override;
+ void createResponse(FRT_Values & ret, const string & version, Reply & reply, Blob payload) const override;
};
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp b/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp
new file mode 100644
index 00000000000..1228e08f3b4
--- /dev/null
+++ b/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp
@@ -0,0 +1,263 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "rpcsendv2.h"
+#include "rpcnetwork.h"
+#include "rpcserviceaddress.h"
+#include <vespa/messagebus/emptyreply.h>
+#include <vespa/messagebus/tracelevel.h>
+#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/vespalib/data/slime/slime.h>
+#include <vespa/vespalib/data/databuffer.h>
+#include <vespa/vespalib/util/compressor.h>
+#include <vespa/fnet/frt/reflection.h>
+
+using vespalib::make_string;
+using vespalib::compression::CompressionConfig;
+using vespalib::compression::decompress;
+using vespalib::compression::compress;
+using vespalib::DataBuffer;
+using vespalib::ConstBufferRef;
+using vespalib::stringref;
+using vespalib::Memory;
+using vespalib::Slime;
+using vespalib::Version;
+using namespace vespalib::slime;
+
+namespace mbus {
+
+namespace {
+
+const char *METHOD_NAME = "mbus.slime";
+const char *METHOD_PARAMS = "bixbix";
+const char *METHOD_RETURN = "bixbix";
+
+Memory VERSION_F("version");
+Memory ROUTE_F("route");
+Memory SESSION_F("session");
+Memory USERETRY_F("useretry");
+Memory RETRYDELAY_F("retrydelay");
+Memory RETRY_F("retry");
+Memory TIMELEFT_F("timeleft");
+Memory PROTOCOL_F("prot");
+Memory TRACELEVEL_F("tracelevel");
+Memory TRACE_F("trace");
+Memory BLOB_F("msg");
+Memory ERRORS_F("errors");
+Memory CODE_F("code");
+Memory MSG_F("msg");
+Memory SERVICE_F("service");
+
+}
+
+bool RPCSendV2::isCompatible(stringref method, stringref request, stringref response)
+{
+ return (method == METHOD_NAME) &&
+ (request == METHOD_PARAMS) &&
+ (response == METHOD_RETURN);
+}
+
+void
+RPCSendV2::build(FRT_ReflectionBuilder & builder)
+{
+ builder.DefineMethod(METHOD_NAME, METHOD_PARAMS, METHOD_RETURN, true, FRT_METHOD(RPCSendV2::invoke), this);
+ builder.MethodDesc("Send a message bus slime request and get a reply back.");
+ builder.ParamDesc("header_encoding", "0=raw, 6=lz4");
+ builder.ParamDesc("header_decoded_size", "Uncompressed header blob size");
+ builder.ParamDesc("header_payload", "The message header blob in slime");
+ builder.ParamDesc("body_encoding", "0=raw, 6=lz4");
+ builder.ParamDesc("body_decoded_size", "Uncompressed body blob size");
+ builder.ParamDesc("body_payload", "The message body blob in slime");
+ builder.ReturnDesc("header_encoding", "0=raw, 6=lz4");
+ builder.ReturnDesc("header_decoded_size", "Uncompressed header blob size");
+ builder.ReturnDesc("header_payload", "The reply header blob in slime.");
+ builder.ReturnDesc("body_encoding", "0=raw, 6=lz4");
+ builder.ReturnDesc("body_decoded_size", "Uncompressed body blob size");
+ builder.ReturnDesc("body_payload", "The reply body blob in slime.");
+}
+
+const char *
+RPCSendV2::getReturnSpec() const {
+ return METHOD_RETURN;
+}
+
+namespace {
+class OutputBuf : public vespalib::Output {
+public:
+ OutputBuf(size_t estimatedSize) : _buf(estimatedSize) { }
+ DataBuffer & getBuf() { return _buf; }
+private:
+ vespalib::WritableMemory reserve(size_t bytes) override {
+ _buf.ensureFree(bytes);
+ return vespalib::WritableMemory(_buf.getFree(), _buf.getFreeLen());
+ }
+ Output &commit(size_t bytes) override {
+ _buf.moveFreeToData(bytes);
+ return *this;
+ }
+ DataBuffer _buf;
+};
+}
+
+void
+RPCSendV2::encodeRequest(FRT_RPCRequest &req, const Version &version, const Route & route,
+ const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel,
+ const PayLoadFiller &filler, uint64_t timeRemaining) const
+{
+ FRT_Values &args = *req.GetParams();
+ req.SetMethodName(METHOD_NAME);
+ // Place holder for auxillary data to be transfered later.
+ args.AddInt8(CompressionConfig::NONE);
+ args.AddInt32(0);
+ args.AddData("", 0);
+
+ Slime slime;
+ Cursor & root = slime.setObject();
+
+ root.setString(VERSION_F, version.toString());
+ root.setString(ROUTE_F, route.toString());
+ root.setString(SESSION_F, address.getSessionName());
+ root.setBool(USERETRY_F, msg.getRetryEnabled());
+ root.setLong(RETRY_F, msg.getRetry());
+ root.setLong(TIMELEFT_F, timeRemaining);
+ root.setString(PROTOCOL_F, msg.getProtocol());
+ root.setLong(TRACELEVEL_F, traceLevel);
+ filler.fill(BLOB_F, root);
+
+ OutputBuf rBuf(8192);
+ BinaryFormat::encode(slime, rBuf);
+ ConstBufferRef toCompress(rBuf.getBuf().getData(), rBuf.getBuf().getDataLen());
+ DataBuffer buf(vespalib::roundUp2inN(rBuf.getBuf().getDataLen()));
+ CompressionConfig::Type type = compress(_net->getCompressionConfig(), toCompress, buf, false);
+
+ args.AddInt8(type);
+ args.AddInt32(toCompress.size());
+ args.AddData(buf.stealBuffer(), buf.getDataLen());
+}
+
+namespace {
+
+class ParamsV2 : public RPCSend::Params
+{
+public:
+ ParamsV2(const FRT_Values &arg)
+ : _slime()
+ {
+ uint8_t encoding = arg[3]._intval8;
+ uint32_t uncompressedSize = arg[4]._intval32;
+ DataBuffer uncompressed(arg[5]._data._buf, arg[5]._data._len);
+ ConstBufferRef blob(arg[5]._data._buf, arg[5]._data._len);
+ decompress(CompressionConfig::toType(encoding), uncompressedSize, blob, uncompressed, true);
+ assert(uncompressedSize == uncompressed.getDataLen());
+ BinaryFormat::decode(Memory(uncompressed.getData(), uncompressed.getDataLen()), _slime);
+ }
+
+ uint32_t getTraceLevel() const override { return _slime.get()[TRACELEVEL_F].asLong(); }
+ bool useRetry() const override { return _slime.get()[USERETRY_F].asBool(); }
+ uint32_t getRetries() const override { return _slime.get()[RETRY_F].asLong(); }
+ uint64_t getRemainingTime() const override { return _slime.get()[TIMELEFT_F].asLong(); }
+
+ Version getVersion() const override {
+ return Version(_slime.get()[VERSION_F].asString().make_stringref());
+ }
+ stringref getRoute() const override {
+ return _slime.get()[ROUTE_F].asString().make_stringref();
+ }
+ stringref getSession() const override {
+ return _slime.get()[SESSION_F].asString().make_stringref();
+ }
+ stringref getProtocol() const override {
+ return _slime.get()[PROTOCOL_F].asString().make_stringref();
+ }
+ BlobRef getPayload() const override {
+ Memory m = _slime.get()[BLOB_F].asData();
+ return BlobRef(m.data, m.size);
+ }
+private:
+ Slime _slime;
+};
+
+}
+
+std::unique_ptr<RPCSend::Params>
+RPCSendV2::toParams(const FRT_Values &args) const
+{
+ return std::make_unique<ParamsV2>(args);
+}
+
+std::unique_ptr<Reply>
+RPCSendV2::createReply(const FRT_Values & ret, const string & serviceName,
+ Error & error, vespalib::TraceNode & rootTrace) const
+{
+ uint8_t encoding = ret[3]._intval8;
+ uint32_t uncompressedSize = ret[4]._intval32;
+ DataBuffer uncompressed(ret[5]._data._buf, ret[5]._data._len);
+ ConstBufferRef blob(ret[5]._data._buf, ret[5]._data._len);
+ decompress(CompressionConfig::toType(encoding), uncompressedSize, blob, uncompressed, true);
+ assert(uncompressedSize == uncompressed.getDataLen());
+ Slime slime;
+ BinaryFormat::decode(Memory(uncompressed.getData(), uncompressed.getDataLen()), slime);
+ Inspector & root = slime.get();
+ Version version(root[VERSION_F].asString().make_string());
+ Memory payload = root[BLOB_F].asData();
+
+ Reply::UP reply;
+ if (payload.size > 0) {
+ reply = decode(root[PROTOCOL_F].asString().make_stringref(), version, BlobRef(payload.data, payload.size), error);
+ }
+ if ( ! reply ) {
+ reply.reset(new EmptyReply());
+ }
+ reply->setRetryDelay(root[RETRYDELAY_F].asDouble());
+ Inspector & errors = root[ERRORS_F];
+ for (uint32_t i = 0; i < errors.entries(); ++i) {
+ Inspector & e = errors[i];
+ Memory service = e[SERVICE_F].asString();
+ reply->addError(Error(e[CODE_F].asLong(), e[MSG_F].asString().make_string(),
+ (service.size > 0) ? service.make_string() : serviceName));
+ }
+ rootTrace.addChild(TraceNode::decode(root[TRACE_F].asString().make_string()));
+ return reply;
+}
+
+void
+RPCSendV2::createResponse(FRT_Values & ret, const string & version, Reply & reply, Blob payload) const
+{
+ // Place holder for auxillary data to be transfered later.
+ ret.AddInt8(CompressionConfig::NONE);
+ ret.AddInt32(0);
+ ret.AddData("", 0);
+
+ Slime slime;
+ Cursor & root = slime.setObject();
+
+ root.setString(VERSION_F, version);
+ root.setDouble(RETRYDELAY_F, reply.getRetryDelay());
+ root.setString(PROTOCOL_F, reply.getProtocol());
+ root.setData(BLOB_F, vespalib::Memory(payload.data(), payload.size()));
+ if (reply.getTrace().getLevel() > 0) {
+ root.setString(TRACE_F, reply.getTrace().getRoot().encode());
+ }
+
+ if (reply.getNumErrors() > 0) {
+ Cursor & array = root.setArray(ERRORS_F);
+ for (uint32_t i = 0; i < reply.getNumErrors(); ++i) {
+ Cursor & error = array.addObject();
+ error.setLong(CODE_F, reply.getError(i).getCode());
+ error.setString(MSG_F, reply.getError(i).getMessage());
+ error.setString(SERVICE_F, reply.getError(i).getService().c_str());
+ }
+ }
+
+ OutputBuf rBuf(8192);
+ BinaryFormat::encode(slime, rBuf);
+ ConstBufferRef toCompress(rBuf.getBuf().getData(), rBuf.getBuf().getDataLen());
+ DataBuffer buf(vespalib::roundUp2inN(rBuf.getBuf().getDataLen()));
+ CompressionConfig::Type type = compress(_net->getCompressionConfig(), toCompress, buf, false);
+
+ ret.AddInt8(type);
+ ret.AddInt32(toCompress.size());
+ ret.AddData(buf.stealBuffer(), buf.getDataLen());
+
+}
+
+} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv2.h b/messagebus/src/vespa/messagebus/network/rpcsendv2.h
new file mode 100644
index 00000000000..e793868d2aa
--- /dev/null
+++ b/messagebus/src/vespa/messagebus/network/rpcsendv2.h
@@ -0,0 +1,24 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "rpcsend.h"
+
+namespace mbus {
+
+class RPCSendV2 : public RPCSend {
+public:
+ static bool isCompatible(vespalib::stringref method, vespalib::stringref request, vespalib::stringref response);
+private:
+ void build(FRT_ReflectionBuilder & builder) override;
+ const char * getReturnSpec() const override;
+ std::unique_ptr<Params> toParams(const FRT_Values &param) const override;
+ void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route,
+ const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel,
+ const PayLoadFiller &filler, uint64_t timeRemaining) const override;
+
+ std::unique_ptr<Reply> createReply(const FRT_Values & response, const string & serviceName,
+ Error & error, vespalib::TraceNode & rootTrace) const override;
+ void createResponse(FRT_Values & ret, const string & version, Reply & reply, Blob payload) const override;
+};
+
+} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcserviceaddress.h b/messagebus/src/vespa/messagebus/network/rpcserviceaddress.h
index c6856057342..36dde19bd18 100644
--- a/messagebus/src/vespa/messagebus/network/rpcserviceaddress.h
+++ b/messagebus/src/vespa/messagebus/network/rpcserviceaddress.h
@@ -84,7 +84,7 @@ public:
*
* @return True if target is set.
*/
- bool hasTarget() const { return _target.get() != NULL; }
+ bool hasTarget() const { return _target.get() != nullptr; }
};
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp
index 20e5a2eb3e3..295814f4a8d 100644
--- a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp
@@ -36,7 +36,7 @@ RPCTargetPool::flushTargets(bool force)
TargetMap::iterator it = _targets.begin();
while (it != _targets.end()) {
Entry &entry = it->second;
- if (entry._target.get() != NULL) {
+ if (entry._target.get() != nullptr) {
if (entry._target.use_count() > 1) {
entry._lastUse = currentTime;
++it;
diff --git a/messagebus/src/vespa/messagebus/network/rpctargetpool.h b/messagebus/src/vespa/messagebus/network/rpctargetpool.h
index 683982de080..5f858f66993 100644
--- a/messagebus/src/vespa/messagebus/network/rpctargetpool.h
+++ b/messagebus/src/vespa/messagebus/network/rpctargetpool.h
@@ -1,11 +1,11 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include <map>
-#include <vespa/messagebus/itimer.h>
-#include <vespa/vespalib/util/sync.h>
#include "rpcserviceaddress.h"
#include "rpctarget.h"
+#include <vespa/messagebus/itimer.h>
+#include <vespa/vespalib/util/sync.h>
+#include <map>
class FRT_Supervisor;
diff --git a/messagebus/src/vespa/messagebus/protocolrepository.cpp b/messagebus/src/vespa/messagebus/protocolrepository.cpp
index d2661e3ef80..4e2efcfb3b9 100644
--- a/messagebus/src/vespa/messagebus/protocolrepository.cpp
+++ b/messagebus/src/vespa/messagebus/protocolrepository.cpp
@@ -76,7 +76,7 @@ ProtocolRepository::getRoutingPolicy(const string &protocolName,
} catch (const std::exception &e) {
LOG(error, "Protocol '%s' threw an exception; %s", protocolName.c_str(), e.what());
}
- if (policy.get() == NULL) {
+ if (policy.get() == nullptr) {
LOG(error, "Protocol '%s' failed to create routing policy '%s' with parameter '%s'.",
protocolName.c_str(), policyName.c_str(), policyParam.c_str());
return IRoutingPolicy::SP();
diff --git a/messagebus/src/vespa/messagebus/routing/route.cpp b/messagebus/src/vespa/messagebus/routing/route.cpp
index af7b5113ac2..b705847c3a5 100644
--- a/messagebus/src/vespa/messagebus/routing/route.cpp
+++ b/messagebus/src/vespa/messagebus/routing/route.cpp
@@ -69,7 +69,7 @@ Route::toDebugString() const {
}
Route
-Route::parse(const string &route)
+Route::parse(vespalib::stringref route)
{
return RouteParser::createRoute(route);
}
diff --git a/messagebus/src/vespa/messagebus/routing/route.h b/messagebus/src/vespa/messagebus/routing/route.h
index d9932e17d26..a2a01648cfe 100644
--- a/messagebus/src/vespa/messagebus/routing/route.h
+++ b/messagebus/src/vespa/messagebus/routing/route.h
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include <vector>
#include "hop.h"
namespace mbus {
@@ -33,7 +32,7 @@ public:
* @param route The string to parse.
* @return A route that corresponds to the string.
*/
- static Route parse(const string &route);
+ static Route parse(vespalib::stringref route);
/**
* Create a Route that contains no hops
diff --git a/messagebus/src/vespa/messagebus/routing/routeparser.cpp b/messagebus/src/vespa/messagebus/routing/routeparser.cpp
index ac52ae3b598..668f14f9801 100644
--- a/messagebus/src/vespa/messagebus/routing/routeparser.cpp
+++ b/messagebus/src/vespa/messagebus/routing/routeparser.cpp
@@ -71,7 +71,7 @@ RouteParser::createDirective(const stringref &str)
}
Hop
-RouteParser::createHop(const stringref &str)
+RouteParser::createHop(stringref str)
{
if (str.empty()) {
return Hop().addDirective(createErrorDirective("Failed to parse empty string."));
@@ -84,7 +84,7 @@ RouteParser::createHop(const stringref &str)
}
if (len > 4 && str.substr(0, 4) == "tcp/") {
IHopDirective::SP tcp = createTcpDirective(str.substr(4));
- if (tcp.get() != NULL) {
+ if (tcp.get() != nullptr) {
return Hop().addDirective(tcp);
}
}
@@ -119,7 +119,7 @@ RouteParser::createHop(const stringref &str)
}
Route
-RouteParser::createRoute(const stringref &str)
+RouteParser::createRoute(stringref str)
{
Route ret;
for (size_t from = 0, at = 0, depth = 0; at <= str.size(); ++at) {
diff --git a/messagebus/src/vespa/messagebus/routing/routeparser.h b/messagebus/src/vespa/messagebus/routing/routeparser.h
index a3f16e49307..8ffba3f6e11 100644
--- a/messagebus/src/vespa/messagebus/routing/routeparser.h
+++ b/messagebus/src/vespa/messagebus/routing/routeparser.h
@@ -29,7 +29,7 @@ public:
* @param str The string to parse as a hop.
* @return The created hop.
*/
- static Hop createHop(const vespalib::stringref &str);
+ static Hop createHop(vespalib::stringref str);
/**
* Creates a route from a string representation.
@@ -37,7 +37,7 @@ public:
* @param str The string to parse as a route.
* @return The created route.
*/
- static Route createRoute(const vespalib::stringref &str);
+ static Route createRoute(vespalib::stringref str);
};
} // mbus
diff --git a/messagebus/src/vespa/messagebus/routing/routingnode.cpp b/messagebus/src/vespa/messagebus/routing/routingnode.cpp
index 62efda4aeb9..6e100999e1d 100644
--- a/messagebus/src/vespa/messagebus/routing/routingnode.cpp
+++ b/messagebus/src/vespa/messagebus/routing/routingnode.cpp
@@ -21,7 +21,7 @@ RoutingNode::RoutingNode(MessageBus &mbus, INetwork &net, Resender *resender,
: _mbus(mbus),
_net(net),
_resender(resender),
- _parent(NULL),
+ _parent(nullptr),
_recipients(),
_children(),
_replyHandler(&replyHandler),
@@ -45,8 +45,8 @@ RoutingNode::RoutingNode(RoutingNode &parent, const Route &route)
_parent(&parent),
_recipients(parent._recipients),
_children(),
- _replyHandler(NULL),
- _discardHandler(NULL),
+ _replyHandler(nullptr),
+ _discardHandler(nullptr),
_trace(parent._trace.getLevel()),
_pending(0),
_msg(parent._msg),
@@ -78,8 +78,8 @@ RoutingNode::clearChildren()
void
RoutingNode::discard()
{
- assert(_parent == NULL);
- if (_discardHandler != NULL) {
+ assert(_parent == nullptr);
+ if (_discardHandler != nullptr) {
_discardHandler->handleDiscard(Context());
}
}
@@ -101,7 +101,7 @@ RoutingNode::prepareForRetry()
{
_shouldRetry = false;
_reply.reset();
- if (_routingContext.get() != NULL && _routingContext->getSelectOnRetry()) {
+ if (_routingContext.get() != nullptr && _routingContext->getSelectOnRetry()) {
clearChildren();
} else if (!_children.empty()) {
bool retryingSome = false;
@@ -109,7 +109,7 @@ RoutingNode::prepareForRetry()
it != _children.end(); ++it)
{
RoutingNode *child= *it;
- if (child->_shouldRetry || child->_reply.get() == NULL) {
+ if (child->_shouldRetry || child->_reply.get() == nullptr) {
child->prepareForRetry();
retryingSome = true;
}
@@ -126,11 +126,11 @@ RoutingNode::prepareForRetry()
void
RoutingNode::notifyParent()
{
- if (_serviceAddress.get() != NULL) {
+ if (_serviceAddress.get() != nullptr) {
_net.freeServiceAddress(*this);
}
tryIgnoreResult();
- if (_parent != NULL) {
+ if (_parent != nullptr) {
_parent->notifyMerge();
return;
}
@@ -174,7 +174,7 @@ RoutingNode::addError(uint32_t code, const string &msg)
void
RoutingNode::addError(const Error &err)
{
- if (_reply.get() != NULL) {
+ if (_reply.get() != nullptr) {
_reply->getTrace().swap(_trace);
_reply->addError(err);
_reply->getTrace().swap(_trace);
@@ -186,8 +186,8 @@ RoutingNode::addError(const Error &err)
void
RoutingNode::setReply(Reply::UP reply)
{
- if (reply.get() != NULL) {
- _shouldRetry = _resender != NULL && _resender->shouldRetry(*reply);
+ if (reply.get() != nullptr) {
+ _shouldRetry = _resender != nullptr && _resender->shouldRetry(*reply);
_trace.getRoot().addChild(reply->getTrace().getRoot());
reply->getTrace().clear();
}
@@ -211,7 +211,7 @@ RoutingNode::notifyAbort(const string &msg)
mystack.pop();
if (!node->_isActive) {
// reply not pending
- } else if (node->_reply.get() != NULL) {
+ } else if (node->_reply.get() != nullptr) {
node->notifyParent();
} else if (node->_children.empty()) {
node->setError(ErrorCode::SEND_ABORTED, msg);
@@ -240,7 +240,7 @@ RoutingNode::notifyTransmit()
if (node->hasReply()) {
node->notifyParent();
} else {
- assert(node->_serviceAddress.get() != NULL);
+ assert(node->_serviceAddress.get() != nullptr);
sendTo.push_back(node);
}
} else {
@@ -296,7 +296,7 @@ RoutingNode::notifyMerge()
setError(ErrorCode::POLICY_ERROR, make_string("Policy '%s' threw an exception; %s",
dir.getName().c_str(), e.what()));
}
- if (_reply.get() == NULL) {
+ if (_reply.get() == nullptr) {
setError(ErrorCode::APP_FATAL_ERROR, make_string("Routing policy '%s' failed to merge replies.",
dir.getName().c_str()));
}
@@ -315,12 +315,12 @@ RoutingNode::hasUnconsumedErrors()
while (!mystack.empty()) {
RoutingNode *node = mystack.top();
mystack.pop();
- if (node->_reply.get() != NULL) {
+ if (node->_reply.get() != nullptr) {
for (uint32_t i = 0; i < node->_reply->getNumErrors(); ++i) {
int errorCode = node->_reply->getError(i).getCode();
RoutingNode *it = node;
- while (it != NULL) {
- if (it->_routingContext.get() != NULL &&
+ while (it != nullptr) {
+ if (it->_routingContext.get() != nullptr &&
it->_routingContext->isConsumableError(errorCode))
{
errorCode = ErrorCode::NONE;
@@ -329,7 +329,7 @@ RoutingNode::hasUnconsumedErrors()
it = it->_parent;
}
if (errorCode != ErrorCode::NONE) {
- _shouldRetry = _resender != NULL && _resender->canRetry(errorCode);
+ _shouldRetry = _resender != nullptr && _resender->canRetry(errorCode);
if (!_shouldRetry) {
return true; // no need to continue
}
@@ -374,17 +374,17 @@ RoutingNode::resolve(uint32_t depth)
if (executePolicySelect()) {
return resolveChildren(depth + 1);
}
- return _reply.get() != NULL;
+ return _reply.get() != nullptr;
}
_net.allocServiceAddress(*this);
- return _serviceAddress.get() != NULL || _reply.get() != NULL;
+ return _serviceAddress.get() != nullptr || _reply.get() != nullptr;
}
bool
RoutingNode::lookupHop()
{
RoutingTable::SP table = _mbus.getRoutingTable(_msg.getProtocol());
- if (table.get() != NULL) {
+ if (table.get() != nullptr) {
string name = _route.getHop(0).getServiceName();
if (table->hasHop(name)) {
const HopBlueprint *hop = table->getHop(name);
@@ -404,7 +404,7 @@ RoutingNode::lookupRoute()
Hop &hop = _route.getHop(0);
if (hop.getDirective(0)->getType() == IHopDirective::TYPE_ROUTE) {
RouteDirective &dir = static_cast<RouteDirective&>(*hop.getDirective(0));
- if (table.get() == NULL || !table->hasRoute(dir.getName())) {
+ if (table.get() == nullptr || !table->hasRoute(dir.getName())) {
setError(ErrorCode::ILLEGAL_ROUTE,
make_string("Route '%s' does not exist.", dir.getName().c_str()));
return false;
@@ -415,7 +415,7 @@ RoutingNode::lookupRoute()
dir.getName().c_str(), _route.toString().c_str()));
return true;
}
- if (table.get() != NULL) {
+ if (table.get() != nullptr) {
string name = hop.getServiceName();
if (table->hasRoute(name)) {
insertRoute(*table->getRoute(name));
@@ -474,7 +474,7 @@ RoutingNode::executePolicySelect()
{
const PolicyDirective &dir = _routingContext->getDirective();
_policy = _mbus.getRoutingPolicy(_msg.getProtocol(), dir.getName(), dir.getParam());
- if (_policy.get() == NULL) {
+ if (_policy.get() == nullptr) {
setError(ErrorCode::UNKNOWN_POLICY, make_string(
"Protocol '%s' could not create routing policy '%s' with parameter '%s'.",
_msg.getProtocol().c_str(), dir.getName().c_str(), dir.getParam().c_str()));
@@ -489,7 +489,7 @@ RoutingNode::executePolicySelect()
return false;
}
if (_children.empty()) {
- if (_reply.get() == NULL) {
+ if (_reply.get() == nullptr) {
setError(ErrorCode::NO_SERVICES_FOR_ROUTE,
make_string("Policy '%s' selected no recipients for route '%s'.",
dir.getName().c_str(), _route.toString().c_str()));
@@ -522,7 +522,7 @@ RoutingNode::resolveChildren(uint32_t childDepth)
RoutingNode *child = *it;
child->_trace.trace(TraceLevel::SPLIT_MERGE,
make_string("Resolving '%s'.", child->_route.toString().c_str()));
- child->_isActive = (child->_reply.get() == NULL);
+ child->_isActive = (child->_reply.get() == nullptr);
if (child->_isActive) {
++numActiveChildren;
if (!child->resolve(childDepth)) {
@@ -562,7 +562,7 @@ RoutingNode::tryIgnoreResult()
if (!shouldIgnoreResult()) {
return false;
}
- if (_reply.get() == NULL || !_reply->hasErrors()) {
+ if (_reply.get() == nullptr || !_reply->hasErrors()) {
return false;
}
setReply(Reply::UP(new EmptyReply()));
diff --git a/messagebus/src/vespa/messagebus/routing/routingnode.h b/messagebus/src/vespa/messagebus/routing/routingnode.h
index 22ff07c26e5..8951785c621 100644
--- a/messagebus/src/vespa/messagebus/routing/routingnode.h
+++ b/messagebus/src/vespa/messagebus/routing/routingnode.h
@@ -228,7 +228,7 @@ public:
*/
RoutingNode(MessageBus &mbus, INetwork &net, Resender *resender,
IReplyHandler &replyHandler, Message &msg,
- IDiscardHandler *discardHandler = NULL);
+ IDiscardHandler *discardHandler = nullptr);
/**
* Destructor. Frees up any allocated resources, namely all child nodes of
@@ -348,6 +348,7 @@ public:
* @return The message being routed.
*/
Message &getMessage() { return _msg; }
+ const Message & getMessage() const { return _msg; }
/**
* Returns the trace object for this node. Each node has a separate trace
@@ -356,6 +357,7 @@ public:
* @return The trace object.
*/
Trace &getTrace() { return _trace; }
+ const Trace &getTrace() const { return _trace; }
/**
* Returns the route object as it exists at this point of the tree.
@@ -369,7 +371,7 @@ public:
*
* @return True if this node has a reply.
*/
- bool hasReply() const { return _reply.get() != NULL; }
+ bool hasReply() const { return _reply.get() != nullptr; }
/**
* Returns the reply of this node.
@@ -419,7 +421,7 @@ public:
*
* @return True if an address is set.
*/
- bool hasServiceAddress() { return _serviceAddress.get() != NULL; }
+ bool hasServiceAddress() { return _serviceAddress.get() != nullptr; }
/**
* Returns the service address of this node. This is attached by the network
@@ -429,6 +431,7 @@ public:
* @return The recipient address.
*/
IServiceAddress &getServiceAddress() { return *_serviceAddress; }
+ const IServiceAddress &getServiceAddress() const { return *_serviceAddress; }
/**
* Sets the service address of this node. This is called by the network
diff --git a/messagebus/src/vespa/messagebus/routing/routingtable.cpp b/messagebus/src/vespa/messagebus/routing/routingtable.cpp
index 7537605d9fa..58e1881dc90 100644
--- a/messagebus/src/vespa/messagebus/routing/routingtable.cpp
+++ b/messagebus/src/vespa/messagebus/routing/routingtable.cpp
@@ -54,7 +54,7 @@ const HopBlueprint *
RoutingTable::getHop(const string &name) const
{
std::map<string, HopBlueprint>::const_iterator it = _hops.find(name);
- return it != _hops.end() ? &(it->second) : NULL;
+ return it != _hops.end() ? &(it->second) : nullptr;
}
bool
@@ -67,7 +67,7 @@ const Route *
RoutingTable::getRoute(const string &name) const
{
std::map<string, Route>::const_iterator it = _routes.find(name);
- return it != _routes.end() ? &(it->second) : NULL;
+ return it != _routes.end() ? &(it->second) : nullptr;
}
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/rpcmessagebus.cpp b/messagebus/src/vespa/messagebus/rpcmessagebus.cpp
index c093efd1106..103c21ee3aa 100644
--- a/messagebus/src/vespa/messagebus/rpcmessagebus.cpp
+++ b/messagebus/src/vespa/messagebus/rpcmessagebus.cpp
@@ -1,5 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "rpcmessagebus.h"
+#include <vespa/config/subscription/configuri.h>
namespace mbus {
@@ -15,6 +16,10 @@ RPCMessageBus::RPCMessageBus(const MessageBusParams &mbusParams,
_subscriber.start();
}
+RPCMessageBus::RPCMessageBus(const MessageBusParams &mbusParams, const RPCNetworkParams &rpcParams)
+ : RPCMessageBus(mbusParams, rpcParams, config::ConfigUri("client"))
+{}
+
RPCMessageBus::RPCMessageBus(const ProtocolSet &protocols,
const RPCNetworkParams &rpcParams,
const config::ConfigUri &routingCfgUri) :
diff --git a/messagebus/src/vespa/messagebus/rpcmessagebus.h b/messagebus/src/vespa/messagebus/rpcmessagebus.h
index 3f249965fec..e9cfed32c44 100644
--- a/messagebus/src/vespa/messagebus/rpcmessagebus.h
+++ b/messagebus/src/vespa/messagebus/rpcmessagebus.h
@@ -7,6 +7,8 @@
#include <vespa/messagebus/network/rpcnetwork.h>
#include <vespa/config/helper/legacysubscriber.h>
+namespace config {class ConfigUri; }
+
namespace mbus {
/**
@@ -40,8 +42,10 @@ public:
* @param routingCfgId The config id for message bus routing specs.
*/
RPCMessageBus(const MessageBusParams &mbusParams,
- const RPCNetworkParams &rpcParams = RPCNetworkParams(),
- const config::ConfigUri & routingCfgId = config::ConfigUri("client"));
+ const RPCNetworkParams &rpcParams,
+ const config::ConfigUri & routingCfgId);
+ RPCMessageBus(const MessageBusParams &mbusParams,
+ const RPCNetworkParams &rpcParams);
/**
@@ -55,8 +59,8 @@ public:
* @param routingCfgId The config id for messagebus routing specs.
*/
RPCMessageBus(const ProtocolSet &protocols,
- const RPCNetworkParams &rpcParams = RPCNetworkParams(),
- const config::ConfigUri & routingCfgId = config::ConfigUri("client"));
+ const RPCNetworkParams &rpcParams,
+ const config::ConfigUri & routingCfgId);
/**
* Destruct. This will destruct the internal MessageBus and RPCNetwork
diff --git a/messagebus/src/vespa/messagebus/sequencer.cpp b/messagebus/src/vespa/messagebus/sequencer.cpp
index 79fbd346c16..60fb3bdd39e 100644
--- a/messagebus/src/vespa/messagebus/sequencer.cpp
+++ b/messagebus/src/vespa/messagebus/sequencer.cpp
@@ -19,7 +19,7 @@ Sequencer::~Sequencer()
{
for (QueueMap::iterator it = _seqMap.begin(); it != _seqMap.end(); ++it) {
MessageQueue *queue = it->second;
- if (queue != NULL) {
+ if (queue != nullptr) {
while (queue->size() > 0) {
Message *msg = queue->front();
queue->pop();
@@ -40,7 +40,7 @@ Sequencer::filter(Message::UP msg)
vespalib::LockGuard guard(_lock);
QueueMap::iterator it = _seqMap.find(seqId);
if (it != _seqMap.end()) {
- if (it->second == NULL) {
+ if (it->second == nullptr) {
it->second = new MessageQueue();
}
msg->getTrace().trace(TraceLevel::COMPONENT,
@@ -49,7 +49,7 @@ Sequencer::filter(Message::UP msg)
msg.release();
return Message::UP();
}
- _seqMap[seqId] = NULL; // insert empty queue
+ _seqMap[seqId] = nullptr; // insert empty queue
}
return std::move(msg);
}
@@ -69,7 +69,7 @@ Sequencer::handleMessage(Message::UP msg)
{
if (msg->hasSequenceId()) {
msg = filter(std::move(msg));
- if (msg.get() != NULL) {
+ if (msg.get() != nullptr) {
sequencedSend(std::move(msg));
}
} else {
@@ -89,8 +89,8 @@ Sequencer::handleReply(Reply::UP reply)
QueueMap::iterator it = _seqMap.find(seq);
MessageQueue *que = it->second;
assert(it != _seqMap.end());
- if (que == NULL || que->size() == 0) {
- if (que != NULL) {
+ if (que == nullptr || que->size() == 0) {
+ if (que != nullptr) {
delete que;
}
_seqMap.erase(it);
@@ -99,7 +99,7 @@ Sequencer::handleReply(Reply::UP reply)
que->pop();
}
}
- if (msg.get() != NULL) {
+ if (msg.get() != nullptr) {
sequencedSend(std::move(msg));
}
IReplyHandler &handler = reply->getCallStack().pop(*reply);
diff --git a/messagebus/src/vespa/messagebus/sourcesession.cpp b/messagebus/src/vespa/messagebus/sourcesession.cpp
index 9a93a4aedf1..1dbdd307e17 100644
--- a/messagebus/src/vespa/messagebus/sourcesession.cpp
+++ b/messagebus/src/vespa/messagebus/sourcesession.cpp
@@ -41,9 +41,9 @@ SourceSession::send(Message::UP msg, const string &routeName, bool parseIfNotFou
{
bool found = false;
RoutingTable::SP rt = _mbus.getRoutingTable(msg->getProtocol());
- if (rt.get() != NULL) {
+ if (rt.get() != nullptr) {
const Route *route = rt->getRoute(routeName);
- if (route != NULL) {
+ if (route != nullptr) {
msg->setRoute(*route);
found = true;
} else if (!parseIfNotFound) {
@@ -79,13 +79,13 @@ SourceSession::send(Message::UP msg)
if (_closed) {
return Result(Error(ErrorCode::SEND_QUEUE_CLOSED, "Source session is closed."), std::move(msg));
}
- if (_throttlePolicy.get() != NULL && !_throttlePolicy->canSend(*msg, _pendingCount)) {
+ if (_throttlePolicy.get() != nullptr && !_throttlePolicy->canSend(*msg, _pendingCount)) {
return Result(Error(ErrorCode::SEND_QUEUE_FULL,
make_string("Too much pending data (%d messages).", _pendingCount)),
std::move(msg));
}
msg->pushHandler(_replyHandler);
- if (_throttlePolicy.get() != NULL) {
+ if (_throttlePolicy.get() != nullptr) {
_throttlePolicy->processMessage(*msg);
}
++_pendingCount;
@@ -108,7 +108,7 @@ SourceSession::handleReply(Reply::UP reply)
vespalib::MonitorGuard guard(_monitor);
assert(_pendingCount > 0);
--_pendingCount;
- if (_throttlePolicy.get() != NULL) {
+ if (_throttlePolicy.get() != nullptr) {
_throttlePolicy->processReply(*reply);
}
done = (_closed && _pendingCount == 0);
diff --git a/messagebus/src/vespa/messagebus/sourcesessionparams.cpp b/messagebus/src/vespa/messagebus/sourcesessionparams.cpp
index 125b2a9822f..51fe91562ae 100644
--- a/messagebus/src/vespa/messagebus/sourcesessionparams.cpp
+++ b/messagebus/src/vespa/messagebus/sourcesessionparams.cpp
@@ -6,7 +6,7 @@
namespace mbus {
SourceSessionParams::SourceSessionParams() :
- _replyHandler(NULL),
+ _replyHandler(nullptr),
_throttlePolicy(new DynamicThrottlePolicy()),
_timeout(180.0)
{ }
@@ -40,7 +40,7 @@ SourceSessionParams::setTimeout(double timeout)
bool
SourceSessionParams::hasReplyHandler() const
{
- return _replyHandler != NULL;
+ return _replyHandler != nullptr;
}
IReplyHandler &
diff --git a/messagebus/src/vespa/messagebus/testlib/testserver.cpp b/messagebus/src/vespa/messagebus/testlib/testserver.cpp
index e7f3646c72c..dbc741f2dd4 100644
--- a/messagebus/src/vespa/messagebus/testlib/testserver.cpp
+++ b/messagebus/src/vespa/messagebus/testlib/testserver.cpp
@@ -4,6 +4,7 @@
#include "simpleprotocol.h"
#include "slobrok.h"
#include "slobrokstate.h"
+#include <vespa/messagebus/network/oosmanager.h>
#include <vespa/vespalib/component/vtag.h>
namespace mbus {
@@ -11,9 +12,7 @@ namespace mbus {
VersionedRPCNetwork::VersionedRPCNetwork(const RPCNetworkParams &params) :
RPCNetwork(params),
_version(vespalib::Vtag::currentVersion)
-{
- // empty
-}
+{}
void
VersionedRPCNetwork::setVersion(const vespalib::Version &version)
@@ -97,4 +96,4 @@ TestServer::waitState(const OOSState &oosState)
return false;
}
-} // namespace mbus
+}
diff --git a/messagebus/src/vespa/messagebus/testlib/testserver.h b/messagebus/src/vespa/messagebus/testlib/testserver.h
index 1e2de3c4607..400e2b274c5 100644
--- a/messagebus/src/vespa/messagebus/testlib/testserver.h
+++ b/messagebus/src/vespa/messagebus/testlib/testserver.h
@@ -2,9 +2,10 @@
#pragma once
-#include <memory>
#include <vespa/messagebus/messagebus.h>
#include <vespa/messagebus/network/rpcnetwork.h>
+#include <vespa/messagebus/network/rpcnetworkparams.h>
+#include <vespa/fnet/frt/supervisor.h>
namespace mbus {
diff --git a/messagebus/src/vespa/messagebus/tracenode.h b/messagebus/src/vespa/messagebus/tracenode.h
index 95de9e70f55..f582a70a151 100644
--- a/messagebus/src/vespa/messagebus/tracenode.h
+++ b/messagebus/src/vespa/messagebus/tracenode.h
@@ -5,7 +5,7 @@
namespace mbus {
- typedef vespalib::TraceNode TraceNode;
+ using TraceNode = vespalib::TraceNode;
} // namespace mbus