summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-09-19 20:00:05 +0200
committerGitHub <noreply@github.com>2017-09-19 20:00:05 +0200
commit253cfe20e7450bc087506fd3d02a76140df5f9aa (patch)
tree711f9efc77af230526f7c266ad773243cbb7d602 /messagebus
parent5d92db079b6faf80cc2dcfb150889d452c3ac265 (diff)
Revert "- Use C++11 for loops."
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java54
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java269
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java332
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java209
-rw-r--r--messagebus/src/tests/advancedrouting/advancedrouting.cpp12
-rw-r--r--messagebus/src/tests/bucketsequence/bucketsequence.cpp1
-rw-r--r--messagebus/src/tests/choke/choke.cpp1
-rw-r--r--messagebus/src/tests/error/error.cpp4
-rw-r--r--messagebus/src/tests/routing/routing.cpp1
-rw-r--r--messagebus/src/tests/sendadapter/sendadapter.cpp1
-rw-r--r--messagebus/src/tests/serviceaddress/serviceaddress.cpp13
-rw-r--r--messagebus/src/tests/servicepool/servicepool.cpp2
-rw-r--r--messagebus/src/tests/shutdown/shutdown.cpp3
-rw-r--r--messagebus/src/tests/slobrok/slobrok.cpp4
-rw-r--r--messagebus/src/vespa/messagebus/blob.h5
-rw-r--r--messagebus/src/vespa/messagebus/callstack.cpp2
-rw-r--r--messagebus/src/vespa/messagebus/callstack.h2
-rw-r--r--messagebus/src/vespa/messagebus/destinationsessionparams.cpp2
-rw-r--r--messagebus/src/vespa/messagebus/messagebus.cpp10
-rw-r--r--messagebus/src/vespa/messagebus/messenger.cpp10
-rw-r--r--messagebus/src/vespa/messagebus/network/CMakeLists.txt2
-rw-r--r--messagebus/src/vespa/messagebus/network/oosmanager.cpp2
-rw-r--r--messagebus/src/vespa/messagebus/network/oosmanager.h13
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp155
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.h77
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp3
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.h21
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.cpp282
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.h95
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend_private.h52
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsendv1.cpp406
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsendv1.h79
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsendv2.cpp263
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsendv2.h24
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcserviceaddress.h2
-rw-r--r--messagebus/src/vespa/messagebus/network/rpctargetpool.cpp2
-rw-r--r--messagebus/src/vespa/messagebus/network/rpctargetpool.h6
-rw-r--r--messagebus/src/vespa/messagebus/protocolrepository.cpp2
-rw-r--r--messagebus/src/vespa/messagebus/routing/route.cpp2
-rw-r--r--messagebus/src/vespa/messagebus/routing/route.h3
-rw-r--r--messagebus/src/vespa/messagebus/routing/routeparser.cpp6
-rw-r--r--messagebus/src/vespa/messagebus/routing/routeparser.h4
-rw-r--r--messagebus/src/vespa/messagebus/routing/routingnode.cpp56
-rw-r--r--messagebus/src/vespa/messagebus/routing/routingnode.h9
-rw-r--r--messagebus/src/vespa/messagebus/routing/routingtable.cpp4
-rw-r--r--messagebus/src/vespa/messagebus/rpcmessagebus.cpp5
-rw-r--r--messagebus/src/vespa/messagebus/rpcmessagebus.h12
-rw-r--r--messagebus/src/vespa/messagebus/sequencer.cpp14
-rw-r--r--messagebus/src/vespa/messagebus/sourcesession.cpp10
-rw-r--r--messagebus/src/vespa/messagebus/sourcesessionparams.cpp4
-rw-r--r--messagebus/src/vespa/messagebus/testlib/testserver.cpp7
-rw-r--r--messagebus/src/vespa/messagebus/testlib/testserver.h3
-rw-r--r--messagebus/src/vespa/messagebus/tracenode.h2
53 files changed, 900 insertions, 1664 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
index d42e396452a..096c0c0b485 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
@@ -2,28 +2,17 @@
package com.yahoo.messagebus.network.rpc;
import com.yahoo.component.Version;
+import com.yahoo.component.VersionSpecification;
import com.yahoo.component.Vtag;
import com.yahoo.concurrent.ThreadFactoryFactory;
-import com.yahoo.jrt.Acceptor;
-import com.yahoo.jrt.ListenFailedException;
-import com.yahoo.jrt.Method;
-import com.yahoo.jrt.MethodHandler;
-import com.yahoo.jrt.Request;
-import com.yahoo.jrt.Spec;
-import com.yahoo.jrt.StringValue;
-import com.yahoo.jrt.Supervisor;
-import com.yahoo.jrt.Task;
-import com.yahoo.jrt.Transport;
+import com.yahoo.jrt.*;
import com.yahoo.jrt.slobrok.api.IMirror;
import com.yahoo.jrt.slobrok.api.Mirror;
import com.yahoo.jrt.slobrok.api.Register;
import com.yahoo.log.LogLevel;
-import com.yahoo.messagebus.EmptyReply;
+import com.yahoo.messagebus.*;
import com.yahoo.messagebus.Error;
import com.yahoo.messagebus.ErrorCode;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.Protocol;
-import com.yahoo.messagebus.Reply;
import com.yahoo.messagebus.network.Identity;
import com.yahoo.messagebus.network.Network;
import com.yahoo.messagebus.network.NetworkOwner;
@@ -33,16 +22,8 @@ import com.yahoo.messagebus.routing.RoutingNode;
import java.io.PrintWriter;
import java.io.StringWriter;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.*;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
@@ -63,7 +44,7 @@ public class RPCNetwork implements Network, MethodHandler {
private final Acceptor listener;
private final Mirror mirror;
private final Register register;
- private final TreeMap<Version, RPCSendAdapter> sendAdapters = new TreeMap<>();
+ private final Map<VersionSpecification, RPCSendAdapter> sendAdapters = new HashMap<>();
private NetworkOwner owner;
private final SlobrokConfigSubscriber slobroksConfig;
private final LinkedHashMap<String, Route> lruRouteMap = new LinkedHashMap<>(10000, 0.5f, true);
@@ -181,10 +162,9 @@ public class RPCNetwork implements Network, MethodHandler {
}
this.owner = owner;
- RPCSendAdapter adapter1 = new RPCSendV1();
- RPCSendAdapter adapter2 = new RPCSendV2();
- addSendAdapter(new Version(5), adapter1);
- addSendAdapter(new Version(6,142), adapter2);
+ RPCSendAdapter adapter = new RPCSendV1();
+ addSendAdapter(new VersionSpecification(5), adapter);
+ addSendAdapter(new VersionSpecification(6), adapter);
}
@Override
@@ -254,9 +234,11 @@ public class RPCNetwork implements Network, MethodHandler {
*/
private void send(SendContext ctx) {
if (destroyed.get()) {
- replyError(ctx, ErrorCode.NETWORK_SHUTDOWN, "Network layer has performed shutdown.");
+ replyError(ctx, ErrorCode.NETWORK_SHUTDOWN,
+ "Network layer has performed shutdown.");
} else if (ctx.hasError) {
- replyError(ctx, ErrorCode.HANDSHAKE_FAILED, "An error occured while resolving version.");
+ replyError(ctx, ErrorCode.HANDSHAKE_FAILED,
+ "An error occured while resolving version.");
} else {
sendService.execute(new SendTask(owner.getProtocol(ctx.msg.getProtocol()), ctx));
}
@@ -333,7 +315,7 @@ public class RPCNetwork implements Network, MethodHandler {
* @param version The version for which to register an adapter.
* @param adapter The adapter to register.
*/
- private void addSendAdapter(Version version, RPCSendAdapter adapter) {
+ private void addSendAdapter(VersionSpecification version, RPCSendAdapter adapter) {
adapter.attach(this);
sendAdapters.put(version, adapter);
}
@@ -346,8 +328,12 @@ public class RPCNetwork implements Network, MethodHandler {
* @return The compatible adapter.
*/
private RPCSendAdapter getSendAdapter(Version version) {
- Map.Entry<Version, RPCSendAdapter> lower = sendAdapters.floorEntry(version);
- return (lower != null) ? lower.getValue() : null;
+ for (Map.Entry<VersionSpecification, RPCSendAdapter> entry : sendAdapters.entrySet()) {
+ if (entry.getKey().matches(version)) {
+ return entry.getValue();
+ }
+ }
+ return null;
}
/**
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java
deleted file mode 100644
index d7b4887bd36..00000000000
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java
+++ /dev/null
@@ -1,269 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.network.rpc;
-
-import com.yahoo.component.Version;
-
-import com.yahoo.jrt.Method;
-import com.yahoo.jrt.MethodHandler;
-import com.yahoo.jrt.Request;
-import com.yahoo.jrt.RequestWaiter;
-import com.yahoo.jrt.Values;
-import com.yahoo.messagebus.EmptyReply;
-import com.yahoo.messagebus.Error;
-import com.yahoo.messagebus.ErrorCode;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.Protocol;
-import com.yahoo.messagebus.Reply;
-import com.yahoo.messagebus.ReplyHandler;
-import com.yahoo.messagebus.Routable;
-import com.yahoo.messagebus.Trace;
-import com.yahoo.messagebus.TraceLevel;
-import com.yahoo.messagebus.routing.Hop;
-import com.yahoo.messagebus.routing.Route;
-import com.yahoo.messagebus.routing.RoutingNode;
-import com.yahoo.text.Utf8Array;
-
-/**
- * Implements the request adapter for method "mbus.send1/mbus.slime".
- *
- * @author baldersheim
- */
-public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWaiter, RPCSendAdapter {
-
- private RPCNetwork net = null;
- private String clientIdent = "client";
- private String serverIdent = "server";
-
- protected abstract Method buildMethod();
- protected abstract String getReturnSpec();
- protected abstract Request encodeRequest(Version version, Route route, RPCServiceAddress address, Message msg,
- long timeRemaining, byte[] payload, int traceLevel);
- protected abstract Reply createReply(Values ret, String serviceName, Trace trace);
- protected abstract Params toParams(Values req);
- protected abstract void createResponse(Values ret, Reply reply, Version version, byte [] payload);
- @Override
- public final void attach(RPCNetwork net) {
- this.net = net;
- String prefix = net.getIdentity().getServicePrefix();
- if (prefix != null && prefix.length() > 0) {
- clientIdent = "'" + prefix + "'";
- serverIdent = clientIdent;
- }
- net.getSupervisor().addMethod(buildMethod());
- }
-
- @Override
- public final void send(RoutingNode recipient, Version version, byte[] payload, long timeRemaining) {
- SendContext ctx = new SendContext(recipient, timeRemaining);
- RPCServiceAddress address = (RPCServiceAddress)recipient.getServiceAddress();
- Message msg = recipient.getMessage();
- Route route = new Route(recipient.getRoute());
- Hop hop = route.removeHop(0);
-
- Request req = encodeRequest(version, route, address,msg, timeRemaining, payload, ctx.trace.getLevel());
-
- if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) {
- ctx.trace.trace(TraceLevel.SEND_RECEIVE,
- "Sending message (version " + version + ") from " + clientIdent + " to '" +
- address.getServiceName() + "' with " + ctx.timeout + " seconds timeout.");
- }
-
- if (hop.getIgnoreResult()) {
- address.getTarget().getJRTTarget().invokeVoid(req);
- if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) {
- ctx.trace.trace(TraceLevel.SEND_RECEIVE,
- "Not waiting for a reply from '" + address.getServiceName() + "'.");
- }
- Reply reply = new EmptyReply();
- reply.getTrace().swap(ctx.trace);
- net.getOwner().deliverReply(reply, recipient);
- } else {
- req.setContext(ctx);
- address.getTarget().getJRTTarget().invokeAsync(req, ctx.timeout, this);
- }
- req.discardParameters(); // allow garbage collection of request parameters
- }
-
- protected final Object decode(Utf8Array protocolName, Version version, byte [] payload) {
- Protocol protocol = net.getOwner().getProtocol(protocolName);
- if (protocol != null) {
- Routable routable = protocol.decode(version, payload);
- if (routable != null) {
- if (routable instanceof Reply) {
- return routable;
- } else {
- return new Error(ErrorCode.DECODE_ERROR,
- "Payload decoded to a reply when expecting a message.");
- }
- } else {
- return new Error(ErrorCode.DECODE_ERROR,
- "Protocol '" + protocol.getName() + "' failed to decode routable.");
- }
- } else {
- return new Error(ErrorCode.UNKNOWN_PROTOCOL,
- "Protocol '" + protocolName + "' is not known by " + serverIdent + ".");
- }
- }
-
- @Override
- public final void handleRequestDone(Request req) {
- SendContext ctx = (SendContext)req.getContext();
- String serviceName = ((RPCServiceAddress)ctx.recipient.getServiceAddress()).getServiceName();
- Reply reply = null;
- Error error = null;
- if (!req.checkReturnTypes(getReturnSpec())) {
- // Map all known JRT errors to the appropriate message bus error.
- reply = new EmptyReply();
- switch (req.errorCode()) {
- case com.yahoo.jrt.ErrorCode.TIMEOUT:
- error = new Error(ErrorCode.TIMEOUT,
- "A timeout occured while waiting for '" + serviceName + "' (" +
- ctx.timeout + " seconds expired); " + req.errorMessage());
- break;
- case com.yahoo.jrt.ErrorCode.CONNECTION:
- error = new Error(ErrorCode.CONNECTION_ERROR,
- "A connection error occured for '" + serviceName + "'; " + req.errorMessage());
- break;
- default:
- error = new Error(ErrorCode.NETWORK_ERROR,
- "A network error occured for '" + serviceName + "'; " + req.errorMessage());
- }
- } else {
- reply = createReply(req.returnValues(), serviceName, ctx.trace);
- }
- if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) {
- ctx.trace.trace(TraceLevel.SEND_RECEIVE,
- "Reply (type " + reply.getType() + ") received at " + clientIdent + ".");
- }
- reply.getTrace().swap(ctx.trace);
- if (error != null) {
- reply.addError(error);
- }
- net.getOwner().deliverReply(reply, ctx.recipient);
- }
-
- protected final class Params {
- Version version;
- String route;
- String session;
- boolean retryEnabled;
- int retry;
- long timeRemaining;
- Utf8Array protocolName;
- byte [] payload;
- int traceLevel;
- }
-
- @Override
- public final void invoke(Request request) {
- request.detach();
- Params p = toParams(request.parameters());
-
- request.discardParameters(); // allow garbage collection of request parameters
-
- // Make sure that the owner understands the protocol.
- Protocol protocol = net.getOwner().getProtocol(p.protocolName);
- if (protocol == null) {
- replyError(request, p.version, p.traceLevel,
- new Error(ErrorCode.UNKNOWN_PROTOCOL,
- "Protocol '" + p.protocolName + "' is not known by " + serverIdent + "."));
- return;
- }
- Routable routable = protocol.decode(p.version, p.payload);
- if (routable == null) {
- replyError(request, p.version, p.traceLevel,
- new Error(ErrorCode.DECODE_ERROR,
- "Protocol '" + protocol.getName() + "' failed to decode routable."));
- return;
- }
- if (routable instanceof Reply) {
- replyError(request, p.version, p.traceLevel,
- new Error(ErrorCode.DECODE_ERROR,
- "Payload decoded to a reply when expecting a message."));
- return;
- }
- Message msg = (Message)routable;
- if (p.route != null && p.route.length() > 0) {
- msg.setRoute(net.getRoute(p.route));
- }
- msg.setContext(new ReplyContext(request, p.version));
- msg.pushHandler(this);
- msg.setRetryEnabled(p.retryEnabled);
- msg.setRetry(p.retry);
- msg.setTimeReceivedNow();
- msg.setTimeRemaining(p.timeRemaining);
- msg.getTrace().setLevel(p.traceLevel);
- if (msg.getTrace().shouldTrace(TraceLevel.SEND_RECEIVE)) {
- msg.getTrace().trace(TraceLevel.SEND_RECEIVE,
- "Message (type " + msg.getType() + ") received at " + serverIdent + " for session '" + p.session + "'.");
- }
- net.getOwner().deliverMessage(msg, p.session);
- }
-
- @Override
- public final void handleReply(Reply reply) {
- ReplyContext ctx = (ReplyContext)reply.getContext();
- reply.setContext(null);
-
- // Add trace information.
- if (reply.getTrace().shouldTrace(TraceLevel.SEND_RECEIVE)) {
- reply.getTrace().trace(TraceLevel.SEND_RECEIVE,
- "Sending reply (version " + ctx.version + ") from " + serverIdent + ".");
- }
-
- // Encode and return the reply through the RPC request.
- byte[] payload = new byte[0];
- if (reply.getType() != 0) {
- Protocol protocol = net.getOwner().getProtocol(reply.getProtocol());
- if (protocol != null) {
- payload = protocol.encode(ctx.version, reply);
- }
- if (payload == null || payload.length == 0) {
- reply.addError(new Error(ErrorCode.ENCODE_ERROR,
- "An error occured while encoding the reply."));
- }
- }
- createResponse(ctx.request.returnValues(), reply, ctx.version, payload);
- ctx.request.returnRequest();
- }
-
- /**
- * Send an error reply for a given request.
- *
- * @param request The JRT request to reply to.
- * @param version The version to serialize for.
- * @param traceLevel The trace level to set in the reply.
- * @param err The error to reply with.
- */
- private void replyError(Request request, Version version, int traceLevel, Error err) {
- Reply reply = new EmptyReply();
- reply.setContext(new ReplyContext(request, version));
- reply.getTrace().setLevel(traceLevel);
- reply.addError(err);
- handleReply(reply);
- }
-
- private static class SendContext {
-
- final RoutingNode recipient;
- final Trace trace;
- final double timeout;
-
- SendContext(RoutingNode recipient, long timeRemaining) {
- this.recipient = recipient;
- trace = new Trace(recipient.getTrace().getLevel());
- timeout = timeRemaining * 0.001;
- }
- }
-
- private static class ReplyContext {
-
- final Request request;
- final Version version;
-
- public ReplyContext(Request request, Version version) {
- this.request = request;
- this.version = version;
- }
- }
-}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java
index 480a716e382..40cb7fb9ee9 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java
@@ -2,41 +2,40 @@
package com.yahoo.messagebus.network.rpc;
import com.yahoo.component.Version;
-import com.yahoo.jrt.DataValue;
-import com.yahoo.jrt.DoubleValue;
-import com.yahoo.jrt.Int32Array;
-import com.yahoo.jrt.Int32Value;
-import com.yahoo.jrt.Int64Value;
-import com.yahoo.jrt.Int8Value;
-import com.yahoo.jrt.Method;
-import com.yahoo.jrt.Request;
-import com.yahoo.jrt.StringArray;
+import com.yahoo.jrt.*;
import com.yahoo.jrt.StringValue;
-import com.yahoo.jrt.Values;
-import com.yahoo.messagebus.EmptyReply;
+import com.yahoo.messagebus.*;
import com.yahoo.messagebus.Error;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.Reply;
-import com.yahoo.messagebus.Trace;
-import com.yahoo.messagebus.TraceNode;
+import com.yahoo.messagebus.ErrorCode;
+import com.yahoo.messagebus.ReplyHandler;
+import com.yahoo.messagebus.routing.Hop;
import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.routing.RoutingNode;
import com.yahoo.text.Utf8Array;
+import com.yahoo.text.Utf8String;
/**
* Implements the request adapter for method "mbus.send1".
*
* @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
*/
-public class RPCSendV1 extends RPCSend {
+public class RPCSendV1 implements MethodHandler, ReplyHandler, RequestWaiter, RPCSendAdapter {
private final String METHOD_NAME = "mbus.send1";
private final String METHOD_PARAMS = "sssbilsxi";
private final String METHOD_RETURN = "sdISSsxs";
+ private RPCNetwork net = null;
+ private String clientIdent = "client";
+ private String serverIdent = "server";
@Override
- protected String getReturnSpec() { return METHOD_RETURN; }
- @Override
- protected Method buildMethod() {
+ public void attach(RPCNetwork net) {
+ this.net = net;
+ String prefix = net.getIdentity().getServicePrefix();
+ if (prefix != null && prefix.length() > 0) {
+ clientIdent = "'" + prefix + "'";
+ serverIdent = clientIdent;
+ }
Method method = new Method(METHOD_NAME, METHOD_PARAMS, METHOD_RETURN, this);
method.methodDesc("Send a message bus request and get a reply back.");
@@ -57,80 +56,206 @@ public class RPCSendV1 extends RPCSend {
.returnDesc(5, "protocol", "The name of the protocol that knows how to decode this reply.")
.returnDesc(6, "payload", "The protocol specific reply payload.")
.returnDesc(7, "trace", "A string representation of the trace.");
- return method;
+ net.getSupervisor().addMethod(method);
}
+
@Override
- protected Request encodeRequest(Version version, Route route, RPCServiceAddress address, Message msg,
- long timeRemaining, byte[] payload, int traceLevel) {
+ public void send(RoutingNode recipient, Version version, byte[] payload, long timeRemaining) {
+ SendContext ctx = new SendContext(recipient, timeRemaining);
+ RPCServiceAddress address = (RPCServiceAddress)recipient.getServiceAddress();
+ Message msg = recipient.getMessage();
+ Route route = new Route(recipient.getRoute());
+ Hop hop = route.removeHop(0);
+
Request req = new Request(METHOD_NAME);
- Values v = req.parameters();
- v.add(new StringValue(version.toString()));
- v.add(new StringValue(route.toString()));
- v.add(new StringValue(address.getSessionName()));
- v.add(new Int8Value(msg.getRetryEnabled() ? (byte)1 : (byte)0));
- v.add(new Int32Value(msg.getRetry()));
- v.add(new Int64Value(timeRemaining));
- v.add(new StringValue(msg.getProtocol()));
- v.add(new DataValue(payload));
- v.add(new Int32Value(traceLevel));
- return req;
+ req.parameters().add(new StringValue(version.toString()));
+ req.parameters().add(new StringValue(route.toString()));
+ req.parameters().add(new StringValue(address.getSessionName()));
+ req.parameters().add(new Int8Value(msg.getRetryEnabled() ? (byte)1 : (byte)0));
+ req.parameters().add(new Int32Value(msg.getRetry()));
+ req.parameters().add(new Int64Value(timeRemaining));
+ req.parameters().add(new StringValue(msg.getProtocol()));
+ req.parameters().add(new DataValue(payload));
+ req.parameters().add(new Int32Value(ctx.trace.getLevel()));
+
+ if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) {
+ ctx.trace.trace(TraceLevel.SEND_RECEIVE,
+ "Sending message (version " + version + ") from " + clientIdent + " to '" +
+ address.getServiceName() + "' with " + ctx.timeout + " seconds timeout.");
+ }
+
+ if (hop.getIgnoreResult()) {
+ address.getTarget().getJRTTarget().invokeVoid(req);
+ if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) {
+ ctx.trace.trace(TraceLevel.SEND_RECEIVE,
+ "Not waiting for a reply from '" + address.getServiceName() + "'.");
+ }
+ Reply reply = new EmptyReply();
+ reply.getTrace().swap(ctx.trace);
+ net.getOwner().deliverReply(reply, recipient);
+ } else {
+ req.setContext(ctx);
+ address.getTarget().getJRTTarget().invokeAsync(req, ctx.timeout, this);
+ }
+ req.discardParameters(); // allow garbage collection of request parameters
}
@Override
- protected Reply createReply(Values ret, String serviceName, Trace trace) {
- Version version = new Version(ret.get(0).asUtf8Array());
- double retryDelay = ret.get(1).asDouble();
- int[] errorCodes = ret.get(2).asInt32Array();
- String[] errorMessages = ret.get(3).asStringArray();
- String[] errorServices = ret.get(4).asStringArray();
- Utf8Array protocolName = ret.get(5).asUtf8Array();
- byte[] payload = ret.get(6).asData();
- String replyTrace = ret.get(7).asString();
-
- // Make sure that the owner understands the protocol.
+ public void handleRequestDone(Request req) {
+ SendContext ctx = (SendContext)req.getContext();
+ String serviceName = ((RPCServiceAddress)ctx.recipient.getServiceAddress()).getServiceName();
Reply reply = null;
Error error = null;
- if (payload.length > 0) {
- Object retval = decode(protocolName, version, payload);
- if (retval instanceof Reply) {
- reply = (Reply) retval;
- } else {
- error = (Error) retval;
+ if (!req.checkReturnTypes(METHOD_RETURN)) {
+ // Map all known JRT errors to the appropriate message bus error.
+ reply = new EmptyReply();
+ switch (req.errorCode()) {
+ case com.yahoo.jrt.ErrorCode.TIMEOUT:
+ error = new Error(com.yahoo.messagebus.ErrorCode.TIMEOUT,
+ "A timeout occured while waiting for '" + serviceName + "' (" +
+ ctx.timeout + " seconds expired); " + req.errorMessage());
+ break;
+ case com.yahoo.jrt.ErrorCode.CONNECTION:
+ error = new Error(com.yahoo.messagebus.ErrorCode.CONNECTION_ERROR,
+ "A connection error occured for '" + serviceName + "'; " + req.errorMessage());
+ break;
+ default:
+ error = new Error(com.yahoo.messagebus.ErrorCode.NETWORK_ERROR,
+ "A network error occured for '" + serviceName + "'; " + req.errorMessage());
+ }
+ } else {
+ // Retrieve all reply components from JRT request object.
+ Version version = new Version(req.returnValues().get(0).asUtf8Array());
+ double retryDelay = req.returnValues().get(1).asDouble();
+ int[] errorCodes = req.returnValues().get(2).asInt32Array();
+ String[] errorMessages = req.returnValues().get(3).asStringArray();
+ String[] errorServices = req.returnValues().get(4).asStringArray();
+ Utf8Array protocolName = req.returnValues().get(5).asUtf8Array();
+ byte[] payload = req.returnValues().get(6).asData();
+ String replyTrace = req.returnValues().get(7).asString();
+
+ // Make sure that the owner understands the protocol.
+ if (payload.length > 0) {
+ Protocol protocol = net.getOwner().getProtocol(protocolName);
+ if (protocol != null) {
+ Routable routable = protocol.decode(version, payload);
+ if (routable != null) {
+ if (routable instanceof Reply) {
+ reply = (Reply)routable;
+ } else {
+ error = new Error(com.yahoo.messagebus.ErrorCode.DECODE_ERROR,
+ "Payload decoded to a reply when expecting a message.");
+ }
+ } else {
+ error = new Error(com.yahoo.messagebus.ErrorCode.DECODE_ERROR,
+ "Protocol '" + protocol.getName() + "' failed to decode routable.");
+ }
+ } else {
+ error = new Error(com.yahoo.messagebus.ErrorCode.UNKNOWN_PROTOCOL,
+ "Protocol '" + protocolName + "' is not known by " + serverIdent + ".");
+ }
+ }
+ if (reply == null) {
+ reply = new EmptyReply();
+ }
+ reply.setRetryDelay(retryDelay);
+ for (int i = 0; i < errorCodes.length && i < errorMessages.length; i++) {
+ reply.addError(new Error(errorCodes[i],
+ errorMessages[i],
+ errorServices[i].length() > 0 ? errorServices[i] : serviceName));
+ }
+ if (ctx.trace.getLevel() > 0) {
+ ctx.trace.getRoot().addChild(TraceNode.decode(replyTrace));
}
}
- if (reply == null) {
- reply = new EmptyReply();
+ if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) {
+ ctx.trace.trace(TraceLevel.SEND_RECEIVE,
+ "Reply (type " + reply.getType() + ") received at " + clientIdent + ".");
}
+ reply.getTrace().swap(ctx.trace);
if (error != null) {
reply.addError(error);
}
- reply.setRetryDelay(retryDelay);
- for (int i = 0; i < errorCodes.length && i < errorMessages.length; i++) {
- reply.addError(new Error(errorCodes[i], errorMessages[i],
- errorServices[i].length() > 0 ? errorServices[i] : serviceName));
- }
- if (trace.getLevel() > 0) {
- trace.getRoot().addChild(TraceNode.decode(replyTrace));
- }
- return reply;
+ net.getOwner().deliverReply(reply, ctx.recipient);
}
- protected Params toParams(Values args) {
- Params p = new Params();
- p.version = new Version(args.get(0).asUtf8Array());
- p.route = args.get(1).asString();
- p.session = args.get(2).asString();
- p.retryEnabled = (args.get(3).asInt8() != 0);
- p.retry = args.get(4).asInt32();
- p.timeRemaining = args.get(5).asInt64();
- p.protocolName = args.get(6).asUtf8Array();
- p.payload = args.get(7).asData();
- p.traceLevel = args.get(8).asInt32();
- return p;
+ @Override
+ public void invoke(Request request) {
+ request.detach();
+ Version version = new Version(request.parameters().get(0).asUtf8Array());
+ String route = request.parameters().get(1).asString();
+ String session = request.parameters().get(2).asString();
+ boolean retryEnabled = (request.parameters().get(3).asInt8() != 0);
+ int retry = request.parameters().get(4).asInt32();
+ long timeRemaining = request.parameters().get(5).asInt64();
+ Utf8Array protocolName = request.parameters().get(6).asUtf8Array();
+ byte[] payload = request.parameters().get(7).asData();
+ int traceLevel = request.parameters().get(8).asInt32();
+
+ request.discardParameters(); // allow garbage collection of request parameters
+
+ // Make sure that the owner understands the protocol.
+ Protocol protocol = net.getOwner().getProtocol(protocolName);
+ if (protocol == null) {
+ replyError(request, version, traceLevel,
+ new com.yahoo.messagebus.Error(ErrorCode.UNKNOWN_PROTOCOL,
+ "Protocol '" + protocolName + "' is not known by " + serverIdent + "."));
+ return;
+ }
+ Routable routable = protocol.decode(version, payload);
+ if (routable == null) {
+ replyError(request, version, traceLevel,
+ new Error(ErrorCode.DECODE_ERROR,
+ "Protocol '" + protocol.getName() + "' failed to decode routable."));
+ return;
+ }
+ if (routable instanceof Reply) {
+ replyError(request, version, traceLevel,
+ new Error(ErrorCode.DECODE_ERROR,
+ "Payload decoded to a reply when expecting a message."));
+ return;
+ }
+ Message msg = (Message)routable;
+ if (route != null && route.length() > 0) {
+ msg.setRoute(net.getRoute(route));
+ }
+ msg.setContext(new ReplyContext(request, version));
+ msg.pushHandler(this);
+ msg.setRetryEnabled(retryEnabled);
+ msg.setRetry(retry);
+ msg.setTimeReceivedNow();
+ msg.setTimeRemaining(timeRemaining);
+ msg.getTrace().setLevel(traceLevel);
+ if (msg.getTrace().shouldTrace(TraceLevel.SEND_RECEIVE)) {
+ msg.getTrace().trace(TraceLevel.SEND_RECEIVE,
+ "Message (type " + msg.getType() + ") received at " + serverIdent + " for session '" + session + "'.");
+ }
+ net.getOwner().deliverMessage(msg, session);
}
@Override
- protected void createResponse(Values ret, Reply reply, Version version, byte [] payload) {
+ public void handleReply(Reply reply) {
+ ReplyContext ctx = (ReplyContext)reply.getContext();
+ reply.setContext(null);
+
+ // Add trace information.
+ if (reply.getTrace().shouldTrace(TraceLevel.SEND_RECEIVE)) {
+ reply.getTrace().trace(TraceLevel.SEND_RECEIVE,
+ "Sending reply (version " + ctx.version + ") from " + serverIdent + ".");
+ }
+
+ // Encode and return the reply through the RPC request.
+ byte[] payload = new byte[0];
+ if (reply.getType() != 0) {
+ Protocol protocol = net.getOwner().getProtocol(reply.getProtocol());
+ if (protocol != null) {
+ payload = protocol.encode(ctx.version, reply);
+ }
+ if (payload == null || payload.length == 0) {
+ reply.addError(new Error(ErrorCode.ENCODE_ERROR,
+ "An error occured while encoding the reply."));
+ }
+ }
int[] eCodes = new int[reply.getNumErrors()];
String[] eMessages = new String[reply.getNumErrors()];
String[] eServices = new String[reply.getNumErrors()];
@@ -140,14 +265,57 @@ public class RPCSendV1 extends RPCSend {
eMessages[i] = error.getMessage();
eServices[i] = error.getService() != null ? error.getService() : "";
}
- ret.add(new StringValue(version.toString()));
- ret.add(new DoubleValue(reply.getRetryDelay()));
- ret.add(new Int32Array(eCodes));
- ret.add(new StringArray(eMessages));
- ret.add(new StringArray(eServices));
- ret.add(new StringValue(reply.getProtocol()));
- ret.add(new DataValue(payload));
- ret.add(new StringValue(reply.getTrace().getRoot() != null ? reply.getTrace().getRoot().encode() : ""));
+ ctx.request.returnValues().add(new StringValue(ctx.version.toString()));
+ ctx.request.returnValues().add(new DoubleValue(reply.getRetryDelay()));
+ ctx.request.returnValues().add(new Int32Array(eCodes));
+ ctx.request.returnValues().add(new StringArray(eMessages));
+ ctx.request.returnValues().add(new StringArray(eServices));
+ ctx.request.returnValues().add(new StringValue(reply.getProtocol()));
+ ctx.request.returnValues().add(new DataValue(payload));
+ ctx.request.returnValues().add(new StringValue(
+ reply.getTrace().getRoot() != null ?
+ reply.getTrace().getRoot().encode() :
+ ""));
+ ctx.request.returnRequest();
}
+ /**
+ * Send an error reply for a given request.
+ *
+ * @param request The JRT request to reply to.
+ * @param version The version to serialize for.
+ * @param traceLevel The trace level to set in the reply.
+ * @param err The error to reply with.
+ */
+ private void replyError(Request request, Version version, int traceLevel, Error err) {
+ Reply reply = new EmptyReply();
+ reply.setContext(new ReplyContext(request, version));
+ reply.getTrace().setLevel(traceLevel);
+ reply.addError(err);
+ handleReply(reply);
+ }
+
+ private static class SendContext {
+
+ final RoutingNode recipient;
+ final Trace trace;
+ final double timeout;
+
+ SendContext(RoutingNode recipient, long timeRemaining) {
+ this.recipient = recipient;
+ trace = new Trace(recipient.getTrace().getLevel());
+ timeout = timeRemaining * 0.001;
+ }
+ }
+
+ private static class ReplyContext {
+
+ final Request request;
+ final Version version;
+
+ public ReplyContext(Request request, Version version) {
+ this.request = request;
+ this.version = version;
+ }
+ }
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java
deleted file mode 100644
index 8cc0b73ae30..00000000000
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java
+++ /dev/null
@@ -1,209 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.messagebus.network.rpc;
-
-import com.yahoo.component.Version;
-import com.yahoo.compress.CompressionType;
-import com.yahoo.compress.Compressor;
-import com.yahoo.jrt.DataValue;
-import com.yahoo.jrt.Int32Value;
-import com.yahoo.jrt.Int8Value;
-import com.yahoo.jrt.Method;
-import com.yahoo.jrt.Request;
-import com.yahoo.jrt.Values;
-import com.yahoo.messagebus.EmptyReply;
-import com.yahoo.messagebus.Error;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.Reply;
-import com.yahoo.messagebus.Trace;
-import com.yahoo.messagebus.TraceNode;
-import com.yahoo.messagebus.routing.Route;
-import com.yahoo.slime.BinaryFormat;
-import com.yahoo.slime.Cursor;
-import com.yahoo.slime.Inspector;
-import com.yahoo.slime.Slime;
-import com.yahoo.text.Utf8;
-import com.yahoo.text.Utf8Array;
-
-/**
- * Implements the request adapter for method "mbus.slime".
- *
- * @author baldersheim
- */
-public class RPCSendV2 extends RPCSend {
-
- private final static String METHOD_NAME = "mbus.slime";
- private final static String METHOD_PARAMS = "bixbix";
- private final static String METHOD_RETURN = "bixbix";
- private final Compressor compressor = new Compressor(CompressionType.LZ4, 3, 90, 1024);
-
- @Override
- protected String getReturnSpec() { return METHOD_RETURN; }
- @Override
- protected Method buildMethod() {
-
- Method method = new Method(METHOD_NAME, METHOD_PARAMS, METHOD_RETURN, this);
- method.methodDesc("Send a message bus request and get a reply back.");
- method.paramDesc(0, "header_encoding", "Encoding type of header.")
- .paramDesc(1, "header_decodedSize", "Number of bytes after header decoding.")
- .paramDesc(2, "header_payload", "Slime encoded header payload.")
- .paramDesc(3, "body_encoding", "Encoding type of body.")
- .paramDesc(4, "body_decoded_ize", "Number of bytes after body decoding.")
- .paramDesc(5, "body_payload", "Slime encoded body payload.");
- method.returnDesc(0, "header_encoding", "Encoding type of header.")
- .returnDesc(1, "header_decoded_size", "Number of bytes after header decoding.")
- .returnDesc(2, "header_payload", "Slime encoded header payload.")
- .returnDesc(3, "body_encoding", "Encoding type of body.")
- .returnDesc(4, "body_encoded_size", "Number of bytes after body decoding.")
- .returnDesc(5, "body_payload", "Slime encoded body payload.");
- return method;
- }
- private static final String VERSION_F = new String("version");
- private static final String ROUTE_F = new String("route");
- private static final String SESSION_F = new String("session");
- private static final String PROTOCOL_F = new String("prot");
- private static final String TRACELEVEL_F = new String("tracelevel");
- private static final String TRACE_F = new String("trace");
- private static final String USERETRY_F = new String("useretry");
- private static final String RETRY_F = new String("retry");
- private static final String RETRYDELAY_F = new String("retrydelay");
- private static final String TIMEREMAINING_F = new String("timeleft");
- private static final String ERRORS_F = new String("errors");
- private static final String SERVICE_F = new String("service");
- private static final String CODE_F = new String("code");
- private static final String BLOB_F = new String("msg");
- private static final String MSG_F = new String("msg");
-
- @Override
- protected Request encodeRequest(Version version, Route route, RPCServiceAddress address, Message msg,
- long timeRemaining, byte[] payload, int traceLevel)
- {
-
- Request req = new Request(METHOD_NAME);
- Values v = req.parameters();
-
- v.add(new Int8Value(CompressionType.NONE.getCode()));
- v.add(new Int32Value(0));
- v.add(new DataValue(new byte[0]));
-
- Slime slime = new Slime();
- Cursor root = slime.setObject();
-
- root.setString(VERSION_F, version.toString());
- root.setString(ROUTE_F, route.toString());
- root.setString(SESSION_F, address.getSessionName());
- root.setString(PROTOCOL_F, msg.getProtocol().toString());
- root.setBool(USERETRY_F, msg.getRetryEnabled());
- root.setLong(RETRY_F, msg.getRetry());
- root.setLong(TIMEREMAINING_F, msg.getTimeRemaining());
- root.setLong(TRACELEVEL_F, traceLevel);
- root.setData(BLOB_F, payload);
-
- byte[] serializedSlime = BinaryFormat.encode(slime);
- Compressor.Compression compressionResult = compressor.compress(serializedSlime);
-
- v.add(new Int8Value(compressionResult.type().getCode()));
- v.add(new Int32Value(compressionResult.uncompressedSize()));
- v.add(new DataValue(compressionResult.data()));
-
- return req;
- }
-
- @Override
- protected Reply createReply(Values ret, String serviceName, Trace trace) {
- CompressionType compression = CompressionType.valueOf(ret.get(3).asInt8());
- byte[] slimeBytes = compressor.decompress(ret.get(5).asData(), compression, ret.get(4).asInt32());
- Slime slime = BinaryFormat.decode(slimeBytes);
- Inspector root = slime.get();
-
- Version version = new Version(root.field(VERSION_F).asString());
- byte[] payload = root.field(BLOB_F).asData();
-
- // Make sure that the owner understands the protocol.
- Reply reply = null;
- Error error = null;
- if (payload.length > 0) {
- Object retval = decode(new Utf8Array(root.field(PROTOCOL_F).asUtf8()), version, payload);
- if (retval instanceof Reply) {
- reply = (Reply) retval;
- } else {
- error = (Error) retval;
- }
- }
- if (reply == null) {
- reply = new EmptyReply();
- }
- if (error != null) {
- reply.addError(error);
- }
- reply.setRetryDelay(root.field(RETRYDELAY_F).asDouble());
-
- Inspector errors = root.field(ERRORS_F);
- for (int i = 0; i < errors.entries(); i++) {
- Inspector e = errors.entry(i);
- String service = e.field(SERVICE_F).asString();
- reply.addError(new Error((int)e.field(CODE_F).asLong(), e.field(MSG_F).asString(),
- (service != null && service.length() > 0) ? service : serviceName));
- }
- if (trace.getLevel() > 0) {
- trace.getRoot().addChild(TraceNode.decode(root.field(TRACE_F).asString()));
- }
- return reply;
- }
-
- protected Params toParams(Values args) {
- CompressionType compression = CompressionType.valueOf(args.get(3).asInt8());
- byte[] slimeBytes = compressor.decompress(args.get(5).asData(), compression, args.get(4).asInt32());
- Slime slime = BinaryFormat.decode(slimeBytes);
- Inspector root = slime.get();
- Params p = new Params();
- p.version = new Version(root.field(VERSION_F).asString());
- p.route = root.field(ROUTE_F).asString();
- p.session = root.field(SESSION_F).asString();
- p.retryEnabled = root.field(USERETRY_F).asBool();
- p.retry = (int)root.field(RETRY_F).asLong();
- p.timeRemaining = root.field(TIMEREMAINING_F).asLong();
- p.protocolName = new Utf8Array(Utf8.toBytes(root.field(PROTOCOL_F).asString()));
- p.payload = root.field(BLOB_F).asData();
- p.traceLevel = (int)root.field(TRACELEVEL_F).asLong();
- return p;
- }
-
- @Override
- protected void createResponse(Values ret, Reply reply, Version version, byte [] payload) {
- ret.add(new Int8Value(CompressionType.NONE.getCode()));
- ret.add(new Int32Value(0));
- ret.add(new DataValue(new byte[0]));
-
- Slime slime = new Slime();
- Cursor root = slime.setObject();
-
- root.setString(VERSION_F, version.toString());
- root.setDouble(RETRYDELAY_F, reply.getRetryDelay());
- root.setString(PROTOCOL_F, reply.getProtocol().toString());
- root.setData(BLOB_F, payload);
- if (reply.getTrace().getLevel() > 0) {
- root.setString(TRACE_F, reply.getTrace().getRoot().encode());
- }
-
- if (reply.getNumErrors() > 0) {
- Cursor array = root.setArray(ERRORS_F);
- for (int i = 0; i < reply.getNumErrors(); i++) {
- Cursor e = array.addObject();
- Error mbusE = reply.getError(i);
- e.setLong(CODE_F, mbusE.getCode());
- e.setString(MSG_F, mbusE.getMessage());
- if (mbusE.getService() != null) {
- e.setString(SERVICE_F, mbusE.getService());
- }
- }
- }
-
- byte[] serializedSlime = BinaryFormat.encode(slime);
- Compressor.Compression compressionResult = compressor.compress(serializedSlime);
-
- ret.add(new Int8Value(compressionResult.type().getCode()));
- ret.add(new Int32Value(compressionResult.uncompressedSize()));
- ret.add(new DataValue(compressionResult.data()));
- }
-
-}
diff --git a/messagebus/src/tests/advancedrouting/advancedrouting.cpp b/messagebus/src/tests/advancedrouting/advancedrouting.cpp
index 8d6b8cd3875..e847fdf3222 100644
--- a/messagebus/src/tests/advancedrouting/advancedrouting.cpp
+++ b/messagebus/src/tests/advancedrouting/advancedrouting.cpp
@@ -1,15 +1,17 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
+#include <vespa/messagebus/emptyreply.h>
+#include <vespa/messagebus/errorcode.h>
+#include <vespa/messagebus/messagebus.h>
+#include <vespa/messagebus/routing/errordirective.h>
+#include <vespa/messagebus/routing/retrytransienterrorspolicy.h>
#include <vespa/messagebus/testlib/custompolicy.h>
#include <vespa/messagebus/testlib/receptor.h>
#include <vespa/messagebus/testlib/simplemessage.h>
+#include <vespa/messagebus/testlib/simpleprotocol.h>
#include <vespa/messagebus/testlib/slobrok.h>
#include <vespa/messagebus/testlib/testserver.h>
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/messagebus/emptyreply.h>
-#include <vespa/messagebus/errorcode.h>
-#include <vespa/messagebus/routing/retrytransienterrorspolicy.h>
using namespace mbus;
@@ -115,7 +117,7 @@ Test::Main()
void
Test::testAdvanced(TestData &data)
{
- const double TIMEOUT = 60;
+ const double TIMEOUT=60;
IProtocol::SP protocol(new SimpleProtocol());
SimpleProtocol &simple = static_cast<SimpleProtocol&>(*protocol);
simple.addPolicyFactory("Custom", SimpleProtocol::IPolicyFactory::SP(new CustomPolicyFactory(false, ErrorCode::NO_ADDRESS_FOR_SERVICE)));
diff --git a/messagebus/src/tests/bucketsequence/bucketsequence.cpp b/messagebus/src/tests/bucketsequence/bucketsequence.cpp
index d98acc4f191..4073469c253 100644
--- a/messagebus/src/tests/bucketsequence/bucketsequence.cpp
+++ b/messagebus/src/tests/bucketsequence/bucketsequence.cpp
@@ -5,6 +5,7 @@
#include <vespa/messagebus/testlib/receptor.h>
#include <vespa/messagebus/testlib/simplemessage.h>
#include <vespa/messagebus/testlib/simpleprotocol.h>
+#include <vespa/messagebus/testlib/simplereply.h>
#include <vespa/messagebus/testlib/slobrok.h>
#include <vespa/messagebus/testlib/testserver.h>
#include <vespa/vespalib/testkit/testapp.h>
diff --git a/messagebus/src/tests/choke/choke.cpp b/messagebus/src/tests/choke/choke.cpp
index 91f91c84f45..b8e20eb9074 100644
--- a/messagebus/src/tests/choke/choke.cpp
+++ b/messagebus/src/tests/choke/choke.cpp
@@ -2,6 +2,7 @@
#include <vespa/messagebus/emptyreply.h>
#include <vespa/messagebus/errorcode.h>
+#include <vespa/messagebus/reply.h>
#include <vespa/messagebus/testlib/receptor.h>
#include <vespa/messagebus/testlib/simplemessage.h>
#include <vespa/messagebus/testlib/simpleprotocol.h>
diff --git a/messagebus/src/tests/error/error.cpp b/messagebus/src/tests/error/error.cpp
index 1d8a489a5ed..dbaa869507b 100644
--- a/messagebus/src/tests/error/error.cpp
+++ b/messagebus/src/tests/error/error.cpp
@@ -65,14 +65,14 @@ Test::Main()
reply = pxy.getReply();
ASSERT_TRUE(reply.get() != 0);
- ASSERT_EQUAL(reply->getNumErrors(), 1u);
+ EXPECT_EQUAL(reply->getNumErrors(), 1u);
EXPECT_EQUAL(reply->getError(0).getService(), "test/dst/session");
reply->addError(Error(ErrorCode::APP_FATAL_ERROR, "fatality"));
is->forward(std::move(reply));
reply = src.getReply();
ASSERT_TRUE(reply.get() != 0);
- ASSERT_EQUAL(reply->getNumErrors(), 2u);
+ EXPECT_EQUAL(reply->getNumErrors(), 2u);
EXPECT_EQUAL(reply->getError(0).getService(), "test/dst/session");
EXPECT_EQUAL(reply->getError(1).getService(), "test/pxy/session");
}
diff --git a/messagebus/src/tests/routing/routing.cpp b/messagebus/src/tests/routing/routing.cpp
index 515cbd99fde..e907e0d1163 100644
--- a/messagebus/src/tests/routing/routing.cpp
+++ b/messagebus/src/tests/routing/routing.cpp
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/messagebus/emptyreply.h>
#include <vespa/messagebus/errorcode.h>
+#include <vespa/messagebus/messagebus.h>
#include <vespa/messagebus/routing/errordirective.h>
#include <vespa/messagebus/routing/retrytransienterrorspolicy.h>
#include <vespa/messagebus/routing/routingcontext.h>
diff --git a/messagebus/src/tests/sendadapter/sendadapter.cpp b/messagebus/src/tests/sendadapter/sendadapter.cpp
index e3fa3278300..8ae1d0993cb 100644
--- a/messagebus/src/tests/sendadapter/sendadapter.cpp
+++ b/messagebus/src/tests/sendadapter/sendadapter.cpp
@@ -1,5 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/messagebus/messagebus.h>
#include <vespa/messagebus/testlib/receptor.h>
#include <vespa/messagebus/testlib/simplemessage.h>
#include <vespa/messagebus/testlib/simpleprotocol.h>
diff --git a/messagebus/src/tests/serviceaddress/serviceaddress.cpp b/messagebus/src/tests/serviceaddress/serviceaddress.cpp
index 89edb981716..15a2e9fdac8 100644
--- a/messagebus/src/tests/serviceaddress/serviceaddress.cpp
+++ b/messagebus/src/tests/serviceaddress/serviceaddress.cpp
@@ -3,7 +3,20 @@
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/messagebus/testlib/slobrok.h>
#include <vespa/messagebus/testlib/testserver.h>
+#include <vespa/messagebus/testlib/receptor.h>
+#include <vespa/messagebus/testlib/simplemessage.h>
+#include <vespa/messagebus/testlib/simplereply.h>
+#include <vespa/messagebus/testlib/simpleprotocol.h>
+#include <vespa/messagebus/messagebus.h>
+#include <vespa/messagebus/sourcesession.h>
+#include <vespa/messagebus/intermediatesession.h>
+#include <vespa/messagebus/destinationsession.h>
+#include <vespa/messagebus/emptyreply.h>
+#include <vespa/messagebus/error.h>
+#include <vespa/messagebus/errorcode.h>
+#include <vespa/messagebus/routing/routingspec.h>
#include <vespa/messagebus/network/rpcservice.h>
+#include <vespa/messagebus/sourcesessionparams.h>
using namespace mbus;
diff --git a/messagebus/src/tests/servicepool/servicepool.cpp b/messagebus/src/tests/servicepool/servicepool.cpp
index 86a800b600a..c98b342b2f3 100644
--- a/messagebus/src/tests/servicepool/servicepool.cpp
+++ b/messagebus/src/tests/servicepool/servicepool.cpp
@@ -1,8 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/messagebus/network/rpcnetwork.h>
-#include <vespa/messagebus/network/rpcnetworkparams.h>
-#include <vespa/messagebus/network/rpcservicepool.h>
#include <vespa/messagebus/testlib/slobrok.h>
#include <vespa/vespalib/testkit/testapp.h>
diff --git a/messagebus/src/tests/shutdown/shutdown.cpp b/messagebus/src/tests/shutdown/shutdown.cpp
index 070b51bbbc2..b5e6cac970e 100644
--- a/messagebus/src/tests/shutdown/shutdown.cpp
+++ b/messagebus/src/tests/shutdown/shutdown.cpp
@@ -1,6 +1,9 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/messagebus/emptyreply.h>
+#include <vespa/messagebus/errorcode.h>
+#include <vespa/messagebus/messagebus.h>
+#include <vespa/messagebus/routing/errordirective.h>
#include <vespa/messagebus/routing/retrytransienterrorspolicy.h>
#include <vespa/messagebus/testlib/receptor.h>
#include <vespa/messagebus/testlib/simplemessage.h>
diff --git a/messagebus/src/tests/slobrok/slobrok.cpp b/messagebus/src/tests/slobrok/slobrok.cpp
index 6c389c25e70..360705e7eae 100644
--- a/messagebus/src/tests/slobrok/slobrok.cpp
+++ b/messagebus/src/tests/slobrok/slobrok.cpp
@@ -2,10 +2,10 @@
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/messagebus/testlib/slobrok.h>
+#include <string>
+#include <sstream>
#include <vespa/slobrok/sbmirror.h>
#include <vespa/messagebus/network/rpcnetwork.h>
-#include <vespa/messagebus/network/rpcnetworkparams.h>
-
#include <vespa/vespalib/util/host_name.h>
using slobrok::api::IMirrorAPI;
diff --git a/messagebus/src/vespa/messagebus/blob.h b/messagebus/src/vespa/messagebus/blob.h
index 0509d71d77c..1e84222064a 100644
--- a/messagebus/src/vespa/messagebus/blob.h
+++ b/messagebus/src/vespa/messagebus/blob.h
@@ -26,13 +26,13 @@ public:
_payload(Alloc::alloc(s)),
_sz(s)
{ }
- Blob(Blob && rhs) noexcept :
+ Blob(Blob && rhs) :
_payload(std::move(rhs._payload)),
_sz(rhs._sz)
{
rhs._sz = 0;
}
- Blob & operator = (Blob && rhs) noexcept {
+ Blob & operator = (Blob && rhs) {
swap(rhs);
return *this;
}
@@ -65,3 +65,4 @@ private:
};
} // namespace mbus
+
diff --git a/messagebus/src/vespa/messagebus/callstack.cpp b/messagebus/src/vespa/messagebus/callstack.cpp
index b7179e14cad..0ab8658d53f 100644
--- a/messagebus/src/vespa/messagebus/callstack.cpp
+++ b/messagebus/src/vespa/messagebus/callstack.cpp
@@ -13,7 +13,7 @@ CallStack::discard()
{
while (!_stack.empty()) {
const Frame &frame = _stack.back();
- if (frame.discardHandler != nullptr) {
+ if (frame.discardHandler != NULL) {
frame.discardHandler->handleDiscard(frame.ctx);
}
_stack.pop_back();
diff --git a/messagebus/src/vespa/messagebus/callstack.h b/messagebus/src/vespa/messagebus/callstack.h
index 0f9d8c93b29..68da598e796 100644
--- a/messagebus/src/vespa/messagebus/callstack.h
+++ b/messagebus/src/vespa/messagebus/callstack.h
@@ -72,7 +72,7 @@ public:
* @param ctx The context to store.
* @param discardHandler The handler for discarded messages.
**/
- void push(IReplyHandler &replyHandler, Context ctx, IDiscardHandler *discardHandler = nullptr) {
+ void push(IReplyHandler &replyHandler, Context ctx, IDiscardHandler *discardHandler = NULL) {
_stack.emplace_back(&replyHandler, discardHandler, ctx);
}
diff --git a/messagebus/src/vespa/messagebus/destinationsessionparams.cpp b/messagebus/src/vespa/messagebus/destinationsessionparams.cpp
index ecbc036ffed..3959666e718 100644
--- a/messagebus/src/vespa/messagebus/destinationsessionparams.cpp
+++ b/messagebus/src/vespa/messagebus/destinationsessionparams.cpp
@@ -6,7 +6,7 @@ namespace mbus {
DestinationSessionParams::DestinationSessionParams() :
_name("destination"),
_broadcastName(true),
- _handler(nullptr)
+ _handler(NULL)
{ }
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/messagebus.cpp b/messagebus/src/vespa/messagebus/messagebus.cpp
index 5a8f510ddcf..4cd19b62419 100644
--- a/messagebus/src/vespa/messagebus/messagebus.cpp
+++ b/messagebus/src/vespa/messagebus/messagebus.cpp
@@ -97,7 +97,7 @@ MessageBus::MessageBus(INetwork &net, ProtocolSet protocols) :
MessageBusParams params;
while (!protocols.empty()) {
IProtocol::SP protocol = protocols.extract();
- if (protocol.get() != nullptr) {
+ if (protocol.get() != NULL) {
params.addProtocol(protocol);
}
}
@@ -155,7 +155,7 @@ MessageBus::setup(const MessageBusParams &params)
// Start messenger.
IRetryPolicy::SP retryPolicy = params.getRetryPolicy();
- if (retryPolicy.get() != nullptr) {
+ if (retryPolicy.get() != NULL) {
_resender.reset(new Resender(retryPolicy));
Messenger::ITask::UP task(new ResenderTask(*_resender));
@@ -271,7 +271,7 @@ MessageBus::sync()
void
MessageBus::handleMessage(Message::UP msg)
{
- if (_resender.get() != nullptr && msg->hasBucketSequence()) {
+ if (_resender.get() != NULL && msg->hasBucketSequence()) {
deliverError(std::move(msg), ErrorCode::SEQUENCE_ERROR,
"Bucket sequences not supported when resender is enabled.");
return;
@@ -359,7 +359,7 @@ MessageBus::handleDiscard(Context ctx)
void
MessageBus::deliverMessage(Message::UP msg, const string &session)
{
- IMessageHandler *msgHandler = nullptr;
+ IMessageHandler *msgHandler = NULL;
{
LockGuard guard(_lock);
std::map<string, IMessageHandler*>::iterator it = _sessions.find(session);
@@ -367,7 +367,7 @@ MessageBus::deliverMessage(Message::UP msg, const string &session)
msgHandler = it->second;
}
}
- if (msgHandler == nullptr) {
+ if (msgHandler == NULL) {
deliverError(std::move(msg), ErrorCode::UNKNOWN_SESSION,
make_string("Session '%s' does not exist.", session.c_str()));
} else if (!checkPending(*msg)) {
diff --git a/messagebus/src/vespa/messagebus/messenger.cpp b/messagebus/src/vespa/messagebus/messenger.cpp
index 4b612b66c31..2d1204ee7b6 100644
--- a/messagebus/src/vespa/messagebus/messenger.cpp
+++ b/messagebus/src/vespa/messagebus/messenger.cpp
@@ -32,7 +32,7 @@ public:
}
~MessageTask() {
- if (_msg.get() != nullptr) {
+ if (_msg.get() != NULL) {
_msg->discard();
}
}
@@ -42,7 +42,7 @@ public:
}
uint8_t priority() const override {
- if (_msg.get() != nullptr) {
+ if (_msg.get() != NULL) {
return _msg->priority();
}
@@ -64,7 +64,7 @@ public:
}
~ReplyTask() {
- if (_reply.get() != nullptr) {
+ if (_reply.get() != NULL) {
_reply->discard();
}
}
@@ -74,7 +74,7 @@ public:
}
uint8_t priority() const override {
- if (_reply.get() != nullptr) {
+ if (_reply.get() != NULL) {
return _reply->priority();
}
@@ -205,7 +205,7 @@ Messenger::Run(FastOS_ThreadInterface *thread, void *arg)
_queue.pop();
}
}
- if (task.get() != nullptr) {
+ if (task.get() != NULL) {
try {
task->run();
} catch (const std::exception &e) {
diff --git a/messagebus/src/vespa/messagebus/network/CMakeLists.txt b/messagebus/src/vespa/messagebus/network/CMakeLists.txt
index 750ff20240f..e8992034622 100644
--- a/messagebus/src/vespa/messagebus/network/CMakeLists.txt
+++ b/messagebus/src/vespa/messagebus/network/CMakeLists.txt
@@ -6,9 +6,7 @@ vespa_add_library(messagebus_network OBJECT
oosmanager.cpp
rpcnetwork.cpp
rpcnetworkparams.cpp
- rpcsend.cpp
rpcsendv1.cpp
- rpcsendv2.cpp
rpcservice.cpp
rpcserviceaddress.cpp
rpcservicepool.cpp
diff --git a/messagebus/src/vespa/messagebus/network/oosmanager.cpp b/messagebus/src/vespa/messagebus/network/oosmanager.cpp
index 250df147675..eecfe1da447 100644
--- a/messagebus/src/vespa/messagebus/network/oosmanager.cpp
+++ b/messagebus/src/vespa/messagebus/network/oosmanager.cpp
@@ -89,7 +89,7 @@ OOSManager::isOOS(const string &service)
return false;
}
vespalib::LockGuard guard(_lock);
- if (_oosSet.get() == nullptr) {
+ if (_oosSet.get() == NULL) {
return false;
}
if (_oosSet->find(service) == _oosSet->end()) {
diff --git a/messagebus/src/vespa/messagebus/network/oosmanager.h b/messagebus/src/vespa/messagebus/network/oosmanager.h
index eac00b93896..b521520d2b3 100644
--- a/messagebus/src/vespa/messagebus/network/oosmanager.h
+++ b/messagebus/src/vespa/messagebus/network/oosmanager.h
@@ -20,11 +20,14 @@ class RPCNetwork;
*/
class OOSManager : public FNET_Task {
public:
- using IMirrorAPI = slobrok::api::IMirrorAPI;
- using SpecList = IMirrorAPI::SpecList;
- using ClientList = std::vector<OOSClient::SP>;
- using StringSet = std::set<string>;
- using OOSSet = std::shared_ptr<StringSet>;
+ /**
+ * Convenience typedefs.
+ */
+ typedef slobrok::api::IMirrorAPI IMirrorAPI;
+ typedef IMirrorAPI::SpecList SpecList;
+ typedef std::vector<OOSClient::SP> ClientList;
+ typedef std::set<string> StringSet;
+ typedef std::shared_ptr<StringSet> OOSSet;
private:
FRT_Supervisor &_orb;
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index 4b4b23a75db..75c5fc3f6c5 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -1,11 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "inetworkowner.h"
#include "rpcnetwork.h"
-#include "rpcservicepool.h"
-#include "oosmanager.h"
-#include "rpcsendv1.h"
-#include "rpcsendv2.h"
-#include "rpctargetpool.h"
-#include "rpcnetworkparams.h"
#include <vespa/messagebus/errorcode.h>
#include <vespa/messagebus/iprotocol.h>
#include <vespa/messagebus/tracelevel.h>
@@ -16,8 +11,6 @@
#include <vespa/vespalib/component/vtag.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/fnet/scheduler.h>
-#include <vespa/fnet/transport.h>
-#include <vespa/fnet/frt/supervisor.h>
#include <vespa/log/log.h>
LOG_SETUP(".rpcnetwork");
@@ -59,15 +52,17 @@ public:
namespace mbus {
RPCNetwork::SendContext::SendContext(RPCNetwork &net, const Message &msg,
- const std::vector<RoutingNode*> &recipients)
- : _net(net),
- _msg(msg),
- _traceLevel(msg.getTrace().getLevel()),
- _recipients(recipients),
- _hasError(false),
- _pending(_recipients.size()),
- _version(_net.getVersion())
-{ }
+ const std::vector<RoutingNode*> &recipients) :
+ _net(net),
+ _msg(msg),
+ _traceLevel(msg.getTrace().getLevel()),
+ _recipients(recipients),
+ _hasError(false),
+ _pending(_recipients.size()),
+ _version(_net.getVersion())
+{
+ // empty
+}
void
RPCNetwork::SendContext::handleVersion(const vespalib::Version *version)
@@ -75,7 +70,7 @@ RPCNetwork::SendContext::handleVersion(const vespalib::Version *version)
bool shouldSend = false;
{
vespalib::LockGuard guard(_lock);
- if (version == nullptr) {
+ if (version == NULL) {
_hasError = true;
} else if (*version < _version) {
_version = *version;
@@ -90,9 +85,11 @@ RPCNetwork::SendContext::handleVersion(const vespalib::Version *version)
}
}
-RPCNetwork::TargetPoolTask::TargetPoolTask(FNET_Scheduler &scheduler, RPCTargetPool &pool)
- : FNET_Task(&scheduler),
- _pool(pool)
+RPCNetwork::TargetPoolTask::TargetPoolTask(
+ FNET_Scheduler &scheduler,
+ RPCTargetPool &pool) :
+ FNET_Task(&scheduler),
+ _pool(pool)
{
ScheduleNow();
}
@@ -107,26 +104,24 @@ RPCNetwork::TargetPoolTask::PerformTask()
RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_owner(0),
_ident(params.getIdentity()),
- _threadPool(std::make_unique<FastOS_ThreadPool>(128000, 0)),
- _transport(std::make_unique<FNET_Transport>()),
- _orb(std::make_unique<FRT_Supervisor>(_transport.get(), nullptr)),
- _scheduler(*_transport->GetScheduler()),
- _targetPool(std::make_unique<RPCTargetPool>(params.getConnectionExpireSecs())),
- _targetPoolTask(_scheduler, *_targetPool),
- _servicePool(std::make_unique<RPCServicePool>(*this, 4096)),
- _slobrokCfgFactory(std::make_unique<slobrok::ConfiguratorFactory>(params.getSlobrokConfig())),
- _mirror(std::make_unique<slobrok::api::MirrorAPI>(*_orb, *_slobrokCfgFactory)),
- _regAPI(std::make_unique<slobrok::api::RegisterAPI>(*_orb, *_slobrokCfgFactory)),
- _oosManager(std::make_unique<OOSManager>(*_orb, *_mirror, params.getOOSServerPattern())),
+ _threadPool(128000, 0),
+ _transport(),
+ _orb(&_transport, NULL),
+ _scheduler(*_transport.GetScheduler()),
+ _targetPool(params.getConnectionExpireSecs()),
+ _targetPoolTask(_scheduler, _targetPool),
+ _servicePool(*this, 4096),
+ _slobrokCfgFactory(params.getSlobrokConfig()),
+ _mirror(std::make_unique<slobrok::api::MirrorAPI>(_orb, _slobrokCfgFactory)),
+ _regAPI(std::make_unique<slobrok::api::RegisterAPI>(_orb, _slobrokCfgFactory)),
+ _oosManager(_orb, *_mirror, params.getOOSServerPattern()),
_requestedPort(params.getListenPort()),
- _sendV1(std::make_unique<RPCSendV1>()),
- _sendV2(std::make_unique<RPCSendV2>()),
- _sendAdapters(),
- _compressionConfig(params.getCompressionConfig())
+ _sendV1(),
+ _sendAdapters()
{
- _transport->SetDirectWrite(false);
- _transport->SetMaxInputBufferSize(params.getMaxInputBufferSize());
- _transport->SetMaxOutputBufferSize(params.getMaxOutputBufferSize());
+ _transport.SetDirectWrite(false);
+ _transport.SetMaxInputBufferSize(params.getMaxInputBufferSize());
+ _transport.SetMaxOutputBufferSize(params.getMaxOutputBufferSize());
}
RPCNetwork::~RPCNetwork()
@@ -137,33 +132,33 @@ RPCNetwork::~RPCNetwork()
FRT_RPCRequest *
RPCNetwork::allocRequest()
{
- return _orb->AllocRPCRequest();
+ return _orb.AllocRPCRequest();
}
RPCTarget::SP
RPCNetwork::getTarget(const RPCServiceAddress &address)
{
- return _targetPool->getTarget(*_orb, address);
+ return _targetPool.getTarget(_orb, address);
}
void
-RPCNetwork::replyError(const SendContext &ctx, uint32_t errCode, const string &errMsg)
+RPCNetwork::replyError(const SendContext &ctx, uint32_t errCode,
+ const string &errMsg)
{
- for (RoutingNode * rnode : ctx._recipients) {
+ for (std::vector<RoutingNode*>::const_iterator it = ctx._recipients.begin();
+ it != ctx._recipients.end(); ++it)
+ {
Reply::UP reply(new EmptyReply());
reply->setTrace(Trace(ctx._traceLevel));
reply->addError(Error(errCode, errMsg));
- _owner->deliverReply(std::move(reply), *rnode);
+ _owner->deliverReply(std::move(reply), **it);
}
}
-int RPCNetwork::getPort() const { return _orb->GetListenPort(); }
-
-
void
RPCNetwork::flushTargetPool()
{
- _targetPool->flushTargets(true);
+ _targetPool.flushTargets(true);
}
const vespalib::Version &
@@ -178,13 +173,13 @@ RPCNetwork::attach(INetworkOwner &owner)
LOG_ASSERT(_owner == 0);
_owner = &owner;
- _sendV1->attach(*this);
- _sendV2->attach(*this);
- _sendAdapters[vespalib::Version(5)] = _sendV1.get();
- _sendAdapters[vespalib::Version(6, 142)] = _sendV2.get();
+ _sendV1.attach(*this);
+ _sendAdapters.insert(SendAdapterMap::value_type(vespalib::VersionSpecification(5), &_sendV1));
+ _sendAdapters.insert(SendAdapterMap::value_type(vespalib::VersionSpecification(6), &_sendV1));
- FRT_ReflectionBuilder builder(_orb.get());
- builder.DefineMethod("mbus.getVersion", "", "s", true, FRT_METHOD(RPCNetwork::invoke), this);
+ FRT_ReflectionBuilder builder(&_orb);
+ builder.DefineMethod("mbus.getVersion", "", "s", true,
+ FRT_METHOD(RPCNetwork::invoke), this);
builder.MethodDesc("Retrieves the message bus version.");
builder.ReturnDesc("version", "The message bus version.");
}
@@ -198,23 +193,29 @@ RPCNetwork::invoke(FRT_RPCRequest *req)
const string
RPCNetwork::getConnectionSpec() const
{
- return make_string("tcp/%s:%d", _ident.getHostname().c_str(), _orb->GetListenPort());
+ return make_string("tcp/%s:%d", _ident.getHostname().c_str(), _orb.GetListenPort());
}
RPCSendAdapter *
RPCNetwork::getSendAdapter(const vespalib::Version &version)
{
- auto lower = _sendAdapters.lower_bound(version);
- return (lower != _sendAdapters.end()) ? lower->second : nullptr;
+ for (SendAdapterMap::iterator it = _sendAdapters.begin();
+ it != _sendAdapters.end(); ++it)
+ {
+ if (it->first.matches(version)) {
+ return it->second;
+ }
+ }
+ return NULL;
}
bool
RPCNetwork::start()
{
- if (!_orb->Listen(_requestedPort)) {
+ if (!_orb.Listen(_requestedPort)) {
return false;
}
- if (!_transport->Start(_threadPool.get())) {
+ if (!_transport.Start(&_threadPool)) {
return false;
}
return true;
@@ -226,13 +227,13 @@ bool
RPCNetwork::waitUntilReady(double seconds) const
{
slobrok::api::SlobrokList brokerList;
- slobrok::Configurator::UP configurator = _slobrokCfgFactory->create(brokerList);
+ slobrok::Configurator::UP configurator = _slobrokCfgFactory.create(brokerList);
bool hasConfig = false;
for (uint32_t i = 0; i < seconds * 100; ++i) {
if (configurator->poll()) {
hasConfig = true;
}
- if (_mirror->ready() && _oosManager->isReady()) {
+ if (_mirror->ready() && _oosManager.isReady()) {
return true;
}
FastOS_Thread::Sleep(10);
@@ -243,7 +244,7 @@ RPCNetwork::waitUntilReady(double seconds) const
std::string brokers = brokerList.logString();
LOG(error, "mirror (of %s) failed to become ready in %d seconds",
brokers.c_str(), (int)seconds);
- } else if (! _oosManager->isReady()) {
+ } else if (! _oosManager.isReady()) {
LOG(error, "OOS manager failed to become ready in %d seconds", (int)seconds);
}
return false;
@@ -292,23 +293,24 @@ RPCNetwork::allocServiceAddress(RoutingNode &recipient)
Error
RPCNetwork::resolveServiceAddress(RoutingNode &recipient, const string &serviceName)
{
- if (_oosManager->isOOS(serviceName)) {
+ if (_oosManager.isOOS(serviceName)) {
return Error(ErrorCode::SERVICE_OOS,
make_string("The service '%s' has been marked as out of service.",
serviceName.c_str()));
}
- RPCServiceAddress::UP ret = _servicePool->resolve(serviceName);
- if (ret.get() == nullptr) {
+ RPCServiceAddress::UP ret = _servicePool.resolve(serviceName);
+ if (ret.get() == NULL) {
return Error(ErrorCode::NO_ADDRESS_FOR_SERVICE,
make_string("The address of service '%s' could not be resolved. It is not currently "
- "registered with the Vespa name server. "
- "The service must be having problems, or the routing configuration is wrong.",
- serviceName.c_str()));
+ "registered with the Vespa name server. "
+ "The service must be having problems, or the routing configuration is wrong.",
+ serviceName.c_str()));
}
- RPCTarget::SP target = _targetPool->getTarget(*_orb, *ret);
- if (target.get() == nullptr) {
+ RPCTarget::SP target = _targetPool.getTarget(_orb, *ret);
+ if (target.get() == NULL) {
return Error(ErrorCode::CONNECTION_ERROR,
- make_string("Failed to connect to service '%s'.", serviceName.c_str()));
+ make_string("Failed to connect to service '%s'.",
+ serviceName.c_str()));
}
ret->setTarget(target); // free by freeServiceAddress()
recipient.setServiceAddress(IServiceAddress::UP(ret.release()));
@@ -328,7 +330,7 @@ RPCNetwork::send(const Message &msg, const std::vector<RoutingNode*> &recipients
double timeout = ctx._msg.getTimeRemainingNow() / 1000.0;
for (uint32_t i = 0, len = ctx._recipients.size(); i < len; ++i) {
RoutingNode *&recipient = ctx._recipients[i];
- LOG_ASSERT(recipient != nullptr);
+ LOG_ASSERT(recipient != NULL);
RPCServiceAddress &address = static_cast<RPCServiceAddress&>(recipient->getServiceAddress());
LOG_ASSERT(address.hasTarget());
@@ -341,12 +343,13 @@ void
RPCNetwork::send(RPCNetwork::SendContext &ctx)
{
if (ctx._hasError) {
- replyError(ctx, ErrorCode::HANDSHAKE_FAILED, "An error occured while resolving version.");
+ replyError(ctx, ErrorCode::HANDSHAKE_FAILED,
+ "An error occured while resolving version.");
} else {
uint64_t timeRemaining = ctx._msg.getTimeRemainingNow();
Blob payload = _owner->getProtocol(ctx._msg.getProtocol())->encode(ctx._version, ctx._msg);
RPCSendAdapter *adapter = getSendAdapter(ctx._version);
- if (adapter == nullptr) {
+ if (adapter == NULL) {
replyError(ctx, ErrorCode::INCOMPATIBLE_VERSION,
make_string("Can not send to version '%s' recipient.", ctx._version.toString().c_str()));
} else if (timeRemaining == 0) {
@@ -375,8 +378,8 @@ RPCNetwork::sync()
void
RPCNetwork::shutdown()
{
- _transport->ShutDown(false);
- _threadPool->Close();
+ _transport.ShutDown(false);
+ _threadPool.Close();
}
void
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
index 0cda6ffecb7..856f9d0ef64 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
@@ -2,33 +2,27 @@
#pragma once
#include "inetwork.h"
-#include "rpcsendadapter.h"
-#include "rpctarget.h"
-#include "identity.h"
+#include "oosmanager.h"
+#include "rpcnetworkparams.h"
+#include "rpcsendv1.h"
+#include "rpcservicepool.h"
+#include "rpctargetpool.h"
#include <vespa/messagebus/blob.h>
#include <vespa/messagebus/blobref.h>
#include <vespa/messagebus/message.h>
#include <vespa/messagebus/reply.h>
#include <vespa/slobrok/imirrorapi.h>
#include <vespa/vespalib/component/versionspecification.h>
-#include <vespa/vespalib/util/compressionconfig.h>
-#include <vespa/fnet/frt/invokable.h>
-
-class FNET_Transport;
+#include <vespa/fnet/transport.h>
+#include <vespa/fnet/frt/supervisor.h>
namespace slobrok {
- namespace api { class RegisterAPI; }
- class ConfiguratorFactory;
+ namespace api {
+ class RegisterAPI;
+ }
}
namespace mbus {
-
-class OOSManager;
-class RPCServicePool;
-class RPCTargetPool;
-class RPCNetworkParams;
-class RPCServiceAddress;
-
/**
* Network implementation based on RPC. This class is responsible for
* keeping track of services and for sending messages to services.
@@ -36,7 +30,6 @@ class RPCServiceAddress;
class RPCNetwork : public INetwork,
public FRT_Invokable {
private:
- using CompressionConfig = vespalib::compression::CompressionConfig;
struct SendContext : public RPCTarget::IVersionHandler {
vespalib::Lock _lock;
RPCNetwork &_net;
@@ -58,26 +51,24 @@ private:
void PerformTask() override;
};
- using SendAdapterMap = std::map<vespalib::Version, RPCSendAdapter*>;
-
- INetworkOwner *_owner;
- Identity _ident;
- std::unique_ptr<FastOS_ThreadPool> _threadPool;
- std::unique_ptr<FNET_Transport> _transport;
- std::unique_ptr<FRT_Supervisor> _orb;
- FNET_Scheduler &_scheduler;
- std::unique_ptr<RPCTargetPool> _targetPool;
- TargetPoolTask _targetPoolTask;
- std::unique_ptr<RPCServicePool> _servicePool;
- std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory;
- std::unique_ptr<slobrok::api::IMirrorAPI> _mirror;
- std::unique_ptr<slobrok::api::RegisterAPI> _regAPI;
- std::unique_ptr<OOSManager> _oosManager;
- int _requestedPort;
- std::unique_ptr<RPCSendAdapter> _sendV1;
- std::unique_ptr<RPCSendAdapter> _sendV2;
- SendAdapterMap _sendAdapters;
- CompressionConfig _compressionConfig;
+ typedef std::map<vespalib::VersionSpecification, RPCSendAdapter*> SendAdapterMap;
+
+ INetworkOwner *_owner;
+ Identity _ident;
+ FastOS_ThreadPool _threadPool;
+ FNET_Transport _transport;
+ FRT_Supervisor _orb;
+ FNET_Scheduler &_scheduler;
+ RPCTargetPool _targetPool;
+ TargetPoolTask _targetPoolTask;
+ RPCServicePool _servicePool;
+ slobrok::ConfiguratorFactory _slobrokCfgFactory;
+ std::unique_ptr<slobrok::api::IMirrorAPI> _mirror;
+ std::unique_ptr<slobrok::api::RegisterAPI> _regAPI;
+ OOSManager _oosManager;
+ int _requestedPort;
+ RPCSendV1 _sendV1;
+ SendAdapterMap _sendAdapters;
/**
* Resolves and assigns a service address for the given recipient using the
@@ -168,7 +159,7 @@ public:
*
* @return port number
**/
- int getPort() const;
+ int getPort() const { return _orb.GetListenPort(); }
/**
* Allocate a new rpc request object. The caller of this method gets the
@@ -200,7 +191,7 @@ public:
*
* @return internal OOS manager
**/
- OOSManager &getOOSManager() { return *_oosManager; }
+ OOSManager &getOOSManager() { return _oosManager; }
/**
* Obtain a reference to the internal supervisor. This is used by
@@ -208,7 +199,7 @@ public:
*
* @return The supervisor.
*/
- FRT_Supervisor &getSupervisor() { return *_orb; }
+ FRT_Supervisor &getSupervisor() { return _orb; }
/**
* Deliver an error reply to the recipients of a {@link SendContext} in a
@@ -218,7 +209,8 @@ public:
* @param errCode The error code to return.
* @param errMsg The error string to return.
*/
- void replyError(const SendContext &ctx, uint32_t errCode, const string &errMsg);
+ void replyError(const SendContext &ctx, uint32_t errCode,
+ const string &errMsg);
void attach(INetworkOwner &owner) override;
const string getConnectionSpec() const override;
@@ -233,8 +225,9 @@ public:
void shutdown() override;
void postShutdownHook() override;
const slobrok::api::IMirrorAPI &getMirror() const override;
- CompressionConfig getCompressionConfig() { return _compressionConfig; }
+
void invoke(FRT_RPCRequest *req);
};
} // namespace mbus
+
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
index df35d51cb54..e025db0e350 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
@@ -11,8 +11,7 @@ RPCNetworkParams::RPCNetworkParams() :
_listenPort(0),
_maxInputBufferSize(256*1024),
_maxOutputBufferSize(256*1024),
- _connectionExpireSecs(30),
- _compressionConfig(CompressionConfig::LZ4, 6, 90, 1024)
+ _connectionExpireSecs(30)
{ }
RPCNetworkParams::~RPCNetworkParams() {}
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
index bfc624a6523..a65248f7299 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
@@ -3,7 +3,6 @@
#include "identity.h"
#include <vespa/slobrok/cfg.h>
-#include <vespa/vespalib/util/compressionconfig.h>
namespace mbus {
@@ -13,15 +12,13 @@ namespace mbus {
*/
class RPCNetworkParams {
private:
- using CompressionConfig = vespalib::compression::CompressionConfig;
- Identity _identity;
+ Identity _identity;
config::ConfigUri _slobrokConfig;
- string _oosServerPattern;
- int _listenPort;
- uint32_t _maxInputBufferSize;
- uint32_t _maxOutputBufferSize;
- double _connectionExpireSecs;
- CompressionConfig _compressionConfig;
+ string _oosServerPattern;
+ int _listenPort;
+ uint32_t _maxInputBufferSize;
+ uint32_t _maxOutputBufferSize;
+ double _connectionExpireSecs;
public:
RPCNetworkParams();
@@ -180,12 +177,6 @@ public:
_maxOutputBufferSize = maxOutputBufferSize;
return *this;
}
-
- RPCNetworkParams &setCompressionConfig(CompressionConfig compressionConfig) {
- _compressionConfig = compressionConfig;
- return *this;
- }
- CompressionConfig getCompressionConfig() const { return _compressionConfig; }
};
}
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
deleted file mode 100644
index 705b8648442..00000000000
--- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp
+++ /dev/null
@@ -1,282 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "rpcsend.h"
-#include "rpcsend_private.h"
-#include "rpcserviceaddress.h"
-#include <vespa/messagebus/network/rpcnetwork.h>
-#include <vespa/messagebus/tracelevel.h>
-#include <vespa/messagebus/emptyreply.h>
-#include <vespa/messagebus/errorcode.h>
-#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/fnet/channel.h>
-#include <vespa/fnet/frt/reflection.h>
-
-#include <vespa/vespalib/data/slime/cursor.h>
-
-using vespalib::make_string;
-
-namespace mbus {
-
-using network::internal::ReplyContext;
-using network::internal::SendContext;
-
-namespace {
-
-class FillByCopy final : public PayLoadFiller
-{
-public:
- FillByCopy(BlobRef payload) : _payload(payload) { }
- void fill(FRT_Values & v) const override {
- v.AddData(_payload.data(), _payload.size());
- }
- void fill(const vespalib::Memory & name, vespalib::slime::Cursor & v) const override {
- v.setData(name, vespalib::Memory(_payload.data(), _payload.size()));
- }
-private:
- BlobRef _payload;
-};
-
-class FillByHandover final : public PayLoadFiller
-{
-public:
- FillByHandover(Blob payload) : _payload(std::move(payload)) { }
- void fill(FRT_Values & v) const override {
- size_t sz = _payload.size();
- v.AddData(std::move(_payload.payload()), sz);
- }
- void fill(const vespalib::Memory & name, vespalib::slime::Cursor & v) const override {
- v.setData(name, vespalib::Memory(_payload.data(), _payload.size()));
- }
-private:
- mutable Blob _payload;
-};
-
-}
-
-RPCSend::RPCSend() :
- _net(nullptr),
- _clientIdent("client"),
- _serverIdent("server")
-{ }
-
-RPCSend::~RPCSend() {}
-
-void
-RPCSend::attach(RPCNetwork &net)
-{
- _net = &net;
- const string &prefix = _net->getIdentity().getServicePrefix();
- if (!prefix.empty()) {
- _clientIdent = make_string("'%s'", prefix.c_str());
- _serverIdent = _clientIdent;
- }
-
- FRT_ReflectionBuilder builder(&_net->getSupervisor());
- build(builder);
-}
-
-void
-RPCSend::replyError(FRT_RPCRequest *req, const vespalib::Version &version, uint32_t traceLevel, const Error &err)
-{
- Reply::UP reply(new EmptyReply());
- reply->setContext(Context(new ReplyContext(*req, version)));
- reply->getTrace().setLevel(traceLevel);
- reply->addError(err);
- handleReply(std::move(reply));
-}
-
-void
-RPCSend::handleDiscard(Context ctx)
-{
- ReplyContext::UP tmp(static_cast<ReplyContext*>(ctx.value.PTR));
- FRT_RPCRequest &req = tmp->getRequest();
- FNET_Channel *chn = req.GetContext()._value.CHANNEL;
- req.SubRef();
- chn->Free();
-}
-
-void
-RPCSend::sendByHandover(RoutingNode &recipient, const vespalib::Version &version, Blob payload, uint64_t timeRemaining)
-{
- send(recipient, version, FillByHandover(std::move(payload)), timeRemaining);
-}
-
-void
-RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, BlobRef payload, uint64_t timeRemaining)
-{
- send(recipient, version, FillByCopy(payload), timeRemaining);
-}
-
-void
-RPCSend::send(RoutingNode &recipient, const vespalib::Version &version,
- const PayLoadFiller & payload, uint64_t timeRemaining)
-{
- SendContext::UP ctx(new SendContext(recipient, timeRemaining));
- RPCServiceAddress &address = static_cast<RPCServiceAddress&>(recipient.getServiceAddress());
- const Message &msg = recipient.getMessage();
- Route route = recipient.getRoute();
- Hop hop = route.removeHop(0);
-
- FRT_RPCRequest *req = _net->allocRequest();
- encodeRequest(*req, version, route, address, msg, recipient.getTrace().getLevel(), payload, timeRemaining);
-
- if (ctx->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
- ctx->getTrace().trace(TraceLevel::SEND_RECEIVE,
- make_string("Sending message (version %s) from %s to '%s' with %.2f seconds timeout.",
- version.toString().c_str(), _clientIdent.c_str(),
- address.getServiceName().c_str(), ctx->getTimeout()));
- }
-
- if (hop.getIgnoreResult()) {
- address.getTarget().getFRTTarget().InvokeVoid(req);
- if (ctx->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
- ctx->getTrace().trace(TraceLevel::SEND_RECEIVE,
- make_string("Not waiting for a reply from '%s'.", address.getServiceName().c_str()));
- }
- Reply::UP reply(new EmptyReply());
- reply->getTrace().swap(ctx->getTrace());
- _net->getOwner().deliverReply(std::move(reply), recipient);
- } else {
- SendContext *ptr = ctx.release();
- req->SetContext(FNET_Context(ptr));
- address.getTarget().getFRTTarget().InvokeAsync(req, ptr->getTimeout(), this);
- }
-}
-
-void
-RPCSend::RequestDone(FRT_RPCRequest *req)
-{
- SendContext::UP ctx(static_cast<SendContext*>(req->GetContext()._value.VOIDP));
- const string &serviceName = static_cast<RPCServiceAddress&>(ctx->getRecipient().getServiceAddress()).getServiceName();
- Reply::UP reply;
- Error error;
- Trace & trace = ctx->getTrace();
- if (!req->CheckReturnTypes(getReturnSpec())) {
- reply.reset(new EmptyReply());
- switch (req->GetErrorCode()) {
- case FRTE_RPC_TIMEOUT:
- error = Error(ErrorCode::TIMEOUT,
- make_string("A timeout occured while waiting for '%s' (%g seconds expired); %s",
- serviceName.c_str(), ctx->getTimeout(), req->GetErrorMessage()));
- break;
- case FRTE_RPC_CONNECTION:
- error = Error(ErrorCode::CONNECTION_ERROR,
- make_string("A connection error occured for '%s'; %s",
- serviceName.c_str(), req->GetErrorMessage()));
- break;
- default:
- error = Error(ErrorCode::NETWORK_ERROR,
- make_string("A network error occured for '%s'; %s",
- serviceName.c_str(), req->GetErrorMessage()));
- }
- } else {
- FRT_Values &ret = *req->GetReturn();
- reply = createReply(ret, serviceName, error, trace.getRoot());
- }
- if (trace.shouldTrace(TraceLevel::SEND_RECEIVE)) {
- trace.trace(TraceLevel::SEND_RECEIVE,
- make_string("Reply (type %d) received at %s.", reply->getType(), _clientIdent.c_str()));
- }
- reply->getTrace().swap(trace);
- if (error.getCode() != ErrorCode::NONE) {
- reply->addError(error);
- }
- _net->getOwner().deliverReply(std::move(reply), ctx->getRecipient());
- req->SubRef();
-}
-
-std::unique_ptr<Reply>
-RPCSend::decode(vespalib::stringref protocolName, const vespalib::Version & version, BlobRef payload, Error & error) const
-{
- Reply::UP reply;
- IProtocol * protocol = _net->getOwner().getProtocol(protocolName);
- if (protocol != nullptr) {
- Routable::UP routable = protocol->decode(version, payload);
- if (routable) {
- if (routable->isReply()) {
- reply.reset(static_cast<Reply*>(routable.release()));
- } else {
- error = Error(ErrorCode::DECODE_ERROR, "Payload decoded to a message when expecting a reply.");
- }
- } else {
- error = Error(ErrorCode::DECODE_ERROR,
- make_string("Protocol '%s' failed to decode routable.", protocolName.c_str()));
- }
-
- } else {
- error = Error(ErrorCode::UNKNOWN_PROTOCOL,
- make_string("Protocol '%s' is not known by %s.", protocolName.c_str(), _serverIdent.c_str()));
- }
- return reply;
-}
-
-void
-RPCSend::handleReply(Reply::UP reply)
-{
- ReplyContext::UP ctx(static_cast<ReplyContext*>(reply->getContext().value.PTR));
- FRT_RPCRequest &req = ctx->getRequest();
- string version = ctx->getVersion().toString();
- if (reply->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
- reply->getTrace().trace(TraceLevel::SEND_RECEIVE, make_string("Sending reply (version %s) from %s.",
- version.c_str(), _serverIdent.c_str()));
- }
- Blob payload(0);
- if (reply->getType() != 0) {
- payload = _net->getOwner().getProtocol(reply->getProtocol())->encode(ctx->getVersion(), *reply);
- if (payload.size() == 0) {
- reply->addError(Error(ErrorCode::ENCODE_ERROR, "An error occured while encoding the reply, see log."));
- }
- }
- FRT_Values &ret = *req.GetReturn();
- createResponse(ret, version, *reply, std::move(payload));
- req.Return();
-}
-
-void
-RPCSend::invoke(FRT_RPCRequest *req)
-{
- req->Detach();
- FRT_Values &args = *req->GetParams();
-
- std::unique_ptr<Params> params = toParams(args);
- IProtocol * protocol = _net->getOwner().getProtocol(params->getProtocol());
- if (protocol == nullptr) {
- replyError(req, params->getVersion(), params->getTraceLevel(),
- Error(ErrorCode::UNKNOWN_PROTOCOL,
- make_string("Protocol '%s' is not known by %s.", params->getProtocol().c_str(), _serverIdent.c_str())));
- return;
- }
- Routable::UP routable = protocol->decode(params->getVersion(), params->getPayload());
- req->DiscardBlobs();
- if ( ! routable ) {
- replyError(req, params->getVersion(), params->getTraceLevel(),
- Error(ErrorCode::DECODE_ERROR,
- make_string("Protocol '%s' failed to decode routable.", params->getProtocol().c_str())));
- return;
- }
- if (routable->isReply()) {
- replyError(req, params->getVersion(), params->getTraceLevel(),
- Error(ErrorCode::DECODE_ERROR, "Payload decoded to a reply when expecting a mesage."));
- return;
- }
- Message::UP msg(static_cast<Message*>(routable.release()));
- vespalib::stringref route = params->getRoute();
- if (!route.empty()) {
- msg->setRoute(Route::parse(route));
- }
- msg->setContext(Context(new ReplyContext(*req, params->getVersion())));
- msg->pushHandler(*this, *this);
- msg->setRetryEnabled(params->useRetry());
- msg->setRetry(params->getRetries());
- msg->setTimeReceivedNow();
- msg->setTimeRemaining(params->getRemainingTime());
- msg->getTrace().setLevel(params->getTraceLevel());
- if (msg->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
- msg->getTrace().trace(TraceLevel::SEND_RECEIVE,
- make_string("Message (type %d) received at %s for session '%s'.",
- msg->getType(), _serverIdent.c_str(), string(params->getSession()).c_str()));
- }
- _net->getOwner().deliverMessage(std::move(msg), params->getSession());
-}
-
-} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.h b/messagebus/src/vespa/messagebus/network/rpcsend.h
deleted file mode 100644
index c707b47f548..00000000000
--- a/messagebus/src/vespa/messagebus/network/rpcsend.h
+++ /dev/null
@@ -1,95 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include "rpcsendadapter.h"
-#include <vespa/messagebus/idiscardhandler.h>
-#include <vespa/messagebus/ireplyhandler.h>
-#include <vespa/messagebus/common.h>
-#include <vespa/fnet/frt/invokable.h>
-#include <vespa/fnet/frt/invoker.h>
-
-class FRT_ReflectionBuilder;
-
-namespace vespalib::slime { class Cursor; }
-namespace vespalib { class Memory; }
-namespace vespalib { class TraceNode; }
-namespace mbus {
-
-class Error;
-class Route;
-class Message;
-class RPCServiceAddress;
-
-class PayLoadFiller
-{
-public:
- virtual ~PayLoadFiller() { }
- virtual void fill(FRT_Values & v) const = 0;
- virtual void fill(const vespalib::Memory & name, vespalib::slime::Cursor & v) const = 0;
-};
-
-class RPCSend : public RPCSendAdapter,
- public FRT_Invokable,
- public FRT_IRequestWait,
- public IDiscardHandler,
- public IReplyHandler
-{
-public:
- class Params {
- public:
- virtual ~Params() {}
- virtual vespalib::Version getVersion() const = 0;
- virtual vespalib::stringref getProtocol() const = 0;
- virtual uint32_t getTraceLevel() const = 0;
- virtual bool useRetry() const = 0;
- virtual uint32_t getRetries() const = 0;
- virtual uint64_t getRemainingTime() const = 0;
- virtual vespalib::stringref getRoute() const = 0;
- virtual vespalib::stringref getSession() const = 0;
- virtual BlobRef getPayload() const = 0;
- };
-protected:
- RPCNetwork *_net;
- string _clientIdent;
- string _serverIdent;
-
- virtual void build(FRT_ReflectionBuilder & builder) = 0;
- virtual std::unique_ptr<Reply> createReply(const FRT_Values & response, const string & serviceName,
- Error & error, vespalib::TraceNode & rootTrace) const = 0;
- virtual void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route,
- const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel,
- const PayLoadFiller &filler, uint64_t timeRemaining) const = 0;
- virtual const char * getReturnSpec() const = 0;
- virtual void createResponse(FRT_Values & ret, const string & version, Reply & reply, Blob payload) const = 0;
- virtual std::unique_ptr<Params> toParams(const FRT_Values &param) const = 0;
-
- void send(RoutingNode &recipient, const vespalib::Version &version,
- const PayLoadFiller & filler, uint64_t timeRemaining);
- std::unique_ptr<Reply> decode(vespalib::stringref protocol, const vespalib::Version & version,
- BlobRef payload, Error & error) const;
- /**
- * Send an error reply for a given request.
- *
- * @param request The FRT request to reply to.
- * @param version The version to serialize for.
- * @param traceLevel The trace level to set in the reply.
- * @param err The error to reply with.
- */
- void replyError(FRT_RPCRequest *req, const vespalib::Version &version, uint32_t traceLevel, const Error &err);
-public:
- RPCSend();
- ~RPCSend();
-
- void invoke(FRT_RPCRequest *req);
-private:
- void attach(RPCNetwork &net) final override;
- void handleDiscard(Context ctx) final override;
- void sendByHandover(RoutingNode &recipient, const vespalib::Version &version,
- Blob payload, uint64_t timeRemaining) final override;
- void send(RoutingNode &recipient, const vespalib::Version &version,
- BlobRef payload, uint64_t timeRemaining) final override;
- void RequestDone(FRT_RPCRequest *req) final override;
- void handleReply(std::unique_ptr<Reply> reply) final override;
-};
-
-} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend_private.h b/messagebus/src/vespa/messagebus/network/rpcsend_private.h
deleted file mode 100644
index f5867e79856..00000000000
--- a/messagebus/src/vespa/messagebus/network/rpcsend_private.h
+++ /dev/null
@@ -1,52 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include <vespa/messagebus/trace.h>
-#include <vespa/messagebus/routing/routingnode.h>
-
-namespace mbus::network::internal {
-/**
- * Implements a helper class to hold the necessary context to create a reply from
- * an rpc return value. This object is held as the context of an FRT_RPCRequest.
- */
-class SendContext {
-private:
- mbus::RoutingNode &_recipient;
- mbus::Trace _trace;
- double _timeout;
-
-public:
- typedef std::unique_ptr<SendContext> UP;
- SendContext(const SendContext &) = delete;
- SendContext & operator = (const SendContext &) = delete;
- SendContext(mbus::RoutingNode &recipient, uint64_t timeRemaining)
- : _recipient(recipient),
- _trace(recipient.getTrace().getLevel()),
- _timeout(timeRemaining * 0.001) { }
- mbus::RoutingNode &getRecipient() { return _recipient; }
- mbus::Trace &getTrace() { return _trace; }
- double getTimeout() { return _timeout; }
-};
-
-/**
- * Implements a helper class to hold the necessary context to send a reply as an
- * rpc return value. This object is held in the callstack of the reply.
- */
-class ReplyContext {
-private:
- FRT_RPCRequest &_request;
- vespalib::Version _version;
-
-public:
- typedef std::unique_ptr<ReplyContext> UP;
- ReplyContext(const ReplyContext &) = delete;
- ReplyContext & operator = (const ReplyContext &) = delete;
-
- ReplyContext(FRT_RPCRequest &request, const vespalib::Version &version)
- : _request(request), _version(version) { }
- FRT_RPCRequest &getRequest() { return _request; }
- const vespalib::Version &getVersion() { return _version; }
-};
-
-
-}
diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp b/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp
index 6b89a278b88..4cf45207010 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp
@@ -1,40 +1,87 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "rpcsendv1.h"
#include "rpcnetwork.h"
-#include "rpcserviceaddress.h"
+#include <vespa/messagebus/routing/routingnode.h>
#include <vespa/messagebus/emptyreply.h>
+#include <vespa/messagebus/errorcode.h>
#include <vespa/messagebus/tracelevel.h>
#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/fnet/frt/reflection.h>
+#include <vespa/fnet/channel.h>
using vespalib::make_string;
-namespace mbus {
-
namespace {
-const char *METHOD_NAME = "mbus.send1";
-const char *METHOD_PARAMS = "sssbilsxi";
-const char *METHOD_RETURN = "sdISSsxs";
+/**
+ * Implements a helper class to hold the necessary context to create a reply from
+ * an rpc return value. This object is held as the context of an FRT_RPCRequest.
+ */
+class SendContext {
+private:
+ mbus::RoutingNode &_recipient;
+ mbus::Trace _trace;
+ double _timeout;
-}
+public:
+ typedef std::unique_ptr<SendContext> UP;
+ SendContext(const SendContext &) = delete;
+ SendContext & operator = (const SendContext &) = delete;
+ SendContext(mbus::RoutingNode &recipient, uint64_t timeRemaining)
+ : _recipient(recipient),
+ _trace(recipient.getTrace().getLevel()),
+ _timeout(timeRemaining * 0.001) { }
+ mbus::RoutingNode &getRecipient() { return _recipient; }
+ mbus::Trace &getTrace() { return _trace; }
+ double getTimeout() { return _timeout; }
+};
-bool RPCSendV1::isCompatible(vespalib::stringref method, vespalib::stringref request, vespalib::stringref respons)
-{
- return (method == METHOD_NAME) &&
- (request == METHOD_PARAMS) &&
- (respons == METHOD_RETURN);
-}
+/**
+ * Implements a helper class to hold the necessary context to send a reply as an
+ * rpc return value. This object is held in the callstack of the reply.
+ */
+class ReplyContext {
+private:
+ FRT_RPCRequest &_request;
+ vespalib::Version _version;
+
+public:
+ typedef std::unique_ptr<ReplyContext> UP;
+ ReplyContext(const ReplyContext &) = delete;
+ ReplyContext & operator = (const ReplyContext &) = delete;
+
+ ReplyContext(FRT_RPCRequest &request, const vespalib::Version &version)
+ : _request(request), _version(version) { }
+ FRT_RPCRequest &getRequest() { return _request; }
+ const vespalib::Version &getVersion() { return _version; }
+};
-const char *
-RPCSendV1::getReturnSpec() const {
- return METHOD_RETURN;
}
+namespace mbus {
+
+const char *RPCSendV1::METHOD_NAME = "mbus.send1";
+const char *RPCSendV1::METHOD_PARAMS = "sssbilsxi";
+const char *RPCSendV1::METHOD_RETURN = "sdISSsxs";
+
+RPCSendV1::RPCSendV1() :
+ _net(NULL),
+ _clientIdent("client"),
+ _serverIdent("server")
+{ }
+
+RPCSendV1::~RPCSendV1() {}
+
void
-RPCSendV1::build(FRT_ReflectionBuilder & builder)
+RPCSendV1::attach(RPCNetwork &net)
{
+ _net = &net;
+ const string &prefix = _net->getIdentity().getServicePrefix();
+ if (!prefix.empty()) {
+ _clientIdent = make_string("'%s'", prefix.c_str());
+ _serverIdent = _clientIdent;
+ }
+
+ FRT_ReflectionBuilder builder(&_net->getSupervisor());
builder.DefineMethod(METHOD_NAME, METHOD_PARAMS, METHOD_RETURN, true, FRT_METHOD(RPCSendV1::invoke), this);
builder.MethodDesc("Send a message bus request and get a reply back.");
builder.ParamDesc("version", "The version of the message.");
@@ -56,14 +103,59 @@ RPCSendV1::build(FRT_ReflectionBuilder & builder)
builder.ReturnDesc("trace", "A string representation of the trace.");
}
+namespace {
+
+class FillByCopy : public PayLoadFiller
+{
+public:
+ FillByCopy(BlobRef payload) : _payload(payload) { }
+ void fill(FRT_Values & v) const override {
+ v.AddData(_payload.data(), _payload.size());
+ }
+private:
+ BlobRef _payload;
+};
+
+class FillByHandover : public PayLoadFiller
+{
+public:
+ FillByHandover(Blob payload) : _payload(std::move(payload)) { }
+ void fill(FRT_Values & v) const override {
+ v.AddData(std::move(_payload.payload()), _payload.size());
+ }
+private:
+ mutable Blob _payload;
+};
+
+}
+
void
-RPCSendV1::encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route,
- const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel,
- const PayLoadFiller &filler, uint64_t timeRemaining) const
+RPCSendV1::send(RoutingNode &recipient, const vespalib::Version &version,
+ BlobRef payload, uint64_t timeRemaining)
{
+ send(recipient, version, FillByCopy(payload), timeRemaining);
+}
+
+void
+RPCSendV1::sendByHandover(RoutingNode &recipient, const vespalib::Version &version,
+ Blob payload, uint64_t timeRemaining)
+{
+ send(recipient, version, FillByHandover(std::move(payload)), timeRemaining);
+}
- FRT_Values &args = *req.GetParams();
- req.SetMethodName(METHOD_NAME);
+void
+RPCSendV1::send(RoutingNode &recipient, const vespalib::Version &version,
+ const PayLoadFiller & payload, uint64_t timeRemaining)
+{
+ SendContext::UP ctx(new SendContext(recipient, timeRemaining));
+ RPCServiceAddress &address = static_cast<RPCServiceAddress&>(recipient.getServiceAddress());
+ const Message &msg = recipient.getMessage();
+ Route route = recipient.getRoute();
+ Hop hop = route.removeHop(0);
+
+ FRT_RPCRequest *req = _net->allocRequest();
+ FRT_Values &args = *req->GetParams();
+ req->SetMethodName(METHOD_NAME);
args.AddString(version.toString().c_str());
args.AddString(route.toString().c_str());
args.AddString(address.getSessionName().c_str());
@@ -71,103 +163,237 @@ RPCSendV1::encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version,
args.AddInt32(msg.getRetry());
args.AddInt64(timeRemaining);
args.AddString(msg.getProtocol().c_str());
- filler.fill(args);
- args.AddInt32(traceLevel);
-}
+ payload.fill(args);
+ args.AddInt32(recipient.getTrace().getLevel());
-namespace {
+ if (ctx->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ ctx->getTrace().trace(TraceLevel::SEND_RECEIVE,
+ make_string("Sending message (version %s) from %s to '%s' with %.2f seconds timeout.",
+ version.toString().c_str(), _clientIdent.c_str(),
+ address.getServiceName().c_str(), ctx->getTimeout()));
+ }
+
+ if (hop.getIgnoreResult()) {
+ address.getTarget().getFRTTarget().InvokeVoid(req);
+ if (ctx->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ ctx->getTrace().trace(TraceLevel::SEND_RECEIVE,
+ make_string("Not waiting for a reply from '%s'.", address.getServiceName().c_str()));
+ }
+ Reply::UP reply(new EmptyReply());
+ reply->getTrace().swap(ctx->getTrace());
+ _net->getOwner().deliverReply(std::move(reply), recipient);
+ } else {
+ SendContext *ptr = ctx.release();
+ req->SetContext(FNET_Context(ptr));
+ address.getTarget().getFRTTarget().InvokeAsync(req, ptr->getTimeout(), this);
+ }
+}
-class ParamsV1 : public RPCSend::Params
+void
+RPCSendV1::RequestDone(FRT_RPCRequest *req)
{
-public:
- ParamsV1(const FRT_Values &args) : _args(args) { }
+ SendContext::UP ctx(static_cast<SendContext*>(req->GetContext()._value.VOIDP));
+ const string &serviceName = static_cast<RPCServiceAddress&>(
+ ctx->getRecipient().getServiceAddress()).getServiceName();
+ Reply::UP reply;
+ Error error;
+ if (!req->CheckReturnTypes(METHOD_RETURN)) {
+ reply.reset(new EmptyReply());
+ switch (req->GetErrorCode()) {
+ case FRTE_RPC_TIMEOUT:
+ error = Error(ErrorCode::TIMEOUT,
+ make_string("A timeout occured while waiting for '%s' (%g seconds expired); %s",
+ serviceName.c_str(), ctx->getTimeout(), req->GetErrorMessage()));
+ break;
+ case FRTE_RPC_CONNECTION:
+ error = Error(ErrorCode::CONNECTION_ERROR,
+ make_string("A connection error occured for '%s'; %s",
+ serviceName.c_str(), req->GetErrorMessage()));
+ break;
+ default:
+ error = Error(ErrorCode::NETWORK_ERROR,
+ make_string("A network error occured for '%s'; %s",
+ serviceName.c_str(), req->GetErrorMessage()));
+ }
+ } else {
+ FRT_Values &ret = *req->GetReturn();
- uint32_t getTraceLevel() const override { return _args[8]._intval32; }
- bool useRetry() const override { return _args[3]._intval8 != 0; }
- uint32_t getRetries() const override { return _args[4]._intval32; }
- uint64_t getRemainingTime() const override { return _args[5]._intval64; }
+ vespalib::Version version = vespalib::Version(ret[0]._string._str);
+ double retryDelay = ret[1]._double;
+ uint32_t *errorCodes = ret[2]._int32_array._pt;
+ uint32_t errorCodesLen = ret[2]._int32_array._len;
+ FRT_StringValue *errorMessages = ret[3]._string_array._pt;
+ uint32_t errorMessagesLen = ret[3]._string_array._len;
+ FRT_StringValue *errorServices = ret[4]._string_array._pt;
+ uint32_t errorServicesLen = ret[4]._string_array._len;
+ const char *protocolName = ret[5]._string._str;
+ const char *payload = ret[6]._data._buf;
+ uint32_t payloadLen = ret[6]._data._len;
+ const char *trace = ret[7]._string._str;
- vespalib::Version getVersion() const override {
- return vespalib::Version(vespalib::stringref(_args[0]._string._str, _args[0]._string._len));
- }
- vespalib::stringref getRoute() const override {
- return vespalib::stringref(_args[1]._string._str, _args[1]._string._len);
- }
- vespalib::stringref getSession() const override {
- return vespalib::stringref(_args[2]._string._str, _args[2]._string._len);
+ if (payloadLen > 0) {
+ IProtocol * protocol = _net->getOwner().getProtocol(protocolName);
+ if (protocol != nullptr) {
+ Routable::UP routable = protocol->decode(version, BlobRef(payload, payloadLen));
+ if (routable) {
+ if (routable->isReply()) {
+ reply.reset(static_cast<Reply*>(routable.release()));
+ } else {
+ error = Error(ErrorCode::DECODE_ERROR,
+ "Payload decoded to a message when expecting a reply.");
+ }
+ } else {
+ error = Error(ErrorCode::DECODE_ERROR,
+ make_string("Protocol '%s' failed to decode routable.", protocolName));
+ }
+
+ } else {
+ error = Error(ErrorCode::UNKNOWN_PROTOCOL,
+ make_string("Protocol '%s' is not known by %s.", protocolName, _serverIdent.c_str()));
+ }
+ }
+ if ( ! reply ) {
+ reply.reset(new EmptyReply());
+ }
+ reply->setRetryDelay(retryDelay);
+ for (uint32_t i = 0; i < errorCodesLen && i < errorMessagesLen && i < errorServicesLen; ++i) {
+ reply->addError(Error(errorCodes[i],
+ errorMessages[i]._str,
+ errorServices[i]._len > 0 ? errorServices[i]._str : serviceName.c_str()));
+ }
+ ctx->getTrace().getRoot().addChild(TraceNode::decode(trace));
}
- vespalib::stringref getProtocol() const override {
- return vespalib::stringref(_args[6]._string._str, _args[6]._string._len);
+ if (ctx->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ ctx->getTrace().trace(TraceLevel::SEND_RECEIVE,
+ make_string("Reply (type %d) received at %s.", reply->getType(), _clientIdent.c_str()));
}
- BlobRef getPayload() const override {
- return BlobRef(_args[7]._data._buf, _args[7]._data._len);
+ reply->getTrace().swap(ctx->getTrace());
+ if (error.getCode() != ErrorCode::NONE) {
+ reply->addError(error);
}
-private:
- const FRT_Values & _args;
-};
-
+ _net->getOwner().deliverReply(std::move(reply), ctx->getRecipient());
+ req->SubRef();
}
-std::unique_ptr<RPCSend::Params>
-RPCSendV1::toParams(const FRT_Values &args) const
+void
+RPCSendV1::invoke(FRT_RPCRequest *req)
{
- return std::make_unique<ParamsV1>(args);
-}
+ req->Detach();
+ FRT_Values &args = *req->GetParams();
+ vespalib::Version version = vespalib::Version(args[0]._string._str);
+ const char *route = args[1]._string._str;
+ const char *session = args[2]._string._str;
+ bool retryEnabled = args[3]._intval8 != 0;
+ uint32_t retry = args[4]._intval32;
+ uint64_t timeRemaining = args[5]._intval64;
+ const char *protocolName = args[6]._string._str;
+ const char *payload = args[7]._data._buf;
+ uint32_t payloadLen = args[7]._data._len;
+ uint32_t traceLevel = args[8]._intval32;
-std::unique_ptr<Reply>
-RPCSendV1::createReply(const FRT_Values & ret, const string & serviceName, Error & error, vespalib::TraceNode & rootTrace) const
-{
- vespalib::Version version = vespalib::Version(ret[0]._string._str);
- double retryDelay = ret[1]._double;
- uint32_t *errorCodes = ret[2]._int32_array._pt;
- uint32_t errorCodesLen = ret[2]._int32_array._len;
- FRT_StringValue *errorMessages = ret[3]._string_array._pt;
- uint32_t errorMessagesLen = ret[3]._string_array._len;
- FRT_StringValue *errorServices = ret[4]._string_array._pt;
- uint32_t errorServicesLen = ret[4]._string_array._len;
- const char *protocolName = ret[5]._string._str;
- BlobRef payload(ret[6]._data._buf, ret[6]._data._len);
- const char *trace = ret[7]._string._str;
-
- Reply::UP reply;
- if (payload.size() > 0) {
- reply = decode(protocolName, version, payload, error);
+ IProtocol * protocol = _net->getOwner().getProtocol(protocolName);
+ if (protocol == nullptr) {
+ replyError(req, version, traceLevel,
+ Error(ErrorCode::UNKNOWN_PROTOCOL,
+ make_string("Protocol '%s' is not known by %s.", protocolName, _serverIdent.c_str())));
+ return;
}
- if ( ! reply ) {
- reply.reset(new EmptyReply());
+ Routable::UP routable = protocol->decode(version, BlobRef(payload, payloadLen));
+ req->DiscardBlobs();
+ if ( ! routable ) {
+ replyError(req, version, traceLevel,
+ Error(ErrorCode::DECODE_ERROR,
+ make_string("Protocol '%s' failed to decode routable.", protocolName)));
+ return;
+ }
+ if (routable->isReply()) {
+ replyError(req, version, traceLevel,
+ Error(ErrorCode::DECODE_ERROR,
+ "Payload decoded to a reply when expecting a mesage."));
+ return;
}
- reply->setRetryDelay(retryDelay);
- for (uint32_t i = 0; i < errorCodesLen && i < errorMessagesLen && i < errorServicesLen; ++i) {
- reply->addError(Error(errorCodes[i], errorMessages[i]._str,
- errorServices[i]._len > 0 ? errorServices[i]._str : serviceName.c_str()));
+ Message::UP msg(static_cast<Message*>(routable.release()));
+ if (strlen(route) > 0) {
+ msg->setRoute(Route::parse(route));
}
- rootTrace.addChild(TraceNode::decode(trace));
- return reply;
+ msg->setContext(Context(new ReplyContext(*req, version)));
+ msg->pushHandler(*this, *this);
+ msg->setRetryEnabled(retryEnabled);
+ msg->setRetry(retry);
+ msg->setTimeReceivedNow();
+ msg->setTimeRemaining(timeRemaining);
+ msg->getTrace().setLevel(traceLevel);
+ if (msg->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ msg->getTrace().trace(TraceLevel::SEND_RECEIVE,
+ make_string("Message (type %d) received at %s for session '%s'.",
+ msg->getType(), _serverIdent.c_str(), session));
+ }
+ _net->getOwner().deliverMessage(std::move(msg), session);
}
void
-RPCSendV1::createResponse(FRT_Values & ret, const string & version, Reply & reply, Blob payload) const {
+RPCSendV1::handleReply(Reply::UP reply)
+{
+ ReplyContext::UP ctx(static_cast<ReplyContext*>(reply->getContext().value.PTR));
+ FRT_RPCRequest &req = ctx->getRequest();
+ string version = ctx->getVersion().toString();
+ if (reply->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ reply->getTrace().trace(TraceLevel::SEND_RECEIVE, make_string("Sending reply (version %s) from %s.",
+ version.c_str(), _serverIdent.c_str()));
+ }
+ Blob payload(0);
+ if (reply->getType() != 0) {
+ payload = _net->getOwner().getProtocol(reply->getProtocol())->encode(ctx->getVersion(), *reply);
+ if (payload.size() == 0) {
+ reply->addError(Error(ErrorCode::ENCODE_ERROR, "An error occured while encoding the reply, see log."));
+ }
+ }
+ FRT_Values &ret = *req.GetReturn();
ret.AddString(version.c_str());
- ret.AddDouble(reply.getRetryDelay());
+ ret.AddDouble(reply->getRetryDelay());
- uint32_t errorCount = reply.getNumErrors();
+ uint32_t errorCount = reply->getNumErrors();
uint32_t *errorCodes = ret.AddInt32Array(errorCount);
FRT_StringValue *errorMessages = ret.AddStringArray(errorCount);
FRT_StringValue *errorServices = ret.AddStringArray(errorCount);
for (uint32_t i = 0; i < errorCount; ++i) {
- errorCodes[i] = reply.getError(i).getCode();
- ret.SetString(errorMessages + i, reply.getError(i).getMessage().c_str());
- ret.SetString(errorServices + i, reply.getError(i).getService().c_str());
+ errorCodes[i] = reply->getError(i).getCode();
+ ret.SetString(errorMessages + i,
+ reply->getError(i).getMessage().c_str());
+ ret.SetString(errorServices + i,
+ reply->getError(i).getService().c_str());
}
- ret.AddString(reply.getProtocol().c_str());
+ ret.AddString(reply->getProtocol().c_str());
ret.AddData(std::move(payload.payload()), payload.size());
- if (reply.getTrace().getLevel() > 0) {
- ret.AddString(reply.getTrace().getRoot().encode().c_str());
+ if (reply->getTrace().getLevel() > 0) {
+ ret.AddString(reply->getTrace().getRoot().encode().c_str());
} else {
ret.AddString("");
}
+ req.Return();
+}
+
+void
+RPCSendV1::handleDiscard(Context ctx)
+{
+ ReplyContext::UP tmp(static_cast<ReplyContext*>(ctx.value.PTR));
+ FRT_RPCRequest &req = tmp->getRequest();
+ FNET_Channel *chn = req.GetContext()._value.CHANNEL;
+ req.SubRef();
+ chn->Free();
+}
+
+void
+RPCSendV1::replyError(FRT_RPCRequest *req, const vespalib::Version &version,
+ uint32_t traceLevel, const Error &err)
+{
+ Reply::UP reply(new EmptyReply());
+ reply->setContext(Context(new ReplyContext(*req, version)));
+ reply->getTrace().setLevel(traceLevel);
+ reply->addError(err);
+ handleReply(std::move(reply));
}
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv1.h b/messagebus/src/vespa/messagebus/network/rpcsendv1.h
index 37f23335309..b634c1ef1b8 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsendv1.h
+++ b/messagebus/src/vespa/messagebus/network/rpcsendv1.h
@@ -1,24 +1,77 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include "rpcsend.h"
+#include "rpcsendadapter.h"
+#include <vespa/messagebus/idiscardhandler.h>
+#include <vespa/messagebus/ireplyhandler.h>
+#include <vespa/fnet/frt/invokable.h>
+
namespace mbus {
-class RPCSendV1 : public RPCSend {
+class Error;
+
+class PayLoadFiller
+{
public:
- static bool isCompatible(vespalib::stringref method, vespalib::stringref request, vespalib::stringref respons);
+ virtual ~PayLoadFiller() { }
+ virtual void fill(FRT_Values & v) const = 0;
+};
+
+/**
+ * Implements the send adapter for method "mbus.send".
+ */
+class RPCSendV1 : public RPCSendAdapter,
+ public FRT_Invokable,
+ public FRT_IRequestWait,
+ public IDiscardHandler,
+ public IReplyHandler {
private:
- void build(FRT_ReflectionBuilder & builder) override;
- const char * getReturnSpec() const override;
- std::unique_ptr<Params> toParams(const FRT_Values &param) const override;
- void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route,
- const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel,
- const PayLoadFiller &filler, uint64_t timeRemaining) const override;
-
- std::unique_ptr<Reply> createReply(const FRT_Values & response, const string & serviceName,
- Error & error, vespalib::TraceNode & rootTrace) const override;
- void createResponse(FRT_Values & ret, const string & version, Reply & reply, Blob payload) const override;
+ RPCNetwork *_net;
+ string _clientIdent;
+ string _serverIdent;
+
+ /**
+ * Send an error reply for a given request.
+ *
+ * @param request The FRT request to reply to.
+ * @param version The version to serialize for.
+ * @param traceLevel The trace level to set in the reply.
+ * @param err The error to reply with.
+ */
+ void replyError(FRT_RPCRequest *req, const vespalib::Version &version,
+ uint32_t traceLevel, const Error &err);
+
+ void send(RoutingNode &recipient, const vespalib::Version &version,
+ const PayLoadFiller & filler, uint64_t timeRemaining);
+public:
+ /** The name of the rpc method that this adapter registers. */
+ static const char *METHOD_NAME;
+
+ /** The parameter string of the rpc method. */
+ static const char *METHOD_PARAMS;
+
+ /** The return string of the rpc method. */
+ static const char *METHOD_RETURN;
+
+ /**
+ * Constructs a new instance of this adapter. This object is unusable until
+ * its attach() method has been called.
+ */
+ RPCSendV1();
+ ~RPCSendV1();
+
+ void attach(RPCNetwork &net) override;
+
+ void send(RoutingNode &recipient, const vespalib::Version &version,
+ BlobRef payload, uint64_t timeRemaining) override;
+ void sendByHandover(RoutingNode &recipient, const vespalib::Version &version,
+ Blob payload, uint64_t timeRemaining) override;
+
+ void handleReply(std::unique_ptr<Reply> reply) override;
+ void handleDiscard(Context ctx) override;
+ void invoke(FRT_RPCRequest *req);
+ void RequestDone(FRT_RPCRequest *req) override;
};
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp b/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp
deleted file mode 100644
index 1228e08f3b4..00000000000
--- a/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp
+++ /dev/null
@@ -1,263 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "rpcsendv2.h"
-#include "rpcnetwork.h"
-#include "rpcserviceaddress.h"
-#include <vespa/messagebus/emptyreply.h>
-#include <vespa/messagebus/tracelevel.h>
-#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/vespalib/data/slime/slime.h>
-#include <vespa/vespalib/data/databuffer.h>
-#include <vespa/vespalib/util/compressor.h>
-#include <vespa/fnet/frt/reflection.h>
-
-using vespalib::make_string;
-using vespalib::compression::CompressionConfig;
-using vespalib::compression::decompress;
-using vespalib::compression::compress;
-using vespalib::DataBuffer;
-using vespalib::ConstBufferRef;
-using vespalib::stringref;
-using vespalib::Memory;
-using vespalib::Slime;
-using vespalib::Version;
-using namespace vespalib::slime;
-
-namespace mbus {
-
-namespace {
-
-const char *METHOD_NAME = "mbus.slime";
-const char *METHOD_PARAMS = "bixbix";
-const char *METHOD_RETURN = "bixbix";
-
-Memory VERSION_F("version");
-Memory ROUTE_F("route");
-Memory SESSION_F("session");
-Memory USERETRY_F("useretry");
-Memory RETRYDELAY_F("retrydelay");
-Memory RETRY_F("retry");
-Memory TIMELEFT_F("timeleft");
-Memory PROTOCOL_F("prot");
-Memory TRACELEVEL_F("tracelevel");
-Memory TRACE_F("trace");
-Memory BLOB_F("msg");
-Memory ERRORS_F("errors");
-Memory CODE_F("code");
-Memory MSG_F("msg");
-Memory SERVICE_F("service");
-
-}
-
-bool RPCSendV2::isCompatible(stringref method, stringref request, stringref response)
-{
- return (method == METHOD_NAME) &&
- (request == METHOD_PARAMS) &&
- (response == METHOD_RETURN);
-}
-
-void
-RPCSendV2::build(FRT_ReflectionBuilder & builder)
-{
- builder.DefineMethod(METHOD_NAME, METHOD_PARAMS, METHOD_RETURN, true, FRT_METHOD(RPCSendV2::invoke), this);
- builder.MethodDesc("Send a message bus slime request and get a reply back.");
- builder.ParamDesc("header_encoding", "0=raw, 6=lz4");
- builder.ParamDesc("header_decoded_size", "Uncompressed header blob size");
- builder.ParamDesc("header_payload", "The message header blob in slime");
- builder.ParamDesc("body_encoding", "0=raw, 6=lz4");
- builder.ParamDesc("body_decoded_size", "Uncompressed body blob size");
- builder.ParamDesc("body_payload", "The message body blob in slime");
- builder.ReturnDesc("header_encoding", "0=raw, 6=lz4");
- builder.ReturnDesc("header_decoded_size", "Uncompressed header blob size");
- builder.ReturnDesc("header_payload", "The reply header blob in slime.");
- builder.ReturnDesc("body_encoding", "0=raw, 6=lz4");
- builder.ReturnDesc("body_decoded_size", "Uncompressed body blob size");
- builder.ReturnDesc("body_payload", "The reply body blob in slime.");
-}
-
-const char *
-RPCSendV2::getReturnSpec() const {
- return METHOD_RETURN;
-}
-
-namespace {
-class OutputBuf : public vespalib::Output {
-public:
- OutputBuf(size_t estimatedSize) : _buf(estimatedSize) { }
- DataBuffer & getBuf() { return _buf; }
-private:
- vespalib::WritableMemory reserve(size_t bytes) override {
- _buf.ensureFree(bytes);
- return vespalib::WritableMemory(_buf.getFree(), _buf.getFreeLen());
- }
- Output &commit(size_t bytes) override {
- _buf.moveFreeToData(bytes);
- return *this;
- }
- DataBuffer _buf;
-};
-}
-
-void
-RPCSendV2::encodeRequest(FRT_RPCRequest &req, const Version &version, const Route & route,
- const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel,
- const PayLoadFiller &filler, uint64_t timeRemaining) const
-{
- FRT_Values &args = *req.GetParams();
- req.SetMethodName(METHOD_NAME);
- // Place holder for auxillary data to be transfered later.
- args.AddInt8(CompressionConfig::NONE);
- args.AddInt32(0);
- args.AddData("", 0);
-
- Slime slime;
- Cursor & root = slime.setObject();
-
- root.setString(VERSION_F, version.toString());
- root.setString(ROUTE_F, route.toString());
- root.setString(SESSION_F, address.getSessionName());
- root.setBool(USERETRY_F, msg.getRetryEnabled());
- root.setLong(RETRY_F, msg.getRetry());
- root.setLong(TIMELEFT_F, timeRemaining);
- root.setString(PROTOCOL_F, msg.getProtocol());
- root.setLong(TRACELEVEL_F, traceLevel);
- filler.fill(BLOB_F, root);
-
- OutputBuf rBuf(8192);
- BinaryFormat::encode(slime, rBuf);
- ConstBufferRef toCompress(rBuf.getBuf().getData(), rBuf.getBuf().getDataLen());
- DataBuffer buf(vespalib::roundUp2inN(rBuf.getBuf().getDataLen()));
- CompressionConfig::Type type = compress(_net->getCompressionConfig(), toCompress, buf, false);
-
- args.AddInt8(type);
- args.AddInt32(toCompress.size());
- args.AddData(buf.stealBuffer(), buf.getDataLen());
-}
-
-namespace {
-
-class ParamsV2 : public RPCSend::Params
-{
-public:
- ParamsV2(const FRT_Values &arg)
- : _slime()
- {
- uint8_t encoding = arg[3]._intval8;
- uint32_t uncompressedSize = arg[4]._intval32;
- DataBuffer uncompressed(arg[5]._data._buf, arg[5]._data._len);
- ConstBufferRef blob(arg[5]._data._buf, arg[5]._data._len);
- decompress(CompressionConfig::toType(encoding), uncompressedSize, blob, uncompressed, true);
- assert(uncompressedSize == uncompressed.getDataLen());
- BinaryFormat::decode(Memory(uncompressed.getData(), uncompressed.getDataLen()), _slime);
- }
-
- uint32_t getTraceLevel() const override { return _slime.get()[TRACELEVEL_F].asLong(); }
- bool useRetry() const override { return _slime.get()[USERETRY_F].asBool(); }
- uint32_t getRetries() const override { return _slime.get()[RETRY_F].asLong(); }
- uint64_t getRemainingTime() const override { return _slime.get()[TIMELEFT_F].asLong(); }
-
- Version getVersion() const override {
- return Version(_slime.get()[VERSION_F].asString().make_stringref());
- }
- stringref getRoute() const override {
- return _slime.get()[ROUTE_F].asString().make_stringref();
- }
- stringref getSession() const override {
- return _slime.get()[SESSION_F].asString().make_stringref();
- }
- stringref getProtocol() const override {
- return _slime.get()[PROTOCOL_F].asString().make_stringref();
- }
- BlobRef getPayload() const override {
- Memory m = _slime.get()[BLOB_F].asData();
- return BlobRef(m.data, m.size);
- }
-private:
- Slime _slime;
-};
-
-}
-
-std::unique_ptr<RPCSend::Params>
-RPCSendV2::toParams(const FRT_Values &args) const
-{
- return std::make_unique<ParamsV2>(args);
-}
-
-std::unique_ptr<Reply>
-RPCSendV2::createReply(const FRT_Values & ret, const string & serviceName,
- Error & error, vespalib::TraceNode & rootTrace) const
-{
- uint8_t encoding = ret[3]._intval8;
- uint32_t uncompressedSize = ret[4]._intval32;
- DataBuffer uncompressed(ret[5]._data._buf, ret[5]._data._len);
- ConstBufferRef blob(ret[5]._data._buf, ret[5]._data._len);
- decompress(CompressionConfig::toType(encoding), uncompressedSize, blob, uncompressed, true);
- assert(uncompressedSize == uncompressed.getDataLen());
- Slime slime;
- BinaryFormat::decode(Memory(uncompressed.getData(), uncompressed.getDataLen()), slime);
- Inspector & root = slime.get();
- Version version(root[VERSION_F].asString().make_string());
- Memory payload = root[BLOB_F].asData();
-
- Reply::UP reply;
- if (payload.size > 0) {
- reply = decode(root[PROTOCOL_F].asString().make_stringref(), version, BlobRef(payload.data, payload.size), error);
- }
- if ( ! reply ) {
- reply.reset(new EmptyReply());
- }
- reply->setRetryDelay(root[RETRYDELAY_F].asDouble());
- Inspector & errors = root[ERRORS_F];
- for (uint32_t i = 0; i < errors.entries(); ++i) {
- Inspector & e = errors[i];
- Memory service = e[SERVICE_F].asString();
- reply->addError(Error(e[CODE_F].asLong(), e[MSG_F].asString().make_string(),
- (service.size > 0) ? service.make_string() : serviceName));
- }
- rootTrace.addChild(TraceNode::decode(root[TRACE_F].asString().make_string()));
- return reply;
-}
-
-void
-RPCSendV2::createResponse(FRT_Values & ret, const string & version, Reply & reply, Blob payload) const
-{
- // Place holder for auxillary data to be transfered later.
- ret.AddInt8(CompressionConfig::NONE);
- ret.AddInt32(0);
- ret.AddData("", 0);
-
- Slime slime;
- Cursor & root = slime.setObject();
-
- root.setString(VERSION_F, version);
- root.setDouble(RETRYDELAY_F, reply.getRetryDelay());
- root.setString(PROTOCOL_F, reply.getProtocol());
- root.setData(BLOB_F, vespalib::Memory(payload.data(), payload.size()));
- if (reply.getTrace().getLevel() > 0) {
- root.setString(TRACE_F, reply.getTrace().getRoot().encode());
- }
-
- if (reply.getNumErrors() > 0) {
- Cursor & array = root.setArray(ERRORS_F);
- for (uint32_t i = 0; i < reply.getNumErrors(); ++i) {
- Cursor & error = array.addObject();
- error.setLong(CODE_F, reply.getError(i).getCode());
- error.setString(MSG_F, reply.getError(i).getMessage());
- error.setString(SERVICE_F, reply.getError(i).getService().c_str());
- }
- }
-
- OutputBuf rBuf(8192);
- BinaryFormat::encode(slime, rBuf);
- ConstBufferRef toCompress(rBuf.getBuf().getData(), rBuf.getBuf().getDataLen());
- DataBuffer buf(vespalib::roundUp2inN(rBuf.getBuf().getDataLen()));
- CompressionConfig::Type type = compress(_net->getCompressionConfig(), toCompress, buf, false);
-
- ret.AddInt8(type);
- ret.AddInt32(toCompress.size());
- ret.AddData(buf.stealBuffer(), buf.getDataLen());
-
-}
-
-} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv2.h b/messagebus/src/vespa/messagebus/network/rpcsendv2.h
deleted file mode 100644
index e793868d2aa..00000000000
--- a/messagebus/src/vespa/messagebus/network/rpcsendv2.h
+++ /dev/null
@@ -1,24 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include "rpcsend.h"
-
-namespace mbus {
-
-class RPCSendV2 : public RPCSend {
-public:
- static bool isCompatible(vespalib::stringref method, vespalib::stringref request, vespalib::stringref response);
-private:
- void build(FRT_ReflectionBuilder & builder) override;
- const char * getReturnSpec() const override;
- std::unique_ptr<Params> toParams(const FRT_Values &param) const override;
- void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route,
- const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel,
- const PayLoadFiller &filler, uint64_t timeRemaining) const override;
-
- std::unique_ptr<Reply> createReply(const FRT_Values & response, const string & serviceName,
- Error & error, vespalib::TraceNode & rootTrace) const override;
- void createResponse(FRT_Values & ret, const string & version, Reply & reply, Blob payload) const override;
-};
-
-} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcserviceaddress.h b/messagebus/src/vespa/messagebus/network/rpcserviceaddress.h
index 36dde19bd18..c6856057342 100644
--- a/messagebus/src/vespa/messagebus/network/rpcserviceaddress.h
+++ b/messagebus/src/vespa/messagebus/network/rpcserviceaddress.h
@@ -84,7 +84,7 @@ public:
*
* @return True if target is set.
*/
- bool hasTarget() const { return _target.get() != nullptr; }
+ bool hasTarget() const { return _target.get() != NULL; }
};
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp
index 295814f4a8d..20e5a2eb3e3 100644
--- a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp
@@ -36,7 +36,7 @@ RPCTargetPool::flushTargets(bool force)
TargetMap::iterator it = _targets.begin();
while (it != _targets.end()) {
Entry &entry = it->second;
- if (entry._target.get() != nullptr) {
+ if (entry._target.get() != NULL) {
if (entry._target.use_count() > 1) {
entry._lastUse = currentTime;
++it;
diff --git a/messagebus/src/vespa/messagebus/network/rpctargetpool.h b/messagebus/src/vespa/messagebus/network/rpctargetpool.h
index 5f858f66993..683982de080 100644
--- a/messagebus/src/vespa/messagebus/network/rpctargetpool.h
+++ b/messagebus/src/vespa/messagebus/network/rpctargetpool.h
@@ -1,11 +1,11 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include "rpcserviceaddress.h"
-#include "rpctarget.h"
+#include <map>
#include <vespa/messagebus/itimer.h>
#include <vespa/vespalib/util/sync.h>
-#include <map>
+#include "rpcserviceaddress.h"
+#include "rpctarget.h"
class FRT_Supervisor;
diff --git a/messagebus/src/vespa/messagebus/protocolrepository.cpp b/messagebus/src/vespa/messagebus/protocolrepository.cpp
index 4e2efcfb3b9..d2661e3ef80 100644
--- a/messagebus/src/vespa/messagebus/protocolrepository.cpp
+++ b/messagebus/src/vespa/messagebus/protocolrepository.cpp
@@ -76,7 +76,7 @@ ProtocolRepository::getRoutingPolicy(const string &protocolName,
} catch (const std::exception &e) {
LOG(error, "Protocol '%s' threw an exception; %s", protocolName.c_str(), e.what());
}
- if (policy.get() == nullptr) {
+ if (policy.get() == NULL) {
LOG(error, "Protocol '%s' failed to create routing policy '%s' with parameter '%s'.",
protocolName.c_str(), policyName.c_str(), policyParam.c_str());
return IRoutingPolicy::SP();
diff --git a/messagebus/src/vespa/messagebus/routing/route.cpp b/messagebus/src/vespa/messagebus/routing/route.cpp
index b705847c3a5..af7b5113ac2 100644
--- a/messagebus/src/vespa/messagebus/routing/route.cpp
+++ b/messagebus/src/vespa/messagebus/routing/route.cpp
@@ -69,7 +69,7 @@ Route::toDebugString() const {
}
Route
-Route::parse(vespalib::stringref route)
+Route::parse(const string &route)
{
return RouteParser::createRoute(route);
}
diff --git a/messagebus/src/vespa/messagebus/routing/route.h b/messagebus/src/vespa/messagebus/routing/route.h
index a2a01648cfe..d9932e17d26 100644
--- a/messagebus/src/vespa/messagebus/routing/route.h
+++ b/messagebus/src/vespa/messagebus/routing/route.h
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include <vector>
#include "hop.h"
namespace mbus {
@@ -32,7 +33,7 @@ public:
* @param route The string to parse.
* @return A route that corresponds to the string.
*/
- static Route parse(vespalib::stringref route);
+ static Route parse(const string &route);
/**
* Create a Route that contains no hops
diff --git a/messagebus/src/vespa/messagebus/routing/routeparser.cpp b/messagebus/src/vespa/messagebus/routing/routeparser.cpp
index 668f14f9801..ac52ae3b598 100644
--- a/messagebus/src/vespa/messagebus/routing/routeparser.cpp
+++ b/messagebus/src/vespa/messagebus/routing/routeparser.cpp
@@ -71,7 +71,7 @@ RouteParser::createDirective(const stringref &str)
}
Hop
-RouteParser::createHop(stringref str)
+RouteParser::createHop(const stringref &str)
{
if (str.empty()) {
return Hop().addDirective(createErrorDirective("Failed to parse empty string."));
@@ -84,7 +84,7 @@ RouteParser::createHop(stringref str)
}
if (len > 4 && str.substr(0, 4) == "tcp/") {
IHopDirective::SP tcp = createTcpDirective(str.substr(4));
- if (tcp.get() != nullptr) {
+ if (tcp.get() != NULL) {
return Hop().addDirective(tcp);
}
}
@@ -119,7 +119,7 @@ RouteParser::createHop(stringref str)
}
Route
-RouteParser::createRoute(stringref str)
+RouteParser::createRoute(const stringref &str)
{
Route ret;
for (size_t from = 0, at = 0, depth = 0; at <= str.size(); ++at) {
diff --git a/messagebus/src/vespa/messagebus/routing/routeparser.h b/messagebus/src/vespa/messagebus/routing/routeparser.h
index 8ffba3f6e11..a3f16e49307 100644
--- a/messagebus/src/vespa/messagebus/routing/routeparser.h
+++ b/messagebus/src/vespa/messagebus/routing/routeparser.h
@@ -29,7 +29,7 @@ public:
* @param str The string to parse as a hop.
* @return The created hop.
*/
- static Hop createHop(vespalib::stringref str);
+ static Hop createHop(const vespalib::stringref &str);
/**
* Creates a route from a string representation.
@@ -37,7 +37,7 @@ public:
* @param str The string to parse as a route.
* @return The created route.
*/
- static Route createRoute(vespalib::stringref str);
+ static Route createRoute(const vespalib::stringref &str);
};
} // mbus
diff --git a/messagebus/src/vespa/messagebus/routing/routingnode.cpp b/messagebus/src/vespa/messagebus/routing/routingnode.cpp
index 6e100999e1d..62efda4aeb9 100644
--- a/messagebus/src/vespa/messagebus/routing/routingnode.cpp
+++ b/messagebus/src/vespa/messagebus/routing/routingnode.cpp
@@ -21,7 +21,7 @@ RoutingNode::RoutingNode(MessageBus &mbus, INetwork &net, Resender *resender,
: _mbus(mbus),
_net(net),
_resender(resender),
- _parent(nullptr),
+ _parent(NULL),
_recipients(),
_children(),
_replyHandler(&replyHandler),
@@ -45,8 +45,8 @@ RoutingNode::RoutingNode(RoutingNode &parent, const Route &route)
_parent(&parent),
_recipients(parent._recipients),
_children(),
- _replyHandler(nullptr),
- _discardHandler(nullptr),
+ _replyHandler(NULL),
+ _discardHandler(NULL),
_trace(parent._trace.getLevel()),
_pending(0),
_msg(parent._msg),
@@ -78,8 +78,8 @@ RoutingNode::clearChildren()
void
RoutingNode::discard()
{
- assert(_parent == nullptr);
- if (_discardHandler != nullptr) {
+ assert(_parent == NULL);
+ if (_discardHandler != NULL) {
_discardHandler->handleDiscard(Context());
}
}
@@ -101,7 +101,7 @@ RoutingNode::prepareForRetry()
{
_shouldRetry = false;
_reply.reset();
- if (_routingContext.get() != nullptr && _routingContext->getSelectOnRetry()) {
+ if (_routingContext.get() != NULL && _routingContext->getSelectOnRetry()) {
clearChildren();
} else if (!_children.empty()) {
bool retryingSome = false;
@@ -109,7 +109,7 @@ RoutingNode::prepareForRetry()
it != _children.end(); ++it)
{
RoutingNode *child= *it;
- if (child->_shouldRetry || child->_reply.get() == nullptr) {
+ if (child->_shouldRetry || child->_reply.get() == NULL) {
child->prepareForRetry();
retryingSome = true;
}
@@ -126,11 +126,11 @@ RoutingNode::prepareForRetry()
void
RoutingNode::notifyParent()
{
- if (_serviceAddress.get() != nullptr) {
+ if (_serviceAddress.get() != NULL) {
_net.freeServiceAddress(*this);
}
tryIgnoreResult();
- if (_parent != nullptr) {
+ if (_parent != NULL) {
_parent->notifyMerge();
return;
}
@@ -174,7 +174,7 @@ RoutingNode::addError(uint32_t code, const string &msg)
void
RoutingNode::addError(const Error &err)
{
- if (_reply.get() != nullptr) {
+ if (_reply.get() != NULL) {
_reply->getTrace().swap(_trace);
_reply->addError(err);
_reply->getTrace().swap(_trace);
@@ -186,8 +186,8 @@ RoutingNode::addError(const Error &err)
void
RoutingNode::setReply(Reply::UP reply)
{
- if (reply.get() != nullptr) {
- _shouldRetry = _resender != nullptr && _resender->shouldRetry(*reply);
+ if (reply.get() != NULL) {
+ _shouldRetry = _resender != NULL && _resender->shouldRetry(*reply);
_trace.getRoot().addChild(reply->getTrace().getRoot());
reply->getTrace().clear();
}
@@ -211,7 +211,7 @@ RoutingNode::notifyAbort(const string &msg)
mystack.pop();
if (!node->_isActive) {
// reply not pending
- } else if (node->_reply.get() != nullptr) {
+ } else if (node->_reply.get() != NULL) {
node->notifyParent();
} else if (node->_children.empty()) {
node->setError(ErrorCode::SEND_ABORTED, msg);
@@ -240,7 +240,7 @@ RoutingNode::notifyTransmit()
if (node->hasReply()) {
node->notifyParent();
} else {
- assert(node->_serviceAddress.get() != nullptr);
+ assert(node->_serviceAddress.get() != NULL);
sendTo.push_back(node);
}
} else {
@@ -296,7 +296,7 @@ RoutingNode::notifyMerge()
setError(ErrorCode::POLICY_ERROR, make_string("Policy '%s' threw an exception; %s",
dir.getName().c_str(), e.what()));
}
- if (_reply.get() == nullptr) {
+ if (_reply.get() == NULL) {
setError(ErrorCode::APP_FATAL_ERROR, make_string("Routing policy '%s' failed to merge replies.",
dir.getName().c_str()));
}
@@ -315,12 +315,12 @@ RoutingNode::hasUnconsumedErrors()
while (!mystack.empty()) {
RoutingNode *node = mystack.top();
mystack.pop();
- if (node->_reply.get() != nullptr) {
+ if (node->_reply.get() != NULL) {
for (uint32_t i = 0; i < node->_reply->getNumErrors(); ++i) {
int errorCode = node->_reply->getError(i).getCode();
RoutingNode *it = node;
- while (it != nullptr) {
- if (it->_routingContext.get() != nullptr &&
+ while (it != NULL) {
+ if (it->_routingContext.get() != NULL &&
it->_routingContext->isConsumableError(errorCode))
{
errorCode = ErrorCode::NONE;
@@ -329,7 +329,7 @@ RoutingNode::hasUnconsumedErrors()
it = it->_parent;
}
if (errorCode != ErrorCode::NONE) {
- _shouldRetry = _resender != nullptr && _resender->canRetry(errorCode);
+ _shouldRetry = _resender != NULL && _resender->canRetry(errorCode);
if (!_shouldRetry) {
return true; // no need to continue
}
@@ -374,17 +374,17 @@ RoutingNode::resolve(uint32_t depth)
if (executePolicySelect()) {
return resolveChildren(depth + 1);
}
- return _reply.get() != nullptr;
+ return _reply.get() != NULL;
}
_net.allocServiceAddress(*this);
- return _serviceAddress.get() != nullptr || _reply.get() != nullptr;
+ return _serviceAddress.get() != NULL || _reply.get() != NULL;
}
bool
RoutingNode::lookupHop()
{
RoutingTable::SP table = _mbus.getRoutingTable(_msg.getProtocol());
- if (table.get() != nullptr) {
+ if (table.get() != NULL) {
string name = _route.getHop(0).getServiceName();
if (table->hasHop(name)) {
const HopBlueprint *hop = table->getHop(name);
@@ -404,7 +404,7 @@ RoutingNode::lookupRoute()
Hop &hop = _route.getHop(0);
if (hop.getDirective(0)->getType() == IHopDirective::TYPE_ROUTE) {
RouteDirective &dir = static_cast<RouteDirective&>(*hop.getDirective(0));
- if (table.get() == nullptr || !table->hasRoute(dir.getName())) {
+ if (table.get() == NULL || !table->hasRoute(dir.getName())) {
setError(ErrorCode::ILLEGAL_ROUTE,
make_string("Route '%s' does not exist.", dir.getName().c_str()));
return false;
@@ -415,7 +415,7 @@ RoutingNode::lookupRoute()
dir.getName().c_str(), _route.toString().c_str()));
return true;
}
- if (table.get() != nullptr) {
+ if (table.get() != NULL) {
string name = hop.getServiceName();
if (table->hasRoute(name)) {
insertRoute(*table->getRoute(name));
@@ -474,7 +474,7 @@ RoutingNode::executePolicySelect()
{
const PolicyDirective &dir = _routingContext->getDirective();
_policy = _mbus.getRoutingPolicy(_msg.getProtocol(), dir.getName(), dir.getParam());
- if (_policy.get() == nullptr) {
+ if (_policy.get() == NULL) {
setError(ErrorCode::UNKNOWN_POLICY, make_string(
"Protocol '%s' could not create routing policy '%s' with parameter '%s'.",
_msg.getProtocol().c_str(), dir.getName().c_str(), dir.getParam().c_str()));
@@ -489,7 +489,7 @@ RoutingNode::executePolicySelect()
return false;
}
if (_children.empty()) {
- if (_reply.get() == nullptr) {
+ if (_reply.get() == NULL) {
setError(ErrorCode::NO_SERVICES_FOR_ROUTE,
make_string("Policy '%s' selected no recipients for route '%s'.",
dir.getName().c_str(), _route.toString().c_str()));
@@ -522,7 +522,7 @@ RoutingNode::resolveChildren(uint32_t childDepth)
RoutingNode *child = *it;
child->_trace.trace(TraceLevel::SPLIT_MERGE,
make_string("Resolving '%s'.", child->_route.toString().c_str()));
- child->_isActive = (child->_reply.get() == nullptr);
+ child->_isActive = (child->_reply.get() == NULL);
if (child->_isActive) {
++numActiveChildren;
if (!child->resolve(childDepth)) {
@@ -562,7 +562,7 @@ RoutingNode::tryIgnoreResult()
if (!shouldIgnoreResult()) {
return false;
}
- if (_reply.get() == nullptr || !_reply->hasErrors()) {
+ if (_reply.get() == NULL || !_reply->hasErrors()) {
return false;
}
setReply(Reply::UP(new EmptyReply()));
diff --git a/messagebus/src/vespa/messagebus/routing/routingnode.h b/messagebus/src/vespa/messagebus/routing/routingnode.h
index 8951785c621..22ff07c26e5 100644
--- a/messagebus/src/vespa/messagebus/routing/routingnode.h
+++ b/messagebus/src/vespa/messagebus/routing/routingnode.h
@@ -228,7 +228,7 @@ public:
*/
RoutingNode(MessageBus &mbus, INetwork &net, Resender *resender,
IReplyHandler &replyHandler, Message &msg,
- IDiscardHandler *discardHandler = nullptr);
+ IDiscardHandler *discardHandler = NULL);
/**
* Destructor. Frees up any allocated resources, namely all child nodes of
@@ -348,7 +348,6 @@ public:
* @return The message being routed.
*/
Message &getMessage() { return _msg; }
- const Message & getMessage() const { return _msg; }
/**
* Returns the trace object for this node. Each node has a separate trace
@@ -357,7 +356,6 @@ public:
* @return The trace object.
*/
Trace &getTrace() { return _trace; }
- const Trace &getTrace() const { return _trace; }
/**
* Returns the route object as it exists at this point of the tree.
@@ -371,7 +369,7 @@ public:
*
* @return True if this node has a reply.
*/
- bool hasReply() const { return _reply.get() != nullptr; }
+ bool hasReply() const { return _reply.get() != NULL; }
/**
* Returns the reply of this node.
@@ -421,7 +419,7 @@ public:
*
* @return True if an address is set.
*/
- bool hasServiceAddress() { return _serviceAddress.get() != nullptr; }
+ bool hasServiceAddress() { return _serviceAddress.get() != NULL; }
/**
* Returns the service address of this node. This is attached by the network
@@ -431,7 +429,6 @@ public:
* @return The recipient address.
*/
IServiceAddress &getServiceAddress() { return *_serviceAddress; }
- const IServiceAddress &getServiceAddress() const { return *_serviceAddress; }
/**
* Sets the service address of this node. This is called by the network
diff --git a/messagebus/src/vespa/messagebus/routing/routingtable.cpp b/messagebus/src/vespa/messagebus/routing/routingtable.cpp
index 58e1881dc90..7537605d9fa 100644
--- a/messagebus/src/vespa/messagebus/routing/routingtable.cpp
+++ b/messagebus/src/vespa/messagebus/routing/routingtable.cpp
@@ -54,7 +54,7 @@ const HopBlueprint *
RoutingTable::getHop(const string &name) const
{
std::map<string, HopBlueprint>::const_iterator it = _hops.find(name);
- return it != _hops.end() ? &(it->second) : nullptr;
+ return it != _hops.end() ? &(it->second) : NULL;
}
bool
@@ -67,7 +67,7 @@ const Route *
RoutingTable::getRoute(const string &name) const
{
std::map<string, Route>::const_iterator it = _routes.find(name);
- return it != _routes.end() ? &(it->second) : nullptr;
+ return it != _routes.end() ? &(it->second) : NULL;
}
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/rpcmessagebus.cpp b/messagebus/src/vespa/messagebus/rpcmessagebus.cpp
index 103c21ee3aa..c093efd1106 100644
--- a/messagebus/src/vespa/messagebus/rpcmessagebus.cpp
+++ b/messagebus/src/vespa/messagebus/rpcmessagebus.cpp
@@ -1,6 +1,5 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "rpcmessagebus.h"
-#include <vespa/config/subscription/configuri.h>
namespace mbus {
@@ -16,10 +15,6 @@ RPCMessageBus::RPCMessageBus(const MessageBusParams &mbusParams,
_subscriber.start();
}
-RPCMessageBus::RPCMessageBus(const MessageBusParams &mbusParams, const RPCNetworkParams &rpcParams)
- : RPCMessageBus(mbusParams, rpcParams, config::ConfigUri("client"))
-{}
-
RPCMessageBus::RPCMessageBus(const ProtocolSet &protocols,
const RPCNetworkParams &rpcParams,
const config::ConfigUri &routingCfgUri) :
diff --git a/messagebus/src/vespa/messagebus/rpcmessagebus.h b/messagebus/src/vespa/messagebus/rpcmessagebus.h
index e9cfed32c44..3f249965fec 100644
--- a/messagebus/src/vespa/messagebus/rpcmessagebus.h
+++ b/messagebus/src/vespa/messagebus/rpcmessagebus.h
@@ -7,8 +7,6 @@
#include <vespa/messagebus/network/rpcnetwork.h>
#include <vespa/config/helper/legacysubscriber.h>
-namespace config {class ConfigUri; }
-
namespace mbus {
/**
@@ -42,10 +40,8 @@ public:
* @param routingCfgId The config id for message bus routing specs.
*/
RPCMessageBus(const MessageBusParams &mbusParams,
- const RPCNetworkParams &rpcParams,
- const config::ConfigUri & routingCfgId);
- RPCMessageBus(const MessageBusParams &mbusParams,
- const RPCNetworkParams &rpcParams);
+ const RPCNetworkParams &rpcParams = RPCNetworkParams(),
+ const config::ConfigUri & routingCfgId = config::ConfigUri("client"));
/**
@@ -59,8 +55,8 @@ public:
* @param routingCfgId The config id for messagebus routing specs.
*/
RPCMessageBus(const ProtocolSet &protocols,
- const RPCNetworkParams &rpcParams,
- const config::ConfigUri & routingCfgId);
+ const RPCNetworkParams &rpcParams = RPCNetworkParams(),
+ const config::ConfigUri & routingCfgId = config::ConfigUri("client"));
/**
* Destruct. This will destruct the internal MessageBus and RPCNetwork
diff --git a/messagebus/src/vespa/messagebus/sequencer.cpp b/messagebus/src/vespa/messagebus/sequencer.cpp
index 60fb3bdd39e..79fbd346c16 100644
--- a/messagebus/src/vespa/messagebus/sequencer.cpp
+++ b/messagebus/src/vespa/messagebus/sequencer.cpp
@@ -19,7 +19,7 @@ Sequencer::~Sequencer()
{
for (QueueMap::iterator it = _seqMap.begin(); it != _seqMap.end(); ++it) {
MessageQueue *queue = it->second;
- if (queue != nullptr) {
+ if (queue != NULL) {
while (queue->size() > 0) {
Message *msg = queue->front();
queue->pop();
@@ -40,7 +40,7 @@ Sequencer::filter(Message::UP msg)
vespalib::LockGuard guard(_lock);
QueueMap::iterator it = _seqMap.find(seqId);
if (it != _seqMap.end()) {
- if (it->second == nullptr) {
+ if (it->second == NULL) {
it->second = new MessageQueue();
}
msg->getTrace().trace(TraceLevel::COMPONENT,
@@ -49,7 +49,7 @@ Sequencer::filter(Message::UP msg)
msg.release();
return Message::UP();
}
- _seqMap[seqId] = nullptr; // insert empty queue
+ _seqMap[seqId] = NULL; // insert empty queue
}
return std::move(msg);
}
@@ -69,7 +69,7 @@ Sequencer::handleMessage(Message::UP msg)
{
if (msg->hasSequenceId()) {
msg = filter(std::move(msg));
- if (msg.get() != nullptr) {
+ if (msg.get() != NULL) {
sequencedSend(std::move(msg));
}
} else {
@@ -89,8 +89,8 @@ Sequencer::handleReply(Reply::UP reply)
QueueMap::iterator it = _seqMap.find(seq);
MessageQueue *que = it->second;
assert(it != _seqMap.end());
- if (que == nullptr || que->size() == 0) {
- if (que != nullptr) {
+ if (que == NULL || que->size() == 0) {
+ if (que != NULL) {
delete que;
}
_seqMap.erase(it);
@@ -99,7 +99,7 @@ Sequencer::handleReply(Reply::UP reply)
que->pop();
}
}
- if (msg.get() != nullptr) {
+ if (msg.get() != NULL) {
sequencedSend(std::move(msg));
}
IReplyHandler &handler = reply->getCallStack().pop(*reply);
diff --git a/messagebus/src/vespa/messagebus/sourcesession.cpp b/messagebus/src/vespa/messagebus/sourcesession.cpp
index 1dbdd307e17..9a93a4aedf1 100644
--- a/messagebus/src/vespa/messagebus/sourcesession.cpp
+++ b/messagebus/src/vespa/messagebus/sourcesession.cpp
@@ -41,9 +41,9 @@ SourceSession::send(Message::UP msg, const string &routeName, bool parseIfNotFou
{
bool found = false;
RoutingTable::SP rt = _mbus.getRoutingTable(msg->getProtocol());
- if (rt.get() != nullptr) {
+ if (rt.get() != NULL) {
const Route *route = rt->getRoute(routeName);
- if (route != nullptr) {
+ if (route != NULL) {
msg->setRoute(*route);
found = true;
} else if (!parseIfNotFound) {
@@ -79,13 +79,13 @@ SourceSession::send(Message::UP msg)
if (_closed) {
return Result(Error(ErrorCode::SEND_QUEUE_CLOSED, "Source session is closed."), std::move(msg));
}
- if (_throttlePolicy.get() != nullptr && !_throttlePolicy->canSend(*msg, _pendingCount)) {
+ if (_throttlePolicy.get() != NULL && !_throttlePolicy->canSend(*msg, _pendingCount)) {
return Result(Error(ErrorCode::SEND_QUEUE_FULL,
make_string("Too much pending data (%d messages).", _pendingCount)),
std::move(msg));
}
msg->pushHandler(_replyHandler);
- if (_throttlePolicy.get() != nullptr) {
+ if (_throttlePolicy.get() != NULL) {
_throttlePolicy->processMessage(*msg);
}
++_pendingCount;
@@ -108,7 +108,7 @@ SourceSession::handleReply(Reply::UP reply)
vespalib::MonitorGuard guard(_monitor);
assert(_pendingCount > 0);
--_pendingCount;
- if (_throttlePolicy.get() != nullptr) {
+ if (_throttlePolicy.get() != NULL) {
_throttlePolicy->processReply(*reply);
}
done = (_closed && _pendingCount == 0);
diff --git a/messagebus/src/vespa/messagebus/sourcesessionparams.cpp b/messagebus/src/vespa/messagebus/sourcesessionparams.cpp
index 51fe91562ae..125b2a9822f 100644
--- a/messagebus/src/vespa/messagebus/sourcesessionparams.cpp
+++ b/messagebus/src/vespa/messagebus/sourcesessionparams.cpp
@@ -6,7 +6,7 @@
namespace mbus {
SourceSessionParams::SourceSessionParams() :
- _replyHandler(nullptr),
+ _replyHandler(NULL),
_throttlePolicy(new DynamicThrottlePolicy()),
_timeout(180.0)
{ }
@@ -40,7 +40,7 @@ SourceSessionParams::setTimeout(double timeout)
bool
SourceSessionParams::hasReplyHandler() const
{
- return _replyHandler != nullptr;
+ return _replyHandler != NULL;
}
IReplyHandler &
diff --git a/messagebus/src/vespa/messagebus/testlib/testserver.cpp b/messagebus/src/vespa/messagebus/testlib/testserver.cpp
index dbc741f2dd4..e7f3646c72c 100644
--- a/messagebus/src/vespa/messagebus/testlib/testserver.cpp
+++ b/messagebus/src/vespa/messagebus/testlib/testserver.cpp
@@ -4,7 +4,6 @@
#include "simpleprotocol.h"
#include "slobrok.h"
#include "slobrokstate.h"
-#include <vespa/messagebus/network/oosmanager.h>
#include <vespa/vespalib/component/vtag.h>
namespace mbus {
@@ -12,7 +11,9 @@ namespace mbus {
VersionedRPCNetwork::VersionedRPCNetwork(const RPCNetworkParams &params) :
RPCNetwork(params),
_version(vespalib::Vtag::currentVersion)
-{}
+{
+ // empty
+}
void
VersionedRPCNetwork::setVersion(const vespalib::Version &version)
@@ -96,4 +97,4 @@ TestServer::waitState(const OOSState &oosState)
return false;
}
-}
+} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/testlib/testserver.h b/messagebus/src/vespa/messagebus/testlib/testserver.h
index 400e2b274c5..1e2de3c4607 100644
--- a/messagebus/src/vespa/messagebus/testlib/testserver.h
+++ b/messagebus/src/vespa/messagebus/testlib/testserver.h
@@ -2,10 +2,9 @@
#pragma once
+#include <memory>
#include <vespa/messagebus/messagebus.h>
#include <vespa/messagebus/network/rpcnetwork.h>
-#include <vespa/messagebus/network/rpcnetworkparams.h>
-#include <vespa/fnet/frt/supervisor.h>
namespace mbus {
diff --git a/messagebus/src/vespa/messagebus/tracenode.h b/messagebus/src/vespa/messagebus/tracenode.h
index f582a70a151..95de9e70f55 100644
--- a/messagebus/src/vespa/messagebus/tracenode.h
+++ b/messagebus/src/vespa/messagebus/tracenode.h
@@ -5,7 +5,7 @@
namespace mbus {
- using TraceNode = vespalib::TraceNode;
+ typedef vespalib::TraceNode TraceNode;
} // namespace mbus