diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-09-19 20:00:05 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-09-19 20:00:05 +0200 |
commit | 253cfe20e7450bc087506fd3d02a76140df5f9aa (patch) | |
tree | 711f9efc77af230526f7c266ad773243cbb7d602 /messagebus/src | |
parent | 5d92db079b6faf80cc2dcfb150889d452c3ac265 (diff) |
Revert "- Use C++11 for loops."
Diffstat (limited to 'messagebus/src')
53 files changed, 900 insertions, 1664 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java index d42e396452a..096c0c0b485 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java @@ -2,28 +2,17 @@ package com.yahoo.messagebus.network.rpc; import com.yahoo.component.Version; +import com.yahoo.component.VersionSpecification; import com.yahoo.component.Vtag; import com.yahoo.concurrent.ThreadFactoryFactory; -import com.yahoo.jrt.Acceptor; -import com.yahoo.jrt.ListenFailedException; -import com.yahoo.jrt.Method; -import com.yahoo.jrt.MethodHandler; -import com.yahoo.jrt.Request; -import com.yahoo.jrt.Spec; -import com.yahoo.jrt.StringValue; -import com.yahoo.jrt.Supervisor; -import com.yahoo.jrt.Task; -import com.yahoo.jrt.Transport; +import com.yahoo.jrt.*; import com.yahoo.jrt.slobrok.api.IMirror; import com.yahoo.jrt.slobrok.api.Mirror; import com.yahoo.jrt.slobrok.api.Register; import com.yahoo.log.LogLevel; -import com.yahoo.messagebus.EmptyReply; +import com.yahoo.messagebus.*; import com.yahoo.messagebus.Error; import com.yahoo.messagebus.ErrorCode; -import com.yahoo.messagebus.Message; -import com.yahoo.messagebus.Protocol; -import com.yahoo.messagebus.Reply; import com.yahoo.messagebus.network.Identity; import com.yahoo.messagebus.network.Network; import com.yahoo.messagebus.network.NetworkOwner; @@ -33,16 +22,8 @@ import com.yahoo.messagebus.routing.RoutingNode; import java.io.PrintWriter; import java.io.StringWriter; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; @@ -63,7 +44,7 @@ public class RPCNetwork implements Network, MethodHandler { private final Acceptor listener; private final Mirror mirror; private final Register register; - private final TreeMap<Version, RPCSendAdapter> sendAdapters = new TreeMap<>(); + private final Map<VersionSpecification, RPCSendAdapter> sendAdapters = new HashMap<>(); private NetworkOwner owner; private final SlobrokConfigSubscriber slobroksConfig; private final LinkedHashMap<String, Route> lruRouteMap = new LinkedHashMap<>(10000, 0.5f, true); @@ -181,10 +162,9 @@ public class RPCNetwork implements Network, MethodHandler { } this.owner = owner; - RPCSendAdapter adapter1 = new RPCSendV1(); - RPCSendAdapter adapter2 = new RPCSendV2(); - addSendAdapter(new Version(5), adapter1); - addSendAdapter(new Version(6,142), adapter2); + RPCSendAdapter adapter = new RPCSendV1(); + addSendAdapter(new VersionSpecification(5), adapter); + addSendAdapter(new VersionSpecification(6), adapter); } @Override @@ -254,9 +234,11 @@ public class RPCNetwork implements Network, MethodHandler { */ private void send(SendContext ctx) { if (destroyed.get()) { - replyError(ctx, ErrorCode.NETWORK_SHUTDOWN, "Network layer has performed shutdown."); + replyError(ctx, ErrorCode.NETWORK_SHUTDOWN, + "Network layer has performed shutdown."); } else if (ctx.hasError) { - replyError(ctx, ErrorCode.HANDSHAKE_FAILED, "An error occured while resolving version."); + replyError(ctx, ErrorCode.HANDSHAKE_FAILED, + "An error occured while resolving version."); } else { sendService.execute(new SendTask(owner.getProtocol(ctx.msg.getProtocol()), ctx)); } @@ -333,7 +315,7 @@ public class RPCNetwork implements Network, MethodHandler { * @param version The version for which to register an adapter. * @param adapter The adapter to register. */ - private void addSendAdapter(Version version, RPCSendAdapter adapter) { + private void addSendAdapter(VersionSpecification version, RPCSendAdapter adapter) { adapter.attach(this); sendAdapters.put(version, adapter); } @@ -346,8 +328,12 @@ public class RPCNetwork implements Network, MethodHandler { * @return The compatible adapter. */ private RPCSendAdapter getSendAdapter(Version version) { - Map.Entry<Version, RPCSendAdapter> lower = sendAdapters.floorEntry(version); - return (lower != null) ? lower.getValue() : null; + for (Map.Entry<VersionSpecification, RPCSendAdapter> entry : sendAdapters.entrySet()) { + if (entry.getKey().matches(version)) { + return entry.getValue(); + } + } + return null; } /** diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java deleted file mode 100644 index d7b4887bd36..00000000000 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSend.java +++ /dev/null @@ -1,269 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.network.rpc; - -import com.yahoo.component.Version; - -import com.yahoo.jrt.Method; -import com.yahoo.jrt.MethodHandler; -import com.yahoo.jrt.Request; -import com.yahoo.jrt.RequestWaiter; -import com.yahoo.jrt.Values; -import com.yahoo.messagebus.EmptyReply; -import com.yahoo.messagebus.Error; -import com.yahoo.messagebus.ErrorCode; -import com.yahoo.messagebus.Message; -import com.yahoo.messagebus.Protocol; -import com.yahoo.messagebus.Reply; -import com.yahoo.messagebus.ReplyHandler; -import com.yahoo.messagebus.Routable; -import com.yahoo.messagebus.Trace; -import com.yahoo.messagebus.TraceLevel; -import com.yahoo.messagebus.routing.Hop; -import com.yahoo.messagebus.routing.Route; -import com.yahoo.messagebus.routing.RoutingNode; -import com.yahoo.text.Utf8Array; - -/** - * Implements the request adapter for method "mbus.send1/mbus.slime". - * - * @author baldersheim - */ -public abstract class RPCSend implements MethodHandler, ReplyHandler, RequestWaiter, RPCSendAdapter { - - private RPCNetwork net = null; - private String clientIdent = "client"; - private String serverIdent = "server"; - - protected abstract Method buildMethod(); - protected abstract String getReturnSpec(); - protected abstract Request encodeRequest(Version version, Route route, RPCServiceAddress address, Message msg, - long timeRemaining, byte[] payload, int traceLevel); - protected abstract Reply createReply(Values ret, String serviceName, Trace trace); - protected abstract Params toParams(Values req); - protected abstract void createResponse(Values ret, Reply reply, Version version, byte [] payload); - @Override - public final void attach(RPCNetwork net) { - this.net = net; - String prefix = net.getIdentity().getServicePrefix(); - if (prefix != null && prefix.length() > 0) { - clientIdent = "'" + prefix + "'"; - serverIdent = clientIdent; - } - net.getSupervisor().addMethod(buildMethod()); - } - - @Override - public final void send(RoutingNode recipient, Version version, byte[] payload, long timeRemaining) { - SendContext ctx = new SendContext(recipient, timeRemaining); - RPCServiceAddress address = (RPCServiceAddress)recipient.getServiceAddress(); - Message msg = recipient.getMessage(); - Route route = new Route(recipient.getRoute()); - Hop hop = route.removeHop(0); - - Request req = encodeRequest(version, route, address,msg, timeRemaining, payload, ctx.trace.getLevel()); - - if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) { - ctx.trace.trace(TraceLevel.SEND_RECEIVE, - "Sending message (version " + version + ") from " + clientIdent + " to '" + - address.getServiceName() + "' with " + ctx.timeout + " seconds timeout."); - } - - if (hop.getIgnoreResult()) { - address.getTarget().getJRTTarget().invokeVoid(req); - if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) { - ctx.trace.trace(TraceLevel.SEND_RECEIVE, - "Not waiting for a reply from '" + address.getServiceName() + "'."); - } - Reply reply = new EmptyReply(); - reply.getTrace().swap(ctx.trace); - net.getOwner().deliverReply(reply, recipient); - } else { - req.setContext(ctx); - address.getTarget().getJRTTarget().invokeAsync(req, ctx.timeout, this); - } - req.discardParameters(); // allow garbage collection of request parameters - } - - protected final Object decode(Utf8Array protocolName, Version version, byte [] payload) { - Protocol protocol = net.getOwner().getProtocol(protocolName); - if (protocol != null) { - Routable routable = protocol.decode(version, payload); - if (routable != null) { - if (routable instanceof Reply) { - return routable; - } else { - return new Error(ErrorCode.DECODE_ERROR, - "Payload decoded to a reply when expecting a message."); - } - } else { - return new Error(ErrorCode.DECODE_ERROR, - "Protocol '" + protocol.getName() + "' failed to decode routable."); - } - } else { - return new Error(ErrorCode.UNKNOWN_PROTOCOL, - "Protocol '" + protocolName + "' is not known by " + serverIdent + "."); - } - } - - @Override - public final void handleRequestDone(Request req) { - SendContext ctx = (SendContext)req.getContext(); - String serviceName = ((RPCServiceAddress)ctx.recipient.getServiceAddress()).getServiceName(); - Reply reply = null; - Error error = null; - if (!req.checkReturnTypes(getReturnSpec())) { - // Map all known JRT errors to the appropriate message bus error. - reply = new EmptyReply(); - switch (req.errorCode()) { - case com.yahoo.jrt.ErrorCode.TIMEOUT: - error = new Error(ErrorCode.TIMEOUT, - "A timeout occured while waiting for '" + serviceName + "' (" + - ctx.timeout + " seconds expired); " + req.errorMessage()); - break; - case com.yahoo.jrt.ErrorCode.CONNECTION: - error = new Error(ErrorCode.CONNECTION_ERROR, - "A connection error occured for '" + serviceName + "'; " + req.errorMessage()); - break; - default: - error = new Error(ErrorCode.NETWORK_ERROR, - "A network error occured for '" + serviceName + "'; " + req.errorMessage()); - } - } else { - reply = createReply(req.returnValues(), serviceName, ctx.trace); - } - if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) { - ctx.trace.trace(TraceLevel.SEND_RECEIVE, - "Reply (type " + reply.getType() + ") received at " + clientIdent + "."); - } - reply.getTrace().swap(ctx.trace); - if (error != null) { - reply.addError(error); - } - net.getOwner().deliverReply(reply, ctx.recipient); - } - - protected final class Params { - Version version; - String route; - String session; - boolean retryEnabled; - int retry; - long timeRemaining; - Utf8Array protocolName; - byte [] payload; - int traceLevel; - } - - @Override - public final void invoke(Request request) { - request.detach(); - Params p = toParams(request.parameters()); - - request.discardParameters(); // allow garbage collection of request parameters - - // Make sure that the owner understands the protocol. - Protocol protocol = net.getOwner().getProtocol(p.protocolName); - if (protocol == null) { - replyError(request, p.version, p.traceLevel, - new Error(ErrorCode.UNKNOWN_PROTOCOL, - "Protocol '" + p.protocolName + "' is not known by " + serverIdent + ".")); - return; - } - Routable routable = protocol.decode(p.version, p.payload); - if (routable == null) { - replyError(request, p.version, p.traceLevel, - new Error(ErrorCode.DECODE_ERROR, - "Protocol '" + protocol.getName() + "' failed to decode routable.")); - return; - } - if (routable instanceof Reply) { - replyError(request, p.version, p.traceLevel, - new Error(ErrorCode.DECODE_ERROR, - "Payload decoded to a reply when expecting a message.")); - return; - } - Message msg = (Message)routable; - if (p.route != null && p.route.length() > 0) { - msg.setRoute(net.getRoute(p.route)); - } - msg.setContext(new ReplyContext(request, p.version)); - msg.pushHandler(this); - msg.setRetryEnabled(p.retryEnabled); - msg.setRetry(p.retry); - msg.setTimeReceivedNow(); - msg.setTimeRemaining(p.timeRemaining); - msg.getTrace().setLevel(p.traceLevel); - if (msg.getTrace().shouldTrace(TraceLevel.SEND_RECEIVE)) { - msg.getTrace().trace(TraceLevel.SEND_RECEIVE, - "Message (type " + msg.getType() + ") received at " + serverIdent + " for session '" + p.session + "'."); - } - net.getOwner().deliverMessage(msg, p.session); - } - - @Override - public final void handleReply(Reply reply) { - ReplyContext ctx = (ReplyContext)reply.getContext(); - reply.setContext(null); - - // Add trace information. - if (reply.getTrace().shouldTrace(TraceLevel.SEND_RECEIVE)) { - reply.getTrace().trace(TraceLevel.SEND_RECEIVE, - "Sending reply (version " + ctx.version + ") from " + serverIdent + "."); - } - - // Encode and return the reply through the RPC request. - byte[] payload = new byte[0]; - if (reply.getType() != 0) { - Protocol protocol = net.getOwner().getProtocol(reply.getProtocol()); - if (protocol != null) { - payload = protocol.encode(ctx.version, reply); - } - if (payload == null || payload.length == 0) { - reply.addError(new Error(ErrorCode.ENCODE_ERROR, - "An error occured while encoding the reply.")); - } - } - createResponse(ctx.request.returnValues(), reply, ctx.version, payload); - ctx.request.returnRequest(); - } - - /** - * Send an error reply for a given request. - * - * @param request The JRT request to reply to. - * @param version The version to serialize for. - * @param traceLevel The trace level to set in the reply. - * @param err The error to reply with. - */ - private void replyError(Request request, Version version, int traceLevel, Error err) { - Reply reply = new EmptyReply(); - reply.setContext(new ReplyContext(request, version)); - reply.getTrace().setLevel(traceLevel); - reply.addError(err); - handleReply(reply); - } - - private static class SendContext { - - final RoutingNode recipient; - final Trace trace; - final double timeout; - - SendContext(RoutingNode recipient, long timeRemaining) { - this.recipient = recipient; - trace = new Trace(recipient.getTrace().getLevel()); - timeout = timeRemaining * 0.001; - } - } - - private static class ReplyContext { - - final Request request; - final Version version; - - public ReplyContext(Request request, Version version) { - this.request = request; - this.version = version; - } - } -} diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java index 480a716e382..40cb7fb9ee9 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV1.java @@ -2,41 +2,40 @@ package com.yahoo.messagebus.network.rpc; import com.yahoo.component.Version; -import com.yahoo.jrt.DataValue; -import com.yahoo.jrt.DoubleValue; -import com.yahoo.jrt.Int32Array; -import com.yahoo.jrt.Int32Value; -import com.yahoo.jrt.Int64Value; -import com.yahoo.jrt.Int8Value; -import com.yahoo.jrt.Method; -import com.yahoo.jrt.Request; -import com.yahoo.jrt.StringArray; +import com.yahoo.jrt.*; import com.yahoo.jrt.StringValue; -import com.yahoo.jrt.Values; -import com.yahoo.messagebus.EmptyReply; +import com.yahoo.messagebus.*; import com.yahoo.messagebus.Error; -import com.yahoo.messagebus.Message; -import com.yahoo.messagebus.Reply; -import com.yahoo.messagebus.Trace; -import com.yahoo.messagebus.TraceNode; +import com.yahoo.messagebus.ErrorCode; +import com.yahoo.messagebus.ReplyHandler; +import com.yahoo.messagebus.routing.Hop; import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.routing.RoutingNode; import com.yahoo.text.Utf8Array; +import com.yahoo.text.Utf8String; /** * Implements the request adapter for method "mbus.send1". * * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> */ -public class RPCSendV1 extends RPCSend { +public class RPCSendV1 implements MethodHandler, ReplyHandler, RequestWaiter, RPCSendAdapter { private final String METHOD_NAME = "mbus.send1"; private final String METHOD_PARAMS = "sssbilsxi"; private final String METHOD_RETURN = "sdISSsxs"; + private RPCNetwork net = null; + private String clientIdent = "client"; + private String serverIdent = "server"; @Override - protected String getReturnSpec() { return METHOD_RETURN; } - @Override - protected Method buildMethod() { + public void attach(RPCNetwork net) { + this.net = net; + String prefix = net.getIdentity().getServicePrefix(); + if (prefix != null && prefix.length() > 0) { + clientIdent = "'" + prefix + "'"; + serverIdent = clientIdent; + } Method method = new Method(METHOD_NAME, METHOD_PARAMS, METHOD_RETURN, this); method.methodDesc("Send a message bus request and get a reply back."); @@ -57,80 +56,206 @@ public class RPCSendV1 extends RPCSend { .returnDesc(5, "protocol", "The name of the protocol that knows how to decode this reply.") .returnDesc(6, "payload", "The protocol specific reply payload.") .returnDesc(7, "trace", "A string representation of the trace."); - return method; + net.getSupervisor().addMethod(method); } + @Override - protected Request encodeRequest(Version version, Route route, RPCServiceAddress address, Message msg, - long timeRemaining, byte[] payload, int traceLevel) { + public void send(RoutingNode recipient, Version version, byte[] payload, long timeRemaining) { + SendContext ctx = new SendContext(recipient, timeRemaining); + RPCServiceAddress address = (RPCServiceAddress)recipient.getServiceAddress(); + Message msg = recipient.getMessage(); + Route route = new Route(recipient.getRoute()); + Hop hop = route.removeHop(0); + Request req = new Request(METHOD_NAME); - Values v = req.parameters(); - v.add(new StringValue(version.toString())); - v.add(new StringValue(route.toString())); - v.add(new StringValue(address.getSessionName())); - v.add(new Int8Value(msg.getRetryEnabled() ? (byte)1 : (byte)0)); - v.add(new Int32Value(msg.getRetry())); - v.add(new Int64Value(timeRemaining)); - v.add(new StringValue(msg.getProtocol())); - v.add(new DataValue(payload)); - v.add(new Int32Value(traceLevel)); - return req; + req.parameters().add(new StringValue(version.toString())); + req.parameters().add(new StringValue(route.toString())); + req.parameters().add(new StringValue(address.getSessionName())); + req.parameters().add(new Int8Value(msg.getRetryEnabled() ? (byte)1 : (byte)0)); + req.parameters().add(new Int32Value(msg.getRetry())); + req.parameters().add(new Int64Value(timeRemaining)); + req.parameters().add(new StringValue(msg.getProtocol())); + req.parameters().add(new DataValue(payload)); + req.parameters().add(new Int32Value(ctx.trace.getLevel())); + + if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) { + ctx.trace.trace(TraceLevel.SEND_RECEIVE, + "Sending message (version " + version + ") from " + clientIdent + " to '" + + address.getServiceName() + "' with " + ctx.timeout + " seconds timeout."); + } + + if (hop.getIgnoreResult()) { + address.getTarget().getJRTTarget().invokeVoid(req); + if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) { + ctx.trace.trace(TraceLevel.SEND_RECEIVE, + "Not waiting for a reply from '" + address.getServiceName() + "'."); + } + Reply reply = new EmptyReply(); + reply.getTrace().swap(ctx.trace); + net.getOwner().deliverReply(reply, recipient); + } else { + req.setContext(ctx); + address.getTarget().getJRTTarget().invokeAsync(req, ctx.timeout, this); + } + req.discardParameters(); // allow garbage collection of request parameters } @Override - protected Reply createReply(Values ret, String serviceName, Trace trace) { - Version version = new Version(ret.get(0).asUtf8Array()); - double retryDelay = ret.get(1).asDouble(); - int[] errorCodes = ret.get(2).asInt32Array(); - String[] errorMessages = ret.get(3).asStringArray(); - String[] errorServices = ret.get(4).asStringArray(); - Utf8Array protocolName = ret.get(5).asUtf8Array(); - byte[] payload = ret.get(6).asData(); - String replyTrace = ret.get(7).asString(); - - // Make sure that the owner understands the protocol. + public void handleRequestDone(Request req) { + SendContext ctx = (SendContext)req.getContext(); + String serviceName = ((RPCServiceAddress)ctx.recipient.getServiceAddress()).getServiceName(); Reply reply = null; Error error = null; - if (payload.length > 0) { - Object retval = decode(protocolName, version, payload); - if (retval instanceof Reply) { - reply = (Reply) retval; - } else { - error = (Error) retval; + if (!req.checkReturnTypes(METHOD_RETURN)) { + // Map all known JRT errors to the appropriate message bus error. + reply = new EmptyReply(); + switch (req.errorCode()) { + case com.yahoo.jrt.ErrorCode.TIMEOUT: + error = new Error(com.yahoo.messagebus.ErrorCode.TIMEOUT, + "A timeout occured while waiting for '" + serviceName + "' (" + + ctx.timeout + " seconds expired); " + req.errorMessage()); + break; + case com.yahoo.jrt.ErrorCode.CONNECTION: + error = new Error(com.yahoo.messagebus.ErrorCode.CONNECTION_ERROR, + "A connection error occured for '" + serviceName + "'; " + req.errorMessage()); + break; + default: + error = new Error(com.yahoo.messagebus.ErrorCode.NETWORK_ERROR, + "A network error occured for '" + serviceName + "'; " + req.errorMessage()); + } + } else { + // Retrieve all reply components from JRT request object. + Version version = new Version(req.returnValues().get(0).asUtf8Array()); + double retryDelay = req.returnValues().get(1).asDouble(); + int[] errorCodes = req.returnValues().get(2).asInt32Array(); + String[] errorMessages = req.returnValues().get(3).asStringArray(); + String[] errorServices = req.returnValues().get(4).asStringArray(); + Utf8Array protocolName = req.returnValues().get(5).asUtf8Array(); + byte[] payload = req.returnValues().get(6).asData(); + String replyTrace = req.returnValues().get(7).asString(); + + // Make sure that the owner understands the protocol. + if (payload.length > 0) { + Protocol protocol = net.getOwner().getProtocol(protocolName); + if (protocol != null) { + Routable routable = protocol.decode(version, payload); + if (routable != null) { + if (routable instanceof Reply) { + reply = (Reply)routable; + } else { + error = new Error(com.yahoo.messagebus.ErrorCode.DECODE_ERROR, + "Payload decoded to a reply when expecting a message."); + } + } else { + error = new Error(com.yahoo.messagebus.ErrorCode.DECODE_ERROR, + "Protocol '" + protocol.getName() + "' failed to decode routable."); + } + } else { + error = new Error(com.yahoo.messagebus.ErrorCode.UNKNOWN_PROTOCOL, + "Protocol '" + protocolName + "' is not known by " + serverIdent + "."); + } + } + if (reply == null) { + reply = new EmptyReply(); + } + reply.setRetryDelay(retryDelay); + for (int i = 0; i < errorCodes.length && i < errorMessages.length; i++) { + reply.addError(new Error(errorCodes[i], + errorMessages[i], + errorServices[i].length() > 0 ? errorServices[i] : serviceName)); + } + if (ctx.trace.getLevel() > 0) { + ctx.trace.getRoot().addChild(TraceNode.decode(replyTrace)); } } - if (reply == null) { - reply = new EmptyReply(); + if (ctx.trace.shouldTrace(TraceLevel.SEND_RECEIVE)) { + ctx.trace.trace(TraceLevel.SEND_RECEIVE, + "Reply (type " + reply.getType() + ") received at " + clientIdent + "."); } + reply.getTrace().swap(ctx.trace); if (error != null) { reply.addError(error); } - reply.setRetryDelay(retryDelay); - for (int i = 0; i < errorCodes.length && i < errorMessages.length; i++) { - reply.addError(new Error(errorCodes[i], errorMessages[i], - errorServices[i].length() > 0 ? errorServices[i] : serviceName)); - } - if (trace.getLevel() > 0) { - trace.getRoot().addChild(TraceNode.decode(replyTrace)); - } - return reply; + net.getOwner().deliverReply(reply, ctx.recipient); } - protected Params toParams(Values args) { - Params p = new Params(); - p.version = new Version(args.get(0).asUtf8Array()); - p.route = args.get(1).asString(); - p.session = args.get(2).asString(); - p.retryEnabled = (args.get(3).asInt8() != 0); - p.retry = args.get(4).asInt32(); - p.timeRemaining = args.get(5).asInt64(); - p.protocolName = args.get(6).asUtf8Array(); - p.payload = args.get(7).asData(); - p.traceLevel = args.get(8).asInt32(); - return p; + @Override + public void invoke(Request request) { + request.detach(); + Version version = new Version(request.parameters().get(0).asUtf8Array()); + String route = request.parameters().get(1).asString(); + String session = request.parameters().get(2).asString(); + boolean retryEnabled = (request.parameters().get(3).asInt8() != 0); + int retry = request.parameters().get(4).asInt32(); + long timeRemaining = request.parameters().get(5).asInt64(); + Utf8Array protocolName = request.parameters().get(6).asUtf8Array(); + byte[] payload = request.parameters().get(7).asData(); + int traceLevel = request.parameters().get(8).asInt32(); + + request.discardParameters(); // allow garbage collection of request parameters + + // Make sure that the owner understands the protocol. + Protocol protocol = net.getOwner().getProtocol(protocolName); + if (protocol == null) { + replyError(request, version, traceLevel, + new com.yahoo.messagebus.Error(ErrorCode.UNKNOWN_PROTOCOL, + "Protocol '" + protocolName + "' is not known by " + serverIdent + ".")); + return; + } + Routable routable = protocol.decode(version, payload); + if (routable == null) { + replyError(request, version, traceLevel, + new Error(ErrorCode.DECODE_ERROR, + "Protocol '" + protocol.getName() + "' failed to decode routable.")); + return; + } + if (routable instanceof Reply) { + replyError(request, version, traceLevel, + new Error(ErrorCode.DECODE_ERROR, + "Payload decoded to a reply when expecting a message.")); + return; + } + Message msg = (Message)routable; + if (route != null && route.length() > 0) { + msg.setRoute(net.getRoute(route)); + } + msg.setContext(new ReplyContext(request, version)); + msg.pushHandler(this); + msg.setRetryEnabled(retryEnabled); + msg.setRetry(retry); + msg.setTimeReceivedNow(); + msg.setTimeRemaining(timeRemaining); + msg.getTrace().setLevel(traceLevel); + if (msg.getTrace().shouldTrace(TraceLevel.SEND_RECEIVE)) { + msg.getTrace().trace(TraceLevel.SEND_RECEIVE, + "Message (type " + msg.getType() + ") received at " + serverIdent + " for session '" + session + "'."); + } + net.getOwner().deliverMessage(msg, session); } @Override - protected void createResponse(Values ret, Reply reply, Version version, byte [] payload) { + public void handleReply(Reply reply) { + ReplyContext ctx = (ReplyContext)reply.getContext(); + reply.setContext(null); + + // Add trace information. + if (reply.getTrace().shouldTrace(TraceLevel.SEND_RECEIVE)) { + reply.getTrace().trace(TraceLevel.SEND_RECEIVE, + "Sending reply (version " + ctx.version + ") from " + serverIdent + "."); + } + + // Encode and return the reply through the RPC request. + byte[] payload = new byte[0]; + if (reply.getType() != 0) { + Protocol protocol = net.getOwner().getProtocol(reply.getProtocol()); + if (protocol != null) { + payload = protocol.encode(ctx.version, reply); + } + if (payload == null || payload.length == 0) { + reply.addError(new Error(ErrorCode.ENCODE_ERROR, + "An error occured while encoding the reply.")); + } + } int[] eCodes = new int[reply.getNumErrors()]; String[] eMessages = new String[reply.getNumErrors()]; String[] eServices = new String[reply.getNumErrors()]; @@ -140,14 +265,57 @@ public class RPCSendV1 extends RPCSend { eMessages[i] = error.getMessage(); eServices[i] = error.getService() != null ? error.getService() : ""; } - ret.add(new StringValue(version.toString())); - ret.add(new DoubleValue(reply.getRetryDelay())); - ret.add(new Int32Array(eCodes)); - ret.add(new StringArray(eMessages)); - ret.add(new StringArray(eServices)); - ret.add(new StringValue(reply.getProtocol())); - ret.add(new DataValue(payload)); - ret.add(new StringValue(reply.getTrace().getRoot() != null ? reply.getTrace().getRoot().encode() : "")); + ctx.request.returnValues().add(new StringValue(ctx.version.toString())); + ctx.request.returnValues().add(new DoubleValue(reply.getRetryDelay())); + ctx.request.returnValues().add(new Int32Array(eCodes)); + ctx.request.returnValues().add(new StringArray(eMessages)); + ctx.request.returnValues().add(new StringArray(eServices)); + ctx.request.returnValues().add(new StringValue(reply.getProtocol())); + ctx.request.returnValues().add(new DataValue(payload)); + ctx.request.returnValues().add(new StringValue( + reply.getTrace().getRoot() != null ? + reply.getTrace().getRoot().encode() : + "")); + ctx.request.returnRequest(); } + /** + * Send an error reply for a given request. + * + * @param request The JRT request to reply to. + * @param version The version to serialize for. + * @param traceLevel The trace level to set in the reply. + * @param err The error to reply with. + */ + private void replyError(Request request, Version version, int traceLevel, Error err) { + Reply reply = new EmptyReply(); + reply.setContext(new ReplyContext(request, version)); + reply.getTrace().setLevel(traceLevel); + reply.addError(err); + handleReply(reply); + } + + private static class SendContext { + + final RoutingNode recipient; + final Trace trace; + final double timeout; + + SendContext(RoutingNode recipient, long timeRemaining) { + this.recipient = recipient; + trace = new Trace(recipient.getTrace().getLevel()); + timeout = timeRemaining * 0.001; + } + } + + private static class ReplyContext { + + final Request request; + final Version version; + + public ReplyContext(Request request, Version version) { + this.request = request; + this.version = version; + } + } } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java deleted file mode 100644 index 8cc0b73ae30..00000000000 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCSendV2.java +++ /dev/null @@ -1,209 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.messagebus.network.rpc; - -import com.yahoo.component.Version; -import com.yahoo.compress.CompressionType; -import com.yahoo.compress.Compressor; -import com.yahoo.jrt.DataValue; -import com.yahoo.jrt.Int32Value; -import com.yahoo.jrt.Int8Value; -import com.yahoo.jrt.Method; -import com.yahoo.jrt.Request; -import com.yahoo.jrt.Values; -import com.yahoo.messagebus.EmptyReply; -import com.yahoo.messagebus.Error; -import com.yahoo.messagebus.Message; -import com.yahoo.messagebus.Reply; -import com.yahoo.messagebus.Trace; -import com.yahoo.messagebus.TraceNode; -import com.yahoo.messagebus.routing.Route; -import com.yahoo.slime.BinaryFormat; -import com.yahoo.slime.Cursor; -import com.yahoo.slime.Inspector; -import com.yahoo.slime.Slime; -import com.yahoo.text.Utf8; -import com.yahoo.text.Utf8Array; - -/** - * Implements the request adapter for method "mbus.slime". - * - * @author baldersheim - */ -public class RPCSendV2 extends RPCSend { - - private final static String METHOD_NAME = "mbus.slime"; - private final static String METHOD_PARAMS = "bixbix"; - private final static String METHOD_RETURN = "bixbix"; - private final Compressor compressor = new Compressor(CompressionType.LZ4, 3, 90, 1024); - - @Override - protected String getReturnSpec() { return METHOD_RETURN; } - @Override - protected Method buildMethod() { - - Method method = new Method(METHOD_NAME, METHOD_PARAMS, METHOD_RETURN, this); - method.methodDesc("Send a message bus request and get a reply back."); - method.paramDesc(0, "header_encoding", "Encoding type of header.") - .paramDesc(1, "header_decodedSize", "Number of bytes after header decoding.") - .paramDesc(2, "header_payload", "Slime encoded header payload.") - .paramDesc(3, "body_encoding", "Encoding type of body.") - .paramDesc(4, "body_decoded_ize", "Number of bytes after body decoding.") - .paramDesc(5, "body_payload", "Slime encoded body payload."); - method.returnDesc(0, "header_encoding", "Encoding type of header.") - .returnDesc(1, "header_decoded_size", "Number of bytes after header decoding.") - .returnDesc(2, "header_payload", "Slime encoded header payload.") - .returnDesc(3, "body_encoding", "Encoding type of body.") - .returnDesc(4, "body_encoded_size", "Number of bytes after body decoding.") - .returnDesc(5, "body_payload", "Slime encoded body payload."); - return method; - } - private static final String VERSION_F = new String("version"); - private static final String ROUTE_F = new String("route"); - private static final String SESSION_F = new String("session"); - private static final String PROTOCOL_F = new String("prot"); - private static final String TRACELEVEL_F = new String("tracelevel"); - private static final String TRACE_F = new String("trace"); - private static final String USERETRY_F = new String("useretry"); - private static final String RETRY_F = new String("retry"); - private static final String RETRYDELAY_F = new String("retrydelay"); - private static final String TIMEREMAINING_F = new String("timeleft"); - private static final String ERRORS_F = new String("errors"); - private static final String SERVICE_F = new String("service"); - private static final String CODE_F = new String("code"); - private static final String BLOB_F = new String("msg"); - private static final String MSG_F = new String("msg"); - - @Override - protected Request encodeRequest(Version version, Route route, RPCServiceAddress address, Message msg, - long timeRemaining, byte[] payload, int traceLevel) - { - - Request req = new Request(METHOD_NAME); - Values v = req.parameters(); - - v.add(new Int8Value(CompressionType.NONE.getCode())); - v.add(new Int32Value(0)); - v.add(new DataValue(new byte[0])); - - Slime slime = new Slime(); - Cursor root = slime.setObject(); - - root.setString(VERSION_F, version.toString()); - root.setString(ROUTE_F, route.toString()); - root.setString(SESSION_F, address.getSessionName()); - root.setString(PROTOCOL_F, msg.getProtocol().toString()); - root.setBool(USERETRY_F, msg.getRetryEnabled()); - root.setLong(RETRY_F, msg.getRetry()); - root.setLong(TIMEREMAINING_F, msg.getTimeRemaining()); - root.setLong(TRACELEVEL_F, traceLevel); - root.setData(BLOB_F, payload); - - byte[] serializedSlime = BinaryFormat.encode(slime); - Compressor.Compression compressionResult = compressor.compress(serializedSlime); - - v.add(new Int8Value(compressionResult.type().getCode())); - v.add(new Int32Value(compressionResult.uncompressedSize())); - v.add(new DataValue(compressionResult.data())); - - return req; - } - - @Override - protected Reply createReply(Values ret, String serviceName, Trace trace) { - CompressionType compression = CompressionType.valueOf(ret.get(3).asInt8()); - byte[] slimeBytes = compressor.decompress(ret.get(5).asData(), compression, ret.get(4).asInt32()); - Slime slime = BinaryFormat.decode(slimeBytes); - Inspector root = slime.get(); - - Version version = new Version(root.field(VERSION_F).asString()); - byte[] payload = root.field(BLOB_F).asData(); - - // Make sure that the owner understands the protocol. - Reply reply = null; - Error error = null; - if (payload.length > 0) { - Object retval = decode(new Utf8Array(root.field(PROTOCOL_F).asUtf8()), version, payload); - if (retval instanceof Reply) { - reply = (Reply) retval; - } else { - error = (Error) retval; - } - } - if (reply == null) { - reply = new EmptyReply(); - } - if (error != null) { - reply.addError(error); - } - reply.setRetryDelay(root.field(RETRYDELAY_F).asDouble()); - - Inspector errors = root.field(ERRORS_F); - for (int i = 0; i < errors.entries(); i++) { - Inspector e = errors.entry(i); - String service = e.field(SERVICE_F).asString(); - reply.addError(new Error((int)e.field(CODE_F).asLong(), e.field(MSG_F).asString(), - (service != null && service.length() > 0) ? service : serviceName)); - } - if (trace.getLevel() > 0) { - trace.getRoot().addChild(TraceNode.decode(root.field(TRACE_F).asString())); - } - return reply; - } - - protected Params toParams(Values args) { - CompressionType compression = CompressionType.valueOf(args.get(3).asInt8()); - byte[] slimeBytes = compressor.decompress(args.get(5).asData(), compression, args.get(4).asInt32()); - Slime slime = BinaryFormat.decode(slimeBytes); - Inspector root = slime.get(); - Params p = new Params(); - p.version = new Version(root.field(VERSION_F).asString()); - p.route = root.field(ROUTE_F).asString(); - p.session = root.field(SESSION_F).asString(); - p.retryEnabled = root.field(USERETRY_F).asBool(); - p.retry = (int)root.field(RETRY_F).asLong(); - p.timeRemaining = root.field(TIMEREMAINING_F).asLong(); - p.protocolName = new Utf8Array(Utf8.toBytes(root.field(PROTOCOL_F).asString())); - p.payload = root.field(BLOB_F).asData(); - p.traceLevel = (int)root.field(TRACELEVEL_F).asLong(); - return p; - } - - @Override - protected void createResponse(Values ret, Reply reply, Version version, byte [] payload) { - ret.add(new Int8Value(CompressionType.NONE.getCode())); - ret.add(new Int32Value(0)); - ret.add(new DataValue(new byte[0])); - - Slime slime = new Slime(); - Cursor root = slime.setObject(); - - root.setString(VERSION_F, version.toString()); - root.setDouble(RETRYDELAY_F, reply.getRetryDelay()); - root.setString(PROTOCOL_F, reply.getProtocol().toString()); - root.setData(BLOB_F, payload); - if (reply.getTrace().getLevel() > 0) { - root.setString(TRACE_F, reply.getTrace().getRoot().encode()); - } - - if (reply.getNumErrors() > 0) { - Cursor array = root.setArray(ERRORS_F); - for (int i = 0; i < reply.getNumErrors(); i++) { - Cursor e = array.addObject(); - Error mbusE = reply.getError(i); - e.setLong(CODE_F, mbusE.getCode()); - e.setString(MSG_F, mbusE.getMessage()); - if (mbusE.getService() != null) { - e.setString(SERVICE_F, mbusE.getService()); - } - } - } - - byte[] serializedSlime = BinaryFormat.encode(slime); - Compressor.Compression compressionResult = compressor.compress(serializedSlime); - - ret.add(new Int8Value(compressionResult.type().getCode())); - ret.add(new Int32Value(compressionResult.uncompressedSize())); - ret.add(new DataValue(compressionResult.data())); - } - -} diff --git a/messagebus/src/tests/advancedrouting/advancedrouting.cpp b/messagebus/src/tests/advancedrouting/advancedrouting.cpp index 8d6b8cd3875..e847fdf3222 100644 --- a/messagebus/src/tests/advancedrouting/advancedrouting.cpp +++ b/messagebus/src/tests/advancedrouting/advancedrouting.cpp @@ -1,15 +1,17 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - +#include <vespa/messagebus/emptyreply.h> +#include <vespa/messagebus/errorcode.h> +#include <vespa/messagebus/messagebus.h> +#include <vespa/messagebus/routing/errordirective.h> +#include <vespa/messagebus/routing/retrytransienterrorspolicy.h> #include <vespa/messagebus/testlib/custompolicy.h> #include <vespa/messagebus/testlib/receptor.h> #include <vespa/messagebus/testlib/simplemessage.h> +#include <vespa/messagebus/testlib/simpleprotocol.h> #include <vespa/messagebus/testlib/slobrok.h> #include <vespa/messagebus/testlib/testserver.h> #include <vespa/vespalib/testkit/testapp.h> #include <vespa/vespalib/util/stringfmt.h> -#include <vespa/messagebus/emptyreply.h> -#include <vespa/messagebus/errorcode.h> -#include <vespa/messagebus/routing/retrytransienterrorspolicy.h> using namespace mbus; @@ -115,7 +117,7 @@ Test::Main() void Test::testAdvanced(TestData &data) { - const double TIMEOUT = 60; + const double TIMEOUT=60; IProtocol::SP protocol(new SimpleProtocol()); SimpleProtocol &simple = static_cast<SimpleProtocol&>(*protocol); simple.addPolicyFactory("Custom", SimpleProtocol::IPolicyFactory::SP(new CustomPolicyFactory(false, ErrorCode::NO_ADDRESS_FOR_SERVICE))); diff --git a/messagebus/src/tests/bucketsequence/bucketsequence.cpp b/messagebus/src/tests/bucketsequence/bucketsequence.cpp index d98acc4f191..4073469c253 100644 --- a/messagebus/src/tests/bucketsequence/bucketsequence.cpp +++ b/messagebus/src/tests/bucketsequence/bucketsequence.cpp @@ -5,6 +5,7 @@ #include <vespa/messagebus/testlib/receptor.h> #include <vespa/messagebus/testlib/simplemessage.h> #include <vespa/messagebus/testlib/simpleprotocol.h> +#include <vespa/messagebus/testlib/simplereply.h> #include <vespa/messagebus/testlib/slobrok.h> #include <vespa/messagebus/testlib/testserver.h> #include <vespa/vespalib/testkit/testapp.h> diff --git a/messagebus/src/tests/choke/choke.cpp b/messagebus/src/tests/choke/choke.cpp index 91f91c84f45..b8e20eb9074 100644 --- a/messagebus/src/tests/choke/choke.cpp +++ b/messagebus/src/tests/choke/choke.cpp @@ -2,6 +2,7 @@ #include <vespa/messagebus/emptyreply.h> #include <vespa/messagebus/errorcode.h> +#include <vespa/messagebus/reply.h> #include <vespa/messagebus/testlib/receptor.h> #include <vespa/messagebus/testlib/simplemessage.h> #include <vespa/messagebus/testlib/simpleprotocol.h> diff --git a/messagebus/src/tests/error/error.cpp b/messagebus/src/tests/error/error.cpp index 1d8a489a5ed..dbaa869507b 100644 --- a/messagebus/src/tests/error/error.cpp +++ b/messagebus/src/tests/error/error.cpp @@ -65,14 +65,14 @@ Test::Main() reply = pxy.getReply(); ASSERT_TRUE(reply.get() != 0); - ASSERT_EQUAL(reply->getNumErrors(), 1u); + EXPECT_EQUAL(reply->getNumErrors(), 1u); EXPECT_EQUAL(reply->getError(0).getService(), "test/dst/session"); reply->addError(Error(ErrorCode::APP_FATAL_ERROR, "fatality")); is->forward(std::move(reply)); reply = src.getReply(); ASSERT_TRUE(reply.get() != 0); - ASSERT_EQUAL(reply->getNumErrors(), 2u); + EXPECT_EQUAL(reply->getNumErrors(), 2u); EXPECT_EQUAL(reply->getError(0).getService(), "test/dst/session"); EXPECT_EQUAL(reply->getError(1).getService(), "test/pxy/session"); } diff --git a/messagebus/src/tests/routing/routing.cpp b/messagebus/src/tests/routing/routing.cpp index 515cbd99fde..e907e0d1163 100644 --- a/messagebus/src/tests/routing/routing.cpp +++ b/messagebus/src/tests/routing/routing.cpp @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/messagebus/emptyreply.h> #include <vespa/messagebus/errorcode.h> +#include <vespa/messagebus/messagebus.h> #include <vespa/messagebus/routing/errordirective.h> #include <vespa/messagebus/routing/retrytransienterrorspolicy.h> #include <vespa/messagebus/routing/routingcontext.h> diff --git a/messagebus/src/tests/sendadapter/sendadapter.cpp b/messagebus/src/tests/sendadapter/sendadapter.cpp index e3fa3278300..8ae1d0993cb 100644 --- a/messagebus/src/tests/sendadapter/sendadapter.cpp +++ b/messagebus/src/tests/sendadapter/sendadapter.cpp @@ -1,5 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/messagebus/messagebus.h> #include <vespa/messagebus/testlib/receptor.h> #include <vespa/messagebus/testlib/simplemessage.h> #include <vespa/messagebus/testlib/simpleprotocol.h> diff --git a/messagebus/src/tests/serviceaddress/serviceaddress.cpp b/messagebus/src/tests/serviceaddress/serviceaddress.cpp index 89edb981716..15a2e9fdac8 100644 --- a/messagebus/src/tests/serviceaddress/serviceaddress.cpp +++ b/messagebus/src/tests/serviceaddress/serviceaddress.cpp @@ -3,7 +3,20 @@ #include <vespa/vespalib/testkit/testapp.h> #include <vespa/messagebus/testlib/slobrok.h> #include <vespa/messagebus/testlib/testserver.h> +#include <vespa/messagebus/testlib/receptor.h> +#include <vespa/messagebus/testlib/simplemessage.h> +#include <vespa/messagebus/testlib/simplereply.h> +#include <vespa/messagebus/testlib/simpleprotocol.h> +#include <vespa/messagebus/messagebus.h> +#include <vespa/messagebus/sourcesession.h> +#include <vespa/messagebus/intermediatesession.h> +#include <vespa/messagebus/destinationsession.h> +#include <vespa/messagebus/emptyreply.h> +#include <vespa/messagebus/error.h> +#include <vespa/messagebus/errorcode.h> +#include <vespa/messagebus/routing/routingspec.h> #include <vespa/messagebus/network/rpcservice.h> +#include <vespa/messagebus/sourcesessionparams.h> using namespace mbus; diff --git a/messagebus/src/tests/servicepool/servicepool.cpp b/messagebus/src/tests/servicepool/servicepool.cpp index 86a800b600a..c98b342b2f3 100644 --- a/messagebus/src/tests/servicepool/servicepool.cpp +++ b/messagebus/src/tests/servicepool/servicepool.cpp @@ -1,8 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/messagebus/network/rpcnetwork.h> -#include <vespa/messagebus/network/rpcnetworkparams.h> -#include <vespa/messagebus/network/rpcservicepool.h> #include <vespa/messagebus/testlib/slobrok.h> #include <vespa/vespalib/testkit/testapp.h> diff --git a/messagebus/src/tests/shutdown/shutdown.cpp b/messagebus/src/tests/shutdown/shutdown.cpp index 070b51bbbc2..b5e6cac970e 100644 --- a/messagebus/src/tests/shutdown/shutdown.cpp +++ b/messagebus/src/tests/shutdown/shutdown.cpp @@ -1,6 +1,9 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/messagebus/emptyreply.h> +#include <vespa/messagebus/errorcode.h> +#include <vespa/messagebus/messagebus.h> +#include <vespa/messagebus/routing/errordirective.h> #include <vespa/messagebus/routing/retrytransienterrorspolicy.h> #include <vespa/messagebus/testlib/receptor.h> #include <vespa/messagebus/testlib/simplemessage.h> diff --git a/messagebus/src/tests/slobrok/slobrok.cpp b/messagebus/src/tests/slobrok/slobrok.cpp index 6c389c25e70..360705e7eae 100644 --- a/messagebus/src/tests/slobrok/slobrok.cpp +++ b/messagebus/src/tests/slobrok/slobrok.cpp @@ -2,10 +2,10 @@ #include <vespa/vespalib/testkit/testapp.h> #include <vespa/messagebus/testlib/slobrok.h> +#include <string> +#include <sstream> #include <vespa/slobrok/sbmirror.h> #include <vespa/messagebus/network/rpcnetwork.h> -#include <vespa/messagebus/network/rpcnetworkparams.h> - #include <vespa/vespalib/util/host_name.h> using slobrok::api::IMirrorAPI; diff --git a/messagebus/src/vespa/messagebus/blob.h b/messagebus/src/vespa/messagebus/blob.h index 0509d71d77c..1e84222064a 100644 --- a/messagebus/src/vespa/messagebus/blob.h +++ b/messagebus/src/vespa/messagebus/blob.h @@ -26,13 +26,13 @@ public: _payload(Alloc::alloc(s)), _sz(s) { } - Blob(Blob && rhs) noexcept : + Blob(Blob && rhs) : _payload(std::move(rhs._payload)), _sz(rhs._sz) { rhs._sz = 0; } - Blob & operator = (Blob && rhs) noexcept { + Blob & operator = (Blob && rhs) { swap(rhs); return *this; } @@ -65,3 +65,4 @@ private: }; } // namespace mbus + diff --git a/messagebus/src/vespa/messagebus/callstack.cpp b/messagebus/src/vespa/messagebus/callstack.cpp index b7179e14cad..0ab8658d53f 100644 --- a/messagebus/src/vespa/messagebus/callstack.cpp +++ b/messagebus/src/vespa/messagebus/callstack.cpp @@ -13,7 +13,7 @@ CallStack::discard() { while (!_stack.empty()) { const Frame &frame = _stack.back(); - if (frame.discardHandler != nullptr) { + if (frame.discardHandler != NULL) { frame.discardHandler->handleDiscard(frame.ctx); } _stack.pop_back(); diff --git a/messagebus/src/vespa/messagebus/callstack.h b/messagebus/src/vespa/messagebus/callstack.h index 0f9d8c93b29..68da598e796 100644 --- a/messagebus/src/vespa/messagebus/callstack.h +++ b/messagebus/src/vespa/messagebus/callstack.h @@ -72,7 +72,7 @@ public: * @param ctx The context to store. * @param discardHandler The handler for discarded messages. **/ - void push(IReplyHandler &replyHandler, Context ctx, IDiscardHandler *discardHandler = nullptr) { + void push(IReplyHandler &replyHandler, Context ctx, IDiscardHandler *discardHandler = NULL) { _stack.emplace_back(&replyHandler, discardHandler, ctx); } diff --git a/messagebus/src/vespa/messagebus/destinationsessionparams.cpp b/messagebus/src/vespa/messagebus/destinationsessionparams.cpp index ecbc036ffed..3959666e718 100644 --- a/messagebus/src/vespa/messagebus/destinationsessionparams.cpp +++ b/messagebus/src/vespa/messagebus/destinationsessionparams.cpp @@ -6,7 +6,7 @@ namespace mbus { DestinationSessionParams::DestinationSessionParams() : _name("destination"), _broadcastName(true), - _handler(nullptr) + _handler(NULL) { } } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/messagebus.cpp b/messagebus/src/vespa/messagebus/messagebus.cpp index 5a8f510ddcf..4cd19b62419 100644 --- a/messagebus/src/vespa/messagebus/messagebus.cpp +++ b/messagebus/src/vespa/messagebus/messagebus.cpp @@ -97,7 +97,7 @@ MessageBus::MessageBus(INetwork &net, ProtocolSet protocols) : MessageBusParams params; while (!protocols.empty()) { IProtocol::SP protocol = protocols.extract(); - if (protocol.get() != nullptr) { + if (protocol.get() != NULL) { params.addProtocol(protocol); } } @@ -155,7 +155,7 @@ MessageBus::setup(const MessageBusParams ¶ms) // Start messenger. IRetryPolicy::SP retryPolicy = params.getRetryPolicy(); - if (retryPolicy.get() != nullptr) { + if (retryPolicy.get() != NULL) { _resender.reset(new Resender(retryPolicy)); Messenger::ITask::UP task(new ResenderTask(*_resender)); @@ -271,7 +271,7 @@ MessageBus::sync() void MessageBus::handleMessage(Message::UP msg) { - if (_resender.get() != nullptr && msg->hasBucketSequence()) { + if (_resender.get() != NULL && msg->hasBucketSequence()) { deliverError(std::move(msg), ErrorCode::SEQUENCE_ERROR, "Bucket sequences not supported when resender is enabled."); return; @@ -359,7 +359,7 @@ MessageBus::handleDiscard(Context ctx) void MessageBus::deliverMessage(Message::UP msg, const string &session) { - IMessageHandler *msgHandler = nullptr; + IMessageHandler *msgHandler = NULL; { LockGuard guard(_lock); std::map<string, IMessageHandler*>::iterator it = _sessions.find(session); @@ -367,7 +367,7 @@ MessageBus::deliverMessage(Message::UP msg, const string &session) msgHandler = it->second; } } - if (msgHandler == nullptr) { + if (msgHandler == NULL) { deliverError(std::move(msg), ErrorCode::UNKNOWN_SESSION, make_string("Session '%s' does not exist.", session.c_str())); } else if (!checkPending(*msg)) { diff --git a/messagebus/src/vespa/messagebus/messenger.cpp b/messagebus/src/vespa/messagebus/messenger.cpp index 4b612b66c31..2d1204ee7b6 100644 --- a/messagebus/src/vespa/messagebus/messenger.cpp +++ b/messagebus/src/vespa/messagebus/messenger.cpp @@ -32,7 +32,7 @@ public: } ~MessageTask() { - if (_msg.get() != nullptr) { + if (_msg.get() != NULL) { _msg->discard(); } } @@ -42,7 +42,7 @@ public: } uint8_t priority() const override { - if (_msg.get() != nullptr) { + if (_msg.get() != NULL) { return _msg->priority(); } @@ -64,7 +64,7 @@ public: } ~ReplyTask() { - if (_reply.get() != nullptr) { + if (_reply.get() != NULL) { _reply->discard(); } } @@ -74,7 +74,7 @@ public: } uint8_t priority() const override { - if (_reply.get() != nullptr) { + if (_reply.get() != NULL) { return _reply->priority(); } @@ -205,7 +205,7 @@ Messenger::Run(FastOS_ThreadInterface *thread, void *arg) _queue.pop(); } } - if (task.get() != nullptr) { + if (task.get() != NULL) { try { task->run(); } catch (const std::exception &e) { diff --git a/messagebus/src/vespa/messagebus/network/CMakeLists.txt b/messagebus/src/vespa/messagebus/network/CMakeLists.txt index 750ff20240f..e8992034622 100644 --- a/messagebus/src/vespa/messagebus/network/CMakeLists.txt +++ b/messagebus/src/vespa/messagebus/network/CMakeLists.txt @@ -6,9 +6,7 @@ vespa_add_library(messagebus_network OBJECT oosmanager.cpp rpcnetwork.cpp rpcnetworkparams.cpp - rpcsend.cpp rpcsendv1.cpp - rpcsendv2.cpp rpcservice.cpp rpcserviceaddress.cpp rpcservicepool.cpp diff --git a/messagebus/src/vespa/messagebus/network/oosmanager.cpp b/messagebus/src/vespa/messagebus/network/oosmanager.cpp index 250df147675..eecfe1da447 100644 --- a/messagebus/src/vespa/messagebus/network/oosmanager.cpp +++ b/messagebus/src/vespa/messagebus/network/oosmanager.cpp @@ -89,7 +89,7 @@ OOSManager::isOOS(const string &service) return false; } vespalib::LockGuard guard(_lock); - if (_oosSet.get() == nullptr) { + if (_oosSet.get() == NULL) { return false; } if (_oosSet->find(service) == _oosSet->end()) { diff --git a/messagebus/src/vespa/messagebus/network/oosmanager.h b/messagebus/src/vespa/messagebus/network/oosmanager.h index eac00b93896..b521520d2b3 100644 --- a/messagebus/src/vespa/messagebus/network/oosmanager.h +++ b/messagebus/src/vespa/messagebus/network/oosmanager.h @@ -20,11 +20,14 @@ class RPCNetwork; */ class OOSManager : public FNET_Task { public: - using IMirrorAPI = slobrok::api::IMirrorAPI; - using SpecList = IMirrorAPI::SpecList; - using ClientList = std::vector<OOSClient::SP>; - using StringSet = std::set<string>; - using OOSSet = std::shared_ptr<StringSet>; + /** + * Convenience typedefs. + */ + typedef slobrok::api::IMirrorAPI IMirrorAPI; + typedef IMirrorAPI::SpecList SpecList; + typedef std::vector<OOSClient::SP> ClientList; + typedef std::set<string> StringSet; + typedef std::shared_ptr<StringSet> OOSSet; private: FRT_Supervisor &_orb; diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index 4b4b23a75db..75c5fc3f6c5 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -1,11 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "inetworkowner.h" #include "rpcnetwork.h" -#include "rpcservicepool.h" -#include "oosmanager.h" -#include "rpcsendv1.h" -#include "rpcsendv2.h" -#include "rpctargetpool.h" -#include "rpcnetworkparams.h" #include <vespa/messagebus/errorcode.h> #include <vespa/messagebus/iprotocol.h> #include <vespa/messagebus/tracelevel.h> @@ -16,8 +11,6 @@ #include <vespa/vespalib/component/vtag.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/fnet/scheduler.h> -#include <vespa/fnet/transport.h> -#include <vespa/fnet/frt/supervisor.h> #include <vespa/log/log.h> LOG_SETUP(".rpcnetwork"); @@ -59,15 +52,17 @@ public: namespace mbus { RPCNetwork::SendContext::SendContext(RPCNetwork &net, const Message &msg, - const std::vector<RoutingNode*> &recipients) - : _net(net), - _msg(msg), - _traceLevel(msg.getTrace().getLevel()), - _recipients(recipients), - _hasError(false), - _pending(_recipients.size()), - _version(_net.getVersion()) -{ } + const std::vector<RoutingNode*> &recipients) : + _net(net), + _msg(msg), + _traceLevel(msg.getTrace().getLevel()), + _recipients(recipients), + _hasError(false), + _pending(_recipients.size()), + _version(_net.getVersion()) +{ + // empty +} void RPCNetwork::SendContext::handleVersion(const vespalib::Version *version) @@ -75,7 +70,7 @@ RPCNetwork::SendContext::handleVersion(const vespalib::Version *version) bool shouldSend = false; { vespalib::LockGuard guard(_lock); - if (version == nullptr) { + if (version == NULL) { _hasError = true; } else if (*version < _version) { _version = *version; @@ -90,9 +85,11 @@ RPCNetwork::SendContext::handleVersion(const vespalib::Version *version) } } -RPCNetwork::TargetPoolTask::TargetPoolTask(FNET_Scheduler &scheduler, RPCTargetPool &pool) - : FNET_Task(&scheduler), - _pool(pool) +RPCNetwork::TargetPoolTask::TargetPoolTask( + FNET_Scheduler &scheduler, + RPCTargetPool &pool) : + FNET_Task(&scheduler), + _pool(pool) { ScheduleNow(); } @@ -107,26 +104,24 @@ RPCNetwork::TargetPoolTask::PerformTask() RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _owner(0), _ident(params.getIdentity()), - _threadPool(std::make_unique<FastOS_ThreadPool>(128000, 0)), - _transport(std::make_unique<FNET_Transport>()), - _orb(std::make_unique<FRT_Supervisor>(_transport.get(), nullptr)), - _scheduler(*_transport->GetScheduler()), - _targetPool(std::make_unique<RPCTargetPool>(params.getConnectionExpireSecs())), - _targetPoolTask(_scheduler, *_targetPool), - _servicePool(std::make_unique<RPCServicePool>(*this, 4096)), - _slobrokCfgFactory(std::make_unique<slobrok::ConfiguratorFactory>(params.getSlobrokConfig())), - _mirror(std::make_unique<slobrok::api::MirrorAPI>(*_orb, *_slobrokCfgFactory)), - _regAPI(std::make_unique<slobrok::api::RegisterAPI>(*_orb, *_slobrokCfgFactory)), - _oosManager(std::make_unique<OOSManager>(*_orb, *_mirror, params.getOOSServerPattern())), + _threadPool(128000, 0), + _transport(), + _orb(&_transport, NULL), + _scheduler(*_transport.GetScheduler()), + _targetPool(params.getConnectionExpireSecs()), + _targetPoolTask(_scheduler, _targetPool), + _servicePool(*this, 4096), + _slobrokCfgFactory(params.getSlobrokConfig()), + _mirror(std::make_unique<slobrok::api::MirrorAPI>(_orb, _slobrokCfgFactory)), + _regAPI(std::make_unique<slobrok::api::RegisterAPI>(_orb, _slobrokCfgFactory)), + _oosManager(_orb, *_mirror, params.getOOSServerPattern()), _requestedPort(params.getListenPort()), - _sendV1(std::make_unique<RPCSendV1>()), - _sendV2(std::make_unique<RPCSendV2>()), - _sendAdapters(), - _compressionConfig(params.getCompressionConfig()) + _sendV1(), + _sendAdapters() { - _transport->SetDirectWrite(false); - _transport->SetMaxInputBufferSize(params.getMaxInputBufferSize()); - _transport->SetMaxOutputBufferSize(params.getMaxOutputBufferSize()); + _transport.SetDirectWrite(false); + _transport.SetMaxInputBufferSize(params.getMaxInputBufferSize()); + _transport.SetMaxOutputBufferSize(params.getMaxOutputBufferSize()); } RPCNetwork::~RPCNetwork() @@ -137,33 +132,33 @@ RPCNetwork::~RPCNetwork() FRT_RPCRequest * RPCNetwork::allocRequest() { - return _orb->AllocRPCRequest(); + return _orb.AllocRPCRequest(); } RPCTarget::SP RPCNetwork::getTarget(const RPCServiceAddress &address) { - return _targetPool->getTarget(*_orb, address); + return _targetPool.getTarget(_orb, address); } void -RPCNetwork::replyError(const SendContext &ctx, uint32_t errCode, const string &errMsg) +RPCNetwork::replyError(const SendContext &ctx, uint32_t errCode, + const string &errMsg) { - for (RoutingNode * rnode : ctx._recipients) { + for (std::vector<RoutingNode*>::const_iterator it = ctx._recipients.begin(); + it != ctx._recipients.end(); ++it) + { Reply::UP reply(new EmptyReply()); reply->setTrace(Trace(ctx._traceLevel)); reply->addError(Error(errCode, errMsg)); - _owner->deliverReply(std::move(reply), *rnode); + _owner->deliverReply(std::move(reply), **it); } } -int RPCNetwork::getPort() const { return _orb->GetListenPort(); } - - void RPCNetwork::flushTargetPool() { - _targetPool->flushTargets(true); + _targetPool.flushTargets(true); } const vespalib::Version & @@ -178,13 +173,13 @@ RPCNetwork::attach(INetworkOwner &owner) LOG_ASSERT(_owner == 0); _owner = &owner; - _sendV1->attach(*this); - _sendV2->attach(*this); - _sendAdapters[vespalib::Version(5)] = _sendV1.get(); - _sendAdapters[vespalib::Version(6, 142)] = _sendV2.get(); + _sendV1.attach(*this); + _sendAdapters.insert(SendAdapterMap::value_type(vespalib::VersionSpecification(5), &_sendV1)); + _sendAdapters.insert(SendAdapterMap::value_type(vespalib::VersionSpecification(6), &_sendV1)); - FRT_ReflectionBuilder builder(_orb.get()); - builder.DefineMethod("mbus.getVersion", "", "s", true, FRT_METHOD(RPCNetwork::invoke), this); + FRT_ReflectionBuilder builder(&_orb); + builder.DefineMethod("mbus.getVersion", "", "s", true, + FRT_METHOD(RPCNetwork::invoke), this); builder.MethodDesc("Retrieves the message bus version."); builder.ReturnDesc("version", "The message bus version."); } @@ -198,23 +193,29 @@ RPCNetwork::invoke(FRT_RPCRequest *req) const string RPCNetwork::getConnectionSpec() const { - return make_string("tcp/%s:%d", _ident.getHostname().c_str(), _orb->GetListenPort()); + return make_string("tcp/%s:%d", _ident.getHostname().c_str(), _orb.GetListenPort()); } RPCSendAdapter * RPCNetwork::getSendAdapter(const vespalib::Version &version) { - auto lower = _sendAdapters.lower_bound(version); - return (lower != _sendAdapters.end()) ? lower->second : nullptr; + for (SendAdapterMap::iterator it = _sendAdapters.begin(); + it != _sendAdapters.end(); ++it) + { + if (it->first.matches(version)) { + return it->second; + } + } + return NULL; } bool RPCNetwork::start() { - if (!_orb->Listen(_requestedPort)) { + if (!_orb.Listen(_requestedPort)) { return false; } - if (!_transport->Start(_threadPool.get())) { + if (!_transport.Start(&_threadPool)) { return false; } return true; @@ -226,13 +227,13 @@ bool RPCNetwork::waitUntilReady(double seconds) const { slobrok::api::SlobrokList brokerList; - slobrok::Configurator::UP configurator = _slobrokCfgFactory->create(brokerList); + slobrok::Configurator::UP configurator = _slobrokCfgFactory.create(brokerList); bool hasConfig = false; for (uint32_t i = 0; i < seconds * 100; ++i) { if (configurator->poll()) { hasConfig = true; } - if (_mirror->ready() && _oosManager->isReady()) { + if (_mirror->ready() && _oosManager.isReady()) { return true; } FastOS_Thread::Sleep(10); @@ -243,7 +244,7 @@ RPCNetwork::waitUntilReady(double seconds) const std::string brokers = brokerList.logString(); LOG(error, "mirror (of %s) failed to become ready in %d seconds", brokers.c_str(), (int)seconds); - } else if (! _oosManager->isReady()) { + } else if (! _oosManager.isReady()) { LOG(error, "OOS manager failed to become ready in %d seconds", (int)seconds); } return false; @@ -292,23 +293,24 @@ RPCNetwork::allocServiceAddress(RoutingNode &recipient) Error RPCNetwork::resolveServiceAddress(RoutingNode &recipient, const string &serviceName) { - if (_oosManager->isOOS(serviceName)) { + if (_oosManager.isOOS(serviceName)) { return Error(ErrorCode::SERVICE_OOS, make_string("The service '%s' has been marked as out of service.", serviceName.c_str())); } - RPCServiceAddress::UP ret = _servicePool->resolve(serviceName); - if (ret.get() == nullptr) { + RPCServiceAddress::UP ret = _servicePool.resolve(serviceName); + if (ret.get() == NULL) { return Error(ErrorCode::NO_ADDRESS_FOR_SERVICE, make_string("The address of service '%s' could not be resolved. It is not currently " - "registered with the Vespa name server. " - "The service must be having problems, or the routing configuration is wrong.", - serviceName.c_str())); + "registered with the Vespa name server. " + "The service must be having problems, or the routing configuration is wrong.", + serviceName.c_str())); } - RPCTarget::SP target = _targetPool->getTarget(*_orb, *ret); - if (target.get() == nullptr) { + RPCTarget::SP target = _targetPool.getTarget(_orb, *ret); + if (target.get() == NULL) { return Error(ErrorCode::CONNECTION_ERROR, - make_string("Failed to connect to service '%s'.", serviceName.c_str())); + make_string("Failed to connect to service '%s'.", + serviceName.c_str())); } ret->setTarget(target); // free by freeServiceAddress() recipient.setServiceAddress(IServiceAddress::UP(ret.release())); @@ -328,7 +330,7 @@ RPCNetwork::send(const Message &msg, const std::vector<RoutingNode*> &recipients double timeout = ctx._msg.getTimeRemainingNow() / 1000.0; for (uint32_t i = 0, len = ctx._recipients.size(); i < len; ++i) { RoutingNode *&recipient = ctx._recipients[i]; - LOG_ASSERT(recipient != nullptr); + LOG_ASSERT(recipient != NULL); RPCServiceAddress &address = static_cast<RPCServiceAddress&>(recipient->getServiceAddress()); LOG_ASSERT(address.hasTarget()); @@ -341,12 +343,13 @@ void RPCNetwork::send(RPCNetwork::SendContext &ctx) { if (ctx._hasError) { - replyError(ctx, ErrorCode::HANDSHAKE_FAILED, "An error occured while resolving version."); + replyError(ctx, ErrorCode::HANDSHAKE_FAILED, + "An error occured while resolving version."); } else { uint64_t timeRemaining = ctx._msg.getTimeRemainingNow(); Blob payload = _owner->getProtocol(ctx._msg.getProtocol())->encode(ctx._version, ctx._msg); RPCSendAdapter *adapter = getSendAdapter(ctx._version); - if (adapter == nullptr) { + if (adapter == NULL) { replyError(ctx, ErrorCode::INCOMPATIBLE_VERSION, make_string("Can not send to version '%s' recipient.", ctx._version.toString().c_str())); } else if (timeRemaining == 0) { @@ -375,8 +378,8 @@ RPCNetwork::sync() void RPCNetwork::shutdown() { - _transport->ShutDown(false); - _threadPool->Close(); + _transport.ShutDown(false); + _threadPool.Close(); } void diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h index 0cda6ffecb7..856f9d0ef64 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h @@ -2,33 +2,27 @@ #pragma once #include "inetwork.h" -#include "rpcsendadapter.h" -#include "rpctarget.h" -#include "identity.h" +#include "oosmanager.h" +#include "rpcnetworkparams.h" +#include "rpcsendv1.h" +#include "rpcservicepool.h" +#include "rpctargetpool.h" #include <vespa/messagebus/blob.h> #include <vespa/messagebus/blobref.h> #include <vespa/messagebus/message.h> #include <vespa/messagebus/reply.h> #include <vespa/slobrok/imirrorapi.h> #include <vespa/vespalib/component/versionspecification.h> -#include <vespa/vespalib/util/compressionconfig.h> -#include <vespa/fnet/frt/invokable.h> - -class FNET_Transport; +#include <vespa/fnet/transport.h> +#include <vespa/fnet/frt/supervisor.h> namespace slobrok { - namespace api { class RegisterAPI; } - class ConfiguratorFactory; + namespace api { + class RegisterAPI; + } } namespace mbus { - -class OOSManager; -class RPCServicePool; -class RPCTargetPool; -class RPCNetworkParams; -class RPCServiceAddress; - /** * Network implementation based on RPC. This class is responsible for * keeping track of services and for sending messages to services. @@ -36,7 +30,6 @@ class RPCServiceAddress; class RPCNetwork : public INetwork, public FRT_Invokable { private: - using CompressionConfig = vespalib::compression::CompressionConfig; struct SendContext : public RPCTarget::IVersionHandler { vespalib::Lock _lock; RPCNetwork &_net; @@ -58,26 +51,24 @@ private: void PerformTask() override; }; - using SendAdapterMap = std::map<vespalib::Version, RPCSendAdapter*>; - - INetworkOwner *_owner; - Identity _ident; - std::unique_ptr<FastOS_ThreadPool> _threadPool; - std::unique_ptr<FNET_Transport> _transport; - std::unique_ptr<FRT_Supervisor> _orb; - FNET_Scheduler &_scheduler; - std::unique_ptr<RPCTargetPool> _targetPool; - TargetPoolTask _targetPoolTask; - std::unique_ptr<RPCServicePool> _servicePool; - std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory; - std::unique_ptr<slobrok::api::IMirrorAPI> _mirror; - std::unique_ptr<slobrok::api::RegisterAPI> _regAPI; - std::unique_ptr<OOSManager> _oosManager; - int _requestedPort; - std::unique_ptr<RPCSendAdapter> _sendV1; - std::unique_ptr<RPCSendAdapter> _sendV2; - SendAdapterMap _sendAdapters; - CompressionConfig _compressionConfig; + typedef std::map<vespalib::VersionSpecification, RPCSendAdapter*> SendAdapterMap; + + INetworkOwner *_owner; + Identity _ident; + FastOS_ThreadPool _threadPool; + FNET_Transport _transport; + FRT_Supervisor _orb; + FNET_Scheduler &_scheduler; + RPCTargetPool _targetPool; + TargetPoolTask _targetPoolTask; + RPCServicePool _servicePool; + slobrok::ConfiguratorFactory _slobrokCfgFactory; + std::unique_ptr<slobrok::api::IMirrorAPI> _mirror; + std::unique_ptr<slobrok::api::RegisterAPI> _regAPI; + OOSManager _oosManager; + int _requestedPort; + RPCSendV1 _sendV1; + SendAdapterMap _sendAdapters; /** * Resolves and assigns a service address for the given recipient using the @@ -168,7 +159,7 @@ public: * * @return port number **/ - int getPort() const; + int getPort() const { return _orb.GetListenPort(); } /** * Allocate a new rpc request object. The caller of this method gets the @@ -200,7 +191,7 @@ public: * * @return internal OOS manager **/ - OOSManager &getOOSManager() { return *_oosManager; } + OOSManager &getOOSManager() { return _oosManager; } /** * Obtain a reference to the internal supervisor. This is used by @@ -208,7 +199,7 @@ public: * * @return The supervisor. */ - FRT_Supervisor &getSupervisor() { return *_orb; } + FRT_Supervisor &getSupervisor() { return _orb; } /** * Deliver an error reply to the recipients of a {@link SendContext} in a @@ -218,7 +209,8 @@ public: * @param errCode The error code to return. * @param errMsg The error string to return. */ - void replyError(const SendContext &ctx, uint32_t errCode, const string &errMsg); + void replyError(const SendContext &ctx, uint32_t errCode, + const string &errMsg); void attach(INetworkOwner &owner) override; const string getConnectionSpec() const override; @@ -233,8 +225,9 @@ public: void shutdown() override; void postShutdownHook() override; const slobrok::api::IMirrorAPI &getMirror() const override; - CompressionConfig getCompressionConfig() { return _compressionConfig; } + void invoke(FRT_RPCRequest *req); }; } // namespace mbus + diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp index df35d51cb54..e025db0e350 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp @@ -11,8 +11,7 @@ RPCNetworkParams::RPCNetworkParams() : _listenPort(0), _maxInputBufferSize(256*1024), _maxOutputBufferSize(256*1024), - _connectionExpireSecs(30), - _compressionConfig(CompressionConfig::LZ4, 6, 90, 1024) + _connectionExpireSecs(30) { } RPCNetworkParams::~RPCNetworkParams() {} diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h index bfc624a6523..a65248f7299 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h @@ -3,7 +3,6 @@ #include "identity.h" #include <vespa/slobrok/cfg.h> -#include <vespa/vespalib/util/compressionconfig.h> namespace mbus { @@ -13,15 +12,13 @@ namespace mbus { */ class RPCNetworkParams { private: - using CompressionConfig = vespalib::compression::CompressionConfig; - Identity _identity; + Identity _identity; config::ConfigUri _slobrokConfig; - string _oosServerPattern; - int _listenPort; - uint32_t _maxInputBufferSize; - uint32_t _maxOutputBufferSize; - double _connectionExpireSecs; - CompressionConfig _compressionConfig; + string _oosServerPattern; + int _listenPort; + uint32_t _maxInputBufferSize; + uint32_t _maxOutputBufferSize; + double _connectionExpireSecs; public: RPCNetworkParams(); @@ -180,12 +177,6 @@ public: _maxOutputBufferSize = maxOutputBufferSize; return *this; } - - RPCNetworkParams &setCompressionConfig(CompressionConfig compressionConfig) { - _compressionConfig = compressionConfig; - return *this; - } - CompressionConfig getCompressionConfig() const { return _compressionConfig; } }; } diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp deleted file mode 100644 index 705b8648442..00000000000 --- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp +++ /dev/null @@ -1,282 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "rpcsend.h" -#include "rpcsend_private.h" -#include "rpcserviceaddress.h" -#include <vespa/messagebus/network/rpcnetwork.h> -#include <vespa/messagebus/tracelevel.h> -#include <vespa/messagebus/emptyreply.h> -#include <vespa/messagebus/errorcode.h> -#include <vespa/vespalib/util/stringfmt.h> -#include <vespa/fnet/channel.h> -#include <vespa/fnet/frt/reflection.h> - -#include <vespa/vespalib/data/slime/cursor.h> - -using vespalib::make_string; - -namespace mbus { - -using network::internal::ReplyContext; -using network::internal::SendContext; - -namespace { - -class FillByCopy final : public PayLoadFiller -{ -public: - FillByCopy(BlobRef payload) : _payload(payload) { } - void fill(FRT_Values & v) const override { - v.AddData(_payload.data(), _payload.size()); - } - void fill(const vespalib::Memory & name, vespalib::slime::Cursor & v) const override { - v.setData(name, vespalib::Memory(_payload.data(), _payload.size())); - } -private: - BlobRef _payload; -}; - -class FillByHandover final : public PayLoadFiller -{ -public: - FillByHandover(Blob payload) : _payload(std::move(payload)) { } - void fill(FRT_Values & v) const override { - size_t sz = _payload.size(); - v.AddData(std::move(_payload.payload()), sz); - } - void fill(const vespalib::Memory & name, vespalib::slime::Cursor & v) const override { - v.setData(name, vespalib::Memory(_payload.data(), _payload.size())); - } -private: - mutable Blob _payload; -}; - -} - -RPCSend::RPCSend() : - _net(nullptr), - _clientIdent("client"), - _serverIdent("server") -{ } - -RPCSend::~RPCSend() {} - -void -RPCSend::attach(RPCNetwork &net) -{ - _net = &net; - const string &prefix = _net->getIdentity().getServicePrefix(); - if (!prefix.empty()) { - _clientIdent = make_string("'%s'", prefix.c_str()); - _serverIdent = _clientIdent; - } - - FRT_ReflectionBuilder builder(&_net->getSupervisor()); - build(builder); -} - -void -RPCSend::replyError(FRT_RPCRequest *req, const vespalib::Version &version, uint32_t traceLevel, const Error &err) -{ - Reply::UP reply(new EmptyReply()); - reply->setContext(Context(new ReplyContext(*req, version))); - reply->getTrace().setLevel(traceLevel); - reply->addError(err); - handleReply(std::move(reply)); -} - -void -RPCSend::handleDiscard(Context ctx) -{ - ReplyContext::UP tmp(static_cast<ReplyContext*>(ctx.value.PTR)); - FRT_RPCRequest &req = tmp->getRequest(); - FNET_Channel *chn = req.GetContext()._value.CHANNEL; - req.SubRef(); - chn->Free(); -} - -void -RPCSend::sendByHandover(RoutingNode &recipient, const vespalib::Version &version, Blob payload, uint64_t timeRemaining) -{ - send(recipient, version, FillByHandover(std::move(payload)), timeRemaining); -} - -void -RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, BlobRef payload, uint64_t timeRemaining) -{ - send(recipient, version, FillByCopy(payload), timeRemaining); -} - -void -RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, - const PayLoadFiller & payload, uint64_t timeRemaining) -{ - SendContext::UP ctx(new SendContext(recipient, timeRemaining)); - RPCServiceAddress &address = static_cast<RPCServiceAddress&>(recipient.getServiceAddress()); - const Message &msg = recipient.getMessage(); - Route route = recipient.getRoute(); - Hop hop = route.removeHop(0); - - FRT_RPCRequest *req = _net->allocRequest(); - encodeRequest(*req, version, route, address, msg, recipient.getTrace().getLevel(), payload, timeRemaining); - - if (ctx->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) { - ctx->getTrace().trace(TraceLevel::SEND_RECEIVE, - make_string("Sending message (version %s) from %s to '%s' with %.2f seconds timeout.", - version.toString().c_str(), _clientIdent.c_str(), - address.getServiceName().c_str(), ctx->getTimeout())); - } - - if (hop.getIgnoreResult()) { - address.getTarget().getFRTTarget().InvokeVoid(req); - if (ctx->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) { - ctx->getTrace().trace(TraceLevel::SEND_RECEIVE, - make_string("Not waiting for a reply from '%s'.", address.getServiceName().c_str())); - } - Reply::UP reply(new EmptyReply()); - reply->getTrace().swap(ctx->getTrace()); - _net->getOwner().deliverReply(std::move(reply), recipient); - } else { - SendContext *ptr = ctx.release(); - req->SetContext(FNET_Context(ptr)); - address.getTarget().getFRTTarget().InvokeAsync(req, ptr->getTimeout(), this); - } -} - -void -RPCSend::RequestDone(FRT_RPCRequest *req) -{ - SendContext::UP ctx(static_cast<SendContext*>(req->GetContext()._value.VOIDP)); - const string &serviceName = static_cast<RPCServiceAddress&>(ctx->getRecipient().getServiceAddress()).getServiceName(); - Reply::UP reply; - Error error; - Trace & trace = ctx->getTrace(); - if (!req->CheckReturnTypes(getReturnSpec())) { - reply.reset(new EmptyReply()); - switch (req->GetErrorCode()) { - case FRTE_RPC_TIMEOUT: - error = Error(ErrorCode::TIMEOUT, - make_string("A timeout occured while waiting for '%s' (%g seconds expired); %s", - serviceName.c_str(), ctx->getTimeout(), req->GetErrorMessage())); - break; - case FRTE_RPC_CONNECTION: - error = Error(ErrorCode::CONNECTION_ERROR, - make_string("A connection error occured for '%s'; %s", - serviceName.c_str(), req->GetErrorMessage())); - break; - default: - error = Error(ErrorCode::NETWORK_ERROR, - make_string("A network error occured for '%s'; %s", - serviceName.c_str(), req->GetErrorMessage())); - } - } else { - FRT_Values &ret = *req->GetReturn(); - reply = createReply(ret, serviceName, error, trace.getRoot()); - } - if (trace.shouldTrace(TraceLevel::SEND_RECEIVE)) { - trace.trace(TraceLevel::SEND_RECEIVE, - make_string("Reply (type %d) received at %s.", reply->getType(), _clientIdent.c_str())); - } - reply->getTrace().swap(trace); - if (error.getCode() != ErrorCode::NONE) { - reply->addError(error); - } - _net->getOwner().deliverReply(std::move(reply), ctx->getRecipient()); - req->SubRef(); -} - -std::unique_ptr<Reply> -RPCSend::decode(vespalib::stringref protocolName, const vespalib::Version & version, BlobRef payload, Error & error) const -{ - Reply::UP reply; - IProtocol * protocol = _net->getOwner().getProtocol(protocolName); - if (protocol != nullptr) { - Routable::UP routable = protocol->decode(version, payload); - if (routable) { - if (routable->isReply()) { - reply.reset(static_cast<Reply*>(routable.release())); - } else { - error = Error(ErrorCode::DECODE_ERROR, "Payload decoded to a message when expecting a reply."); - } - } else { - error = Error(ErrorCode::DECODE_ERROR, - make_string("Protocol '%s' failed to decode routable.", protocolName.c_str())); - } - - } else { - error = Error(ErrorCode::UNKNOWN_PROTOCOL, - make_string("Protocol '%s' is not known by %s.", protocolName.c_str(), _serverIdent.c_str())); - } - return reply; -} - -void -RPCSend::handleReply(Reply::UP reply) -{ - ReplyContext::UP ctx(static_cast<ReplyContext*>(reply->getContext().value.PTR)); - FRT_RPCRequest &req = ctx->getRequest(); - string version = ctx->getVersion().toString(); - if (reply->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) { - reply->getTrace().trace(TraceLevel::SEND_RECEIVE, make_string("Sending reply (version %s) from %s.", - version.c_str(), _serverIdent.c_str())); - } - Blob payload(0); - if (reply->getType() != 0) { - payload = _net->getOwner().getProtocol(reply->getProtocol())->encode(ctx->getVersion(), *reply); - if (payload.size() == 0) { - reply->addError(Error(ErrorCode::ENCODE_ERROR, "An error occured while encoding the reply, see log.")); - } - } - FRT_Values &ret = *req.GetReturn(); - createResponse(ret, version, *reply, std::move(payload)); - req.Return(); -} - -void -RPCSend::invoke(FRT_RPCRequest *req) -{ - req->Detach(); - FRT_Values &args = *req->GetParams(); - - std::unique_ptr<Params> params = toParams(args); - IProtocol * protocol = _net->getOwner().getProtocol(params->getProtocol()); - if (protocol == nullptr) { - replyError(req, params->getVersion(), params->getTraceLevel(), - Error(ErrorCode::UNKNOWN_PROTOCOL, - make_string("Protocol '%s' is not known by %s.", params->getProtocol().c_str(), _serverIdent.c_str()))); - return; - } - Routable::UP routable = protocol->decode(params->getVersion(), params->getPayload()); - req->DiscardBlobs(); - if ( ! routable ) { - replyError(req, params->getVersion(), params->getTraceLevel(), - Error(ErrorCode::DECODE_ERROR, - make_string("Protocol '%s' failed to decode routable.", params->getProtocol().c_str()))); - return; - } - if (routable->isReply()) { - replyError(req, params->getVersion(), params->getTraceLevel(), - Error(ErrorCode::DECODE_ERROR, "Payload decoded to a reply when expecting a mesage.")); - return; - } - Message::UP msg(static_cast<Message*>(routable.release())); - vespalib::stringref route = params->getRoute(); - if (!route.empty()) { - msg->setRoute(Route::parse(route)); - } - msg->setContext(Context(new ReplyContext(*req, params->getVersion()))); - msg->pushHandler(*this, *this); - msg->setRetryEnabled(params->useRetry()); - msg->setRetry(params->getRetries()); - msg->setTimeReceivedNow(); - msg->setTimeRemaining(params->getRemainingTime()); - msg->getTrace().setLevel(params->getTraceLevel()); - if (msg->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) { - msg->getTrace().trace(TraceLevel::SEND_RECEIVE, - make_string("Message (type %d) received at %s for session '%s'.", - msg->getType(), _serverIdent.c_str(), string(params->getSession()).c_str())); - } - _net->getOwner().deliverMessage(std::move(msg), params->getSession()); -} - -} // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.h b/messagebus/src/vespa/messagebus/network/rpcsend.h deleted file mode 100644 index c707b47f548..00000000000 --- a/messagebus/src/vespa/messagebus/network/rpcsend.h +++ /dev/null @@ -1,95 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include "rpcsendadapter.h" -#include <vespa/messagebus/idiscardhandler.h> -#include <vespa/messagebus/ireplyhandler.h> -#include <vespa/messagebus/common.h> -#include <vespa/fnet/frt/invokable.h> -#include <vespa/fnet/frt/invoker.h> - -class FRT_ReflectionBuilder; - -namespace vespalib::slime { class Cursor; } -namespace vespalib { class Memory; } -namespace vespalib { class TraceNode; } -namespace mbus { - -class Error; -class Route; -class Message; -class RPCServiceAddress; - -class PayLoadFiller -{ -public: - virtual ~PayLoadFiller() { } - virtual void fill(FRT_Values & v) const = 0; - virtual void fill(const vespalib::Memory & name, vespalib::slime::Cursor & v) const = 0; -}; - -class RPCSend : public RPCSendAdapter, - public FRT_Invokable, - public FRT_IRequestWait, - public IDiscardHandler, - public IReplyHandler -{ -public: - class Params { - public: - virtual ~Params() {} - virtual vespalib::Version getVersion() const = 0; - virtual vespalib::stringref getProtocol() const = 0; - virtual uint32_t getTraceLevel() const = 0; - virtual bool useRetry() const = 0; - virtual uint32_t getRetries() const = 0; - virtual uint64_t getRemainingTime() const = 0; - virtual vespalib::stringref getRoute() const = 0; - virtual vespalib::stringref getSession() const = 0; - virtual BlobRef getPayload() const = 0; - }; -protected: - RPCNetwork *_net; - string _clientIdent; - string _serverIdent; - - virtual void build(FRT_ReflectionBuilder & builder) = 0; - virtual std::unique_ptr<Reply> createReply(const FRT_Values & response, const string & serviceName, - Error & error, vespalib::TraceNode & rootTrace) const = 0; - virtual void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route, - const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel, - const PayLoadFiller &filler, uint64_t timeRemaining) const = 0; - virtual const char * getReturnSpec() const = 0; - virtual void createResponse(FRT_Values & ret, const string & version, Reply & reply, Blob payload) const = 0; - virtual std::unique_ptr<Params> toParams(const FRT_Values ¶m) const = 0; - - void send(RoutingNode &recipient, const vespalib::Version &version, - const PayLoadFiller & filler, uint64_t timeRemaining); - std::unique_ptr<Reply> decode(vespalib::stringref protocol, const vespalib::Version & version, - BlobRef payload, Error & error) const; - /** - * Send an error reply for a given request. - * - * @param request The FRT request to reply to. - * @param version The version to serialize for. - * @param traceLevel The trace level to set in the reply. - * @param err The error to reply with. - */ - void replyError(FRT_RPCRequest *req, const vespalib::Version &version, uint32_t traceLevel, const Error &err); -public: - RPCSend(); - ~RPCSend(); - - void invoke(FRT_RPCRequest *req); -private: - void attach(RPCNetwork &net) final override; - void handleDiscard(Context ctx) final override; - void sendByHandover(RoutingNode &recipient, const vespalib::Version &version, - Blob payload, uint64_t timeRemaining) final override; - void send(RoutingNode &recipient, const vespalib::Version &version, - BlobRef payload, uint64_t timeRemaining) final override; - void RequestDone(FRT_RPCRequest *req) final override; - void handleReply(std::unique_ptr<Reply> reply) final override; -}; - -} // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpcsend_private.h b/messagebus/src/vespa/messagebus/network/rpcsend_private.h deleted file mode 100644 index f5867e79856..00000000000 --- a/messagebus/src/vespa/messagebus/network/rpcsend_private.h +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <vespa/messagebus/trace.h> -#include <vespa/messagebus/routing/routingnode.h> - -namespace mbus::network::internal { -/** - * Implements a helper class to hold the necessary context to create a reply from - * an rpc return value. This object is held as the context of an FRT_RPCRequest. - */ -class SendContext { -private: - mbus::RoutingNode &_recipient; - mbus::Trace _trace; - double _timeout; - -public: - typedef std::unique_ptr<SendContext> UP; - SendContext(const SendContext &) = delete; - SendContext & operator = (const SendContext &) = delete; - SendContext(mbus::RoutingNode &recipient, uint64_t timeRemaining) - : _recipient(recipient), - _trace(recipient.getTrace().getLevel()), - _timeout(timeRemaining * 0.001) { } - mbus::RoutingNode &getRecipient() { return _recipient; } - mbus::Trace &getTrace() { return _trace; } - double getTimeout() { return _timeout; } -}; - -/** - * Implements a helper class to hold the necessary context to send a reply as an - * rpc return value. This object is held in the callstack of the reply. - */ -class ReplyContext { -private: - FRT_RPCRequest &_request; - vespalib::Version _version; - -public: - typedef std::unique_ptr<ReplyContext> UP; - ReplyContext(const ReplyContext &) = delete; - ReplyContext & operator = (const ReplyContext &) = delete; - - ReplyContext(FRT_RPCRequest &request, const vespalib::Version &version) - : _request(request), _version(version) { } - FRT_RPCRequest &getRequest() { return _request; } - const vespalib::Version &getVersion() { return _version; } -}; - - -} diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp b/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp index 6b89a278b88..4cf45207010 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsendv1.cpp @@ -1,40 +1,87 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "rpcsendv1.h" #include "rpcnetwork.h" -#include "rpcserviceaddress.h" +#include <vespa/messagebus/routing/routingnode.h> #include <vespa/messagebus/emptyreply.h> +#include <vespa/messagebus/errorcode.h> #include <vespa/messagebus/tracelevel.h> #include <vespa/vespalib/util/stringfmt.h> -#include <vespa/fnet/frt/reflection.h> +#include <vespa/fnet/channel.h> using vespalib::make_string; -namespace mbus { - namespace { -const char *METHOD_NAME = "mbus.send1"; -const char *METHOD_PARAMS = "sssbilsxi"; -const char *METHOD_RETURN = "sdISSsxs"; +/** + * Implements a helper class to hold the necessary context to create a reply from + * an rpc return value. This object is held as the context of an FRT_RPCRequest. + */ +class SendContext { +private: + mbus::RoutingNode &_recipient; + mbus::Trace _trace; + double _timeout; -} +public: + typedef std::unique_ptr<SendContext> UP; + SendContext(const SendContext &) = delete; + SendContext & operator = (const SendContext &) = delete; + SendContext(mbus::RoutingNode &recipient, uint64_t timeRemaining) + : _recipient(recipient), + _trace(recipient.getTrace().getLevel()), + _timeout(timeRemaining * 0.001) { } + mbus::RoutingNode &getRecipient() { return _recipient; } + mbus::Trace &getTrace() { return _trace; } + double getTimeout() { return _timeout; } +}; -bool RPCSendV1::isCompatible(vespalib::stringref method, vespalib::stringref request, vespalib::stringref respons) -{ - return (method == METHOD_NAME) && - (request == METHOD_PARAMS) && - (respons == METHOD_RETURN); -} +/** + * Implements a helper class to hold the necessary context to send a reply as an + * rpc return value. This object is held in the callstack of the reply. + */ +class ReplyContext { +private: + FRT_RPCRequest &_request; + vespalib::Version _version; + +public: + typedef std::unique_ptr<ReplyContext> UP; + ReplyContext(const ReplyContext &) = delete; + ReplyContext & operator = (const ReplyContext &) = delete; + + ReplyContext(FRT_RPCRequest &request, const vespalib::Version &version) + : _request(request), _version(version) { } + FRT_RPCRequest &getRequest() { return _request; } + const vespalib::Version &getVersion() { return _version; } +}; -const char * -RPCSendV1::getReturnSpec() const { - return METHOD_RETURN; } +namespace mbus { + +const char *RPCSendV1::METHOD_NAME = "mbus.send1"; +const char *RPCSendV1::METHOD_PARAMS = "sssbilsxi"; +const char *RPCSendV1::METHOD_RETURN = "sdISSsxs"; + +RPCSendV1::RPCSendV1() : + _net(NULL), + _clientIdent("client"), + _serverIdent("server") +{ } + +RPCSendV1::~RPCSendV1() {} + void -RPCSendV1::build(FRT_ReflectionBuilder & builder) +RPCSendV1::attach(RPCNetwork &net) { + _net = &net; + const string &prefix = _net->getIdentity().getServicePrefix(); + if (!prefix.empty()) { + _clientIdent = make_string("'%s'", prefix.c_str()); + _serverIdent = _clientIdent; + } + + FRT_ReflectionBuilder builder(&_net->getSupervisor()); builder.DefineMethod(METHOD_NAME, METHOD_PARAMS, METHOD_RETURN, true, FRT_METHOD(RPCSendV1::invoke), this); builder.MethodDesc("Send a message bus request and get a reply back."); builder.ParamDesc("version", "The version of the message."); @@ -56,14 +103,59 @@ RPCSendV1::build(FRT_ReflectionBuilder & builder) builder.ReturnDesc("trace", "A string representation of the trace."); } +namespace { + +class FillByCopy : public PayLoadFiller +{ +public: + FillByCopy(BlobRef payload) : _payload(payload) { } + void fill(FRT_Values & v) const override { + v.AddData(_payload.data(), _payload.size()); + } +private: + BlobRef _payload; +}; + +class FillByHandover : public PayLoadFiller +{ +public: + FillByHandover(Blob payload) : _payload(std::move(payload)) { } + void fill(FRT_Values & v) const override { + v.AddData(std::move(_payload.payload()), _payload.size()); + } +private: + mutable Blob _payload; +}; + +} + void -RPCSendV1::encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route, - const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel, - const PayLoadFiller &filler, uint64_t timeRemaining) const +RPCSendV1::send(RoutingNode &recipient, const vespalib::Version &version, + BlobRef payload, uint64_t timeRemaining) { + send(recipient, version, FillByCopy(payload), timeRemaining); +} + +void +RPCSendV1::sendByHandover(RoutingNode &recipient, const vespalib::Version &version, + Blob payload, uint64_t timeRemaining) +{ + send(recipient, version, FillByHandover(std::move(payload)), timeRemaining); +} - FRT_Values &args = *req.GetParams(); - req.SetMethodName(METHOD_NAME); +void +RPCSendV1::send(RoutingNode &recipient, const vespalib::Version &version, + const PayLoadFiller & payload, uint64_t timeRemaining) +{ + SendContext::UP ctx(new SendContext(recipient, timeRemaining)); + RPCServiceAddress &address = static_cast<RPCServiceAddress&>(recipient.getServiceAddress()); + const Message &msg = recipient.getMessage(); + Route route = recipient.getRoute(); + Hop hop = route.removeHop(0); + + FRT_RPCRequest *req = _net->allocRequest(); + FRT_Values &args = *req->GetParams(); + req->SetMethodName(METHOD_NAME); args.AddString(version.toString().c_str()); args.AddString(route.toString().c_str()); args.AddString(address.getSessionName().c_str()); @@ -71,103 +163,237 @@ RPCSendV1::encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, args.AddInt32(msg.getRetry()); args.AddInt64(timeRemaining); args.AddString(msg.getProtocol().c_str()); - filler.fill(args); - args.AddInt32(traceLevel); -} + payload.fill(args); + args.AddInt32(recipient.getTrace().getLevel()); -namespace { + if (ctx->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) { + ctx->getTrace().trace(TraceLevel::SEND_RECEIVE, + make_string("Sending message (version %s) from %s to '%s' with %.2f seconds timeout.", + version.toString().c_str(), _clientIdent.c_str(), + address.getServiceName().c_str(), ctx->getTimeout())); + } + + if (hop.getIgnoreResult()) { + address.getTarget().getFRTTarget().InvokeVoid(req); + if (ctx->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) { + ctx->getTrace().trace(TraceLevel::SEND_RECEIVE, + make_string("Not waiting for a reply from '%s'.", address.getServiceName().c_str())); + } + Reply::UP reply(new EmptyReply()); + reply->getTrace().swap(ctx->getTrace()); + _net->getOwner().deliverReply(std::move(reply), recipient); + } else { + SendContext *ptr = ctx.release(); + req->SetContext(FNET_Context(ptr)); + address.getTarget().getFRTTarget().InvokeAsync(req, ptr->getTimeout(), this); + } +} -class ParamsV1 : public RPCSend::Params +void +RPCSendV1::RequestDone(FRT_RPCRequest *req) { -public: - ParamsV1(const FRT_Values &args) : _args(args) { } + SendContext::UP ctx(static_cast<SendContext*>(req->GetContext()._value.VOIDP)); + const string &serviceName = static_cast<RPCServiceAddress&>( + ctx->getRecipient().getServiceAddress()).getServiceName(); + Reply::UP reply; + Error error; + if (!req->CheckReturnTypes(METHOD_RETURN)) { + reply.reset(new EmptyReply()); + switch (req->GetErrorCode()) { + case FRTE_RPC_TIMEOUT: + error = Error(ErrorCode::TIMEOUT, + make_string("A timeout occured while waiting for '%s' (%g seconds expired); %s", + serviceName.c_str(), ctx->getTimeout(), req->GetErrorMessage())); + break; + case FRTE_RPC_CONNECTION: + error = Error(ErrorCode::CONNECTION_ERROR, + make_string("A connection error occured for '%s'; %s", + serviceName.c_str(), req->GetErrorMessage())); + break; + default: + error = Error(ErrorCode::NETWORK_ERROR, + make_string("A network error occured for '%s'; %s", + serviceName.c_str(), req->GetErrorMessage())); + } + } else { + FRT_Values &ret = *req->GetReturn(); - uint32_t getTraceLevel() const override { return _args[8]._intval32; } - bool useRetry() const override { return _args[3]._intval8 != 0; } - uint32_t getRetries() const override { return _args[4]._intval32; } - uint64_t getRemainingTime() const override { return _args[5]._intval64; } + vespalib::Version version = vespalib::Version(ret[0]._string._str); + double retryDelay = ret[1]._double; + uint32_t *errorCodes = ret[2]._int32_array._pt; + uint32_t errorCodesLen = ret[2]._int32_array._len; + FRT_StringValue *errorMessages = ret[3]._string_array._pt; + uint32_t errorMessagesLen = ret[3]._string_array._len; + FRT_StringValue *errorServices = ret[4]._string_array._pt; + uint32_t errorServicesLen = ret[4]._string_array._len; + const char *protocolName = ret[5]._string._str; + const char *payload = ret[6]._data._buf; + uint32_t payloadLen = ret[6]._data._len; + const char *trace = ret[7]._string._str; - vespalib::Version getVersion() const override { - return vespalib::Version(vespalib::stringref(_args[0]._string._str, _args[0]._string._len)); - } - vespalib::stringref getRoute() const override { - return vespalib::stringref(_args[1]._string._str, _args[1]._string._len); - } - vespalib::stringref getSession() const override { - return vespalib::stringref(_args[2]._string._str, _args[2]._string._len); + if (payloadLen > 0) { + IProtocol * protocol = _net->getOwner().getProtocol(protocolName); + if (protocol != nullptr) { + Routable::UP routable = protocol->decode(version, BlobRef(payload, payloadLen)); + if (routable) { + if (routable->isReply()) { + reply.reset(static_cast<Reply*>(routable.release())); + } else { + error = Error(ErrorCode::DECODE_ERROR, + "Payload decoded to a message when expecting a reply."); + } + } else { + error = Error(ErrorCode::DECODE_ERROR, + make_string("Protocol '%s' failed to decode routable.", protocolName)); + } + + } else { + error = Error(ErrorCode::UNKNOWN_PROTOCOL, + make_string("Protocol '%s' is not known by %s.", protocolName, _serverIdent.c_str())); + } + } + if ( ! reply ) { + reply.reset(new EmptyReply()); + } + reply->setRetryDelay(retryDelay); + for (uint32_t i = 0; i < errorCodesLen && i < errorMessagesLen && i < errorServicesLen; ++i) { + reply->addError(Error(errorCodes[i], + errorMessages[i]._str, + errorServices[i]._len > 0 ? errorServices[i]._str : serviceName.c_str())); + } + ctx->getTrace().getRoot().addChild(TraceNode::decode(trace)); } - vespalib::stringref getProtocol() const override { - return vespalib::stringref(_args[6]._string._str, _args[6]._string._len); + if (ctx->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) { + ctx->getTrace().trace(TraceLevel::SEND_RECEIVE, + make_string("Reply (type %d) received at %s.", reply->getType(), _clientIdent.c_str())); } - BlobRef getPayload() const override { - return BlobRef(_args[7]._data._buf, _args[7]._data._len); + reply->getTrace().swap(ctx->getTrace()); + if (error.getCode() != ErrorCode::NONE) { + reply->addError(error); } -private: - const FRT_Values & _args; -}; - + _net->getOwner().deliverReply(std::move(reply), ctx->getRecipient()); + req->SubRef(); } -std::unique_ptr<RPCSend::Params> -RPCSendV1::toParams(const FRT_Values &args) const +void +RPCSendV1::invoke(FRT_RPCRequest *req) { - return std::make_unique<ParamsV1>(args); -} + req->Detach(); + FRT_Values &args = *req->GetParams(); + vespalib::Version version = vespalib::Version(args[0]._string._str); + const char *route = args[1]._string._str; + const char *session = args[2]._string._str; + bool retryEnabled = args[3]._intval8 != 0; + uint32_t retry = args[4]._intval32; + uint64_t timeRemaining = args[5]._intval64; + const char *protocolName = args[6]._string._str; + const char *payload = args[7]._data._buf; + uint32_t payloadLen = args[7]._data._len; + uint32_t traceLevel = args[8]._intval32; -std::unique_ptr<Reply> -RPCSendV1::createReply(const FRT_Values & ret, const string & serviceName, Error & error, vespalib::TraceNode & rootTrace) const -{ - vespalib::Version version = vespalib::Version(ret[0]._string._str); - double retryDelay = ret[1]._double; - uint32_t *errorCodes = ret[2]._int32_array._pt; - uint32_t errorCodesLen = ret[2]._int32_array._len; - FRT_StringValue *errorMessages = ret[3]._string_array._pt; - uint32_t errorMessagesLen = ret[3]._string_array._len; - FRT_StringValue *errorServices = ret[4]._string_array._pt; - uint32_t errorServicesLen = ret[4]._string_array._len; - const char *protocolName = ret[5]._string._str; - BlobRef payload(ret[6]._data._buf, ret[6]._data._len); - const char *trace = ret[7]._string._str; - - Reply::UP reply; - if (payload.size() > 0) { - reply = decode(protocolName, version, payload, error); + IProtocol * protocol = _net->getOwner().getProtocol(protocolName); + if (protocol == nullptr) { + replyError(req, version, traceLevel, + Error(ErrorCode::UNKNOWN_PROTOCOL, + make_string("Protocol '%s' is not known by %s.", protocolName, _serverIdent.c_str()))); + return; } - if ( ! reply ) { - reply.reset(new EmptyReply()); + Routable::UP routable = protocol->decode(version, BlobRef(payload, payloadLen)); + req->DiscardBlobs(); + if ( ! routable ) { + replyError(req, version, traceLevel, + Error(ErrorCode::DECODE_ERROR, + make_string("Protocol '%s' failed to decode routable.", protocolName))); + return; + } + if (routable->isReply()) { + replyError(req, version, traceLevel, + Error(ErrorCode::DECODE_ERROR, + "Payload decoded to a reply when expecting a mesage.")); + return; } - reply->setRetryDelay(retryDelay); - for (uint32_t i = 0; i < errorCodesLen && i < errorMessagesLen && i < errorServicesLen; ++i) { - reply->addError(Error(errorCodes[i], errorMessages[i]._str, - errorServices[i]._len > 0 ? errorServices[i]._str : serviceName.c_str())); + Message::UP msg(static_cast<Message*>(routable.release())); + if (strlen(route) > 0) { + msg->setRoute(Route::parse(route)); } - rootTrace.addChild(TraceNode::decode(trace)); - return reply; + msg->setContext(Context(new ReplyContext(*req, version))); + msg->pushHandler(*this, *this); + msg->setRetryEnabled(retryEnabled); + msg->setRetry(retry); + msg->setTimeReceivedNow(); + msg->setTimeRemaining(timeRemaining); + msg->getTrace().setLevel(traceLevel); + if (msg->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) { + msg->getTrace().trace(TraceLevel::SEND_RECEIVE, + make_string("Message (type %d) received at %s for session '%s'.", + msg->getType(), _serverIdent.c_str(), session)); + } + _net->getOwner().deliverMessage(std::move(msg), session); } void -RPCSendV1::createResponse(FRT_Values & ret, const string & version, Reply & reply, Blob payload) const { +RPCSendV1::handleReply(Reply::UP reply) +{ + ReplyContext::UP ctx(static_cast<ReplyContext*>(reply->getContext().value.PTR)); + FRT_RPCRequest &req = ctx->getRequest(); + string version = ctx->getVersion().toString(); + if (reply->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) { + reply->getTrace().trace(TraceLevel::SEND_RECEIVE, make_string("Sending reply (version %s) from %s.", + version.c_str(), _serverIdent.c_str())); + } + Blob payload(0); + if (reply->getType() != 0) { + payload = _net->getOwner().getProtocol(reply->getProtocol())->encode(ctx->getVersion(), *reply); + if (payload.size() == 0) { + reply->addError(Error(ErrorCode::ENCODE_ERROR, "An error occured while encoding the reply, see log.")); + } + } + FRT_Values &ret = *req.GetReturn(); ret.AddString(version.c_str()); - ret.AddDouble(reply.getRetryDelay()); + ret.AddDouble(reply->getRetryDelay()); - uint32_t errorCount = reply.getNumErrors(); + uint32_t errorCount = reply->getNumErrors(); uint32_t *errorCodes = ret.AddInt32Array(errorCount); FRT_StringValue *errorMessages = ret.AddStringArray(errorCount); FRT_StringValue *errorServices = ret.AddStringArray(errorCount); for (uint32_t i = 0; i < errorCount; ++i) { - errorCodes[i] = reply.getError(i).getCode(); - ret.SetString(errorMessages + i, reply.getError(i).getMessage().c_str()); - ret.SetString(errorServices + i, reply.getError(i).getService().c_str()); + errorCodes[i] = reply->getError(i).getCode(); + ret.SetString(errorMessages + i, + reply->getError(i).getMessage().c_str()); + ret.SetString(errorServices + i, + reply->getError(i).getService().c_str()); } - ret.AddString(reply.getProtocol().c_str()); + ret.AddString(reply->getProtocol().c_str()); ret.AddData(std::move(payload.payload()), payload.size()); - if (reply.getTrace().getLevel() > 0) { - ret.AddString(reply.getTrace().getRoot().encode().c_str()); + if (reply->getTrace().getLevel() > 0) { + ret.AddString(reply->getTrace().getRoot().encode().c_str()); } else { ret.AddString(""); } + req.Return(); +} + +void +RPCSendV1::handleDiscard(Context ctx) +{ + ReplyContext::UP tmp(static_cast<ReplyContext*>(ctx.value.PTR)); + FRT_RPCRequest &req = tmp->getRequest(); + FNET_Channel *chn = req.GetContext()._value.CHANNEL; + req.SubRef(); + chn->Free(); +} + +void +RPCSendV1::replyError(FRT_RPCRequest *req, const vespalib::Version &version, + uint32_t traceLevel, const Error &err) +{ + Reply::UP reply(new EmptyReply()); + reply->setContext(Context(new ReplyContext(*req, version))); + reply->getTrace().setLevel(traceLevel); + reply->addError(err); + handleReply(std::move(reply)); } } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv1.h b/messagebus/src/vespa/messagebus/network/rpcsendv1.h index 37f23335309..b634c1ef1b8 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsendv1.h +++ b/messagebus/src/vespa/messagebus/network/rpcsendv1.h @@ -1,24 +1,77 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include "rpcsend.h" +#include "rpcsendadapter.h" +#include <vespa/messagebus/idiscardhandler.h> +#include <vespa/messagebus/ireplyhandler.h> +#include <vespa/fnet/frt/invokable.h> + namespace mbus { -class RPCSendV1 : public RPCSend { +class Error; + +class PayLoadFiller +{ public: - static bool isCompatible(vespalib::stringref method, vespalib::stringref request, vespalib::stringref respons); + virtual ~PayLoadFiller() { } + virtual void fill(FRT_Values & v) const = 0; +}; + +/** + * Implements the send adapter for method "mbus.send". + */ +class RPCSendV1 : public RPCSendAdapter, + public FRT_Invokable, + public FRT_IRequestWait, + public IDiscardHandler, + public IReplyHandler { private: - void build(FRT_ReflectionBuilder & builder) override; - const char * getReturnSpec() const override; - std::unique_ptr<Params> toParams(const FRT_Values ¶m) const override; - void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route, - const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel, - const PayLoadFiller &filler, uint64_t timeRemaining) const override; - - std::unique_ptr<Reply> createReply(const FRT_Values & response, const string & serviceName, - Error & error, vespalib::TraceNode & rootTrace) const override; - void createResponse(FRT_Values & ret, const string & version, Reply & reply, Blob payload) const override; + RPCNetwork *_net; + string _clientIdent; + string _serverIdent; + + /** + * Send an error reply for a given request. + * + * @param request The FRT request to reply to. + * @param version The version to serialize for. + * @param traceLevel The trace level to set in the reply. + * @param err The error to reply with. + */ + void replyError(FRT_RPCRequest *req, const vespalib::Version &version, + uint32_t traceLevel, const Error &err); + + void send(RoutingNode &recipient, const vespalib::Version &version, + const PayLoadFiller & filler, uint64_t timeRemaining); +public: + /** The name of the rpc method that this adapter registers. */ + static const char *METHOD_NAME; + + /** The parameter string of the rpc method. */ + static const char *METHOD_PARAMS; + + /** The return string of the rpc method. */ + static const char *METHOD_RETURN; + + /** + * Constructs a new instance of this adapter. This object is unusable until + * its attach() method has been called. + */ + RPCSendV1(); + ~RPCSendV1(); + + void attach(RPCNetwork &net) override; + + void send(RoutingNode &recipient, const vespalib::Version &version, + BlobRef payload, uint64_t timeRemaining) override; + void sendByHandover(RoutingNode &recipient, const vespalib::Version &version, + Blob payload, uint64_t timeRemaining) override; + + void handleReply(std::unique_ptr<Reply> reply) override; + void handleDiscard(Context ctx) override; + void invoke(FRT_RPCRequest *req); + void RequestDone(FRT_RPCRequest *req) override; }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp b/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp deleted file mode 100644 index 1228e08f3b4..00000000000 --- a/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp +++ /dev/null @@ -1,263 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "rpcsendv2.h" -#include "rpcnetwork.h" -#include "rpcserviceaddress.h" -#include <vespa/messagebus/emptyreply.h> -#include <vespa/messagebus/tracelevel.h> -#include <vespa/vespalib/util/stringfmt.h> -#include <vespa/vespalib/data/slime/slime.h> -#include <vespa/vespalib/data/databuffer.h> -#include <vespa/vespalib/util/compressor.h> -#include <vespa/fnet/frt/reflection.h> - -using vespalib::make_string; -using vespalib::compression::CompressionConfig; -using vespalib::compression::decompress; -using vespalib::compression::compress; -using vespalib::DataBuffer; -using vespalib::ConstBufferRef; -using vespalib::stringref; -using vespalib::Memory; -using vespalib::Slime; -using vespalib::Version; -using namespace vespalib::slime; - -namespace mbus { - -namespace { - -const char *METHOD_NAME = "mbus.slime"; -const char *METHOD_PARAMS = "bixbix"; -const char *METHOD_RETURN = "bixbix"; - -Memory VERSION_F("version"); -Memory ROUTE_F("route"); -Memory SESSION_F("session"); -Memory USERETRY_F("useretry"); -Memory RETRYDELAY_F("retrydelay"); -Memory RETRY_F("retry"); -Memory TIMELEFT_F("timeleft"); -Memory PROTOCOL_F("prot"); -Memory TRACELEVEL_F("tracelevel"); -Memory TRACE_F("trace"); -Memory BLOB_F("msg"); -Memory ERRORS_F("errors"); -Memory CODE_F("code"); -Memory MSG_F("msg"); -Memory SERVICE_F("service"); - -} - -bool RPCSendV2::isCompatible(stringref method, stringref request, stringref response) -{ - return (method == METHOD_NAME) && - (request == METHOD_PARAMS) && - (response == METHOD_RETURN); -} - -void -RPCSendV2::build(FRT_ReflectionBuilder & builder) -{ - builder.DefineMethod(METHOD_NAME, METHOD_PARAMS, METHOD_RETURN, true, FRT_METHOD(RPCSendV2::invoke), this); - builder.MethodDesc("Send a message bus slime request and get a reply back."); - builder.ParamDesc("header_encoding", "0=raw, 6=lz4"); - builder.ParamDesc("header_decoded_size", "Uncompressed header blob size"); - builder.ParamDesc("header_payload", "The message header blob in slime"); - builder.ParamDesc("body_encoding", "0=raw, 6=lz4"); - builder.ParamDesc("body_decoded_size", "Uncompressed body blob size"); - builder.ParamDesc("body_payload", "The message body blob in slime"); - builder.ReturnDesc("header_encoding", "0=raw, 6=lz4"); - builder.ReturnDesc("header_decoded_size", "Uncompressed header blob size"); - builder.ReturnDesc("header_payload", "The reply header blob in slime."); - builder.ReturnDesc("body_encoding", "0=raw, 6=lz4"); - builder.ReturnDesc("body_decoded_size", "Uncompressed body blob size"); - builder.ReturnDesc("body_payload", "The reply body blob in slime."); -} - -const char * -RPCSendV2::getReturnSpec() const { - return METHOD_RETURN; -} - -namespace { -class OutputBuf : public vespalib::Output { -public: - OutputBuf(size_t estimatedSize) : _buf(estimatedSize) { } - DataBuffer & getBuf() { return _buf; } -private: - vespalib::WritableMemory reserve(size_t bytes) override { - _buf.ensureFree(bytes); - return vespalib::WritableMemory(_buf.getFree(), _buf.getFreeLen()); - } - Output &commit(size_t bytes) override { - _buf.moveFreeToData(bytes); - return *this; - } - DataBuffer _buf; -}; -} - -void -RPCSendV2::encodeRequest(FRT_RPCRequest &req, const Version &version, const Route & route, - const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel, - const PayLoadFiller &filler, uint64_t timeRemaining) const -{ - FRT_Values &args = *req.GetParams(); - req.SetMethodName(METHOD_NAME); - // Place holder for auxillary data to be transfered later. - args.AddInt8(CompressionConfig::NONE); - args.AddInt32(0); - args.AddData("", 0); - - Slime slime; - Cursor & root = slime.setObject(); - - root.setString(VERSION_F, version.toString()); - root.setString(ROUTE_F, route.toString()); - root.setString(SESSION_F, address.getSessionName()); - root.setBool(USERETRY_F, msg.getRetryEnabled()); - root.setLong(RETRY_F, msg.getRetry()); - root.setLong(TIMELEFT_F, timeRemaining); - root.setString(PROTOCOL_F, msg.getProtocol()); - root.setLong(TRACELEVEL_F, traceLevel); - filler.fill(BLOB_F, root); - - OutputBuf rBuf(8192); - BinaryFormat::encode(slime, rBuf); - ConstBufferRef toCompress(rBuf.getBuf().getData(), rBuf.getBuf().getDataLen()); - DataBuffer buf(vespalib::roundUp2inN(rBuf.getBuf().getDataLen())); - CompressionConfig::Type type = compress(_net->getCompressionConfig(), toCompress, buf, false); - - args.AddInt8(type); - args.AddInt32(toCompress.size()); - args.AddData(buf.stealBuffer(), buf.getDataLen()); -} - -namespace { - -class ParamsV2 : public RPCSend::Params -{ -public: - ParamsV2(const FRT_Values &arg) - : _slime() - { - uint8_t encoding = arg[3]._intval8; - uint32_t uncompressedSize = arg[4]._intval32; - DataBuffer uncompressed(arg[5]._data._buf, arg[5]._data._len); - ConstBufferRef blob(arg[5]._data._buf, arg[5]._data._len); - decompress(CompressionConfig::toType(encoding), uncompressedSize, blob, uncompressed, true); - assert(uncompressedSize == uncompressed.getDataLen()); - BinaryFormat::decode(Memory(uncompressed.getData(), uncompressed.getDataLen()), _slime); - } - - uint32_t getTraceLevel() const override { return _slime.get()[TRACELEVEL_F].asLong(); } - bool useRetry() const override { return _slime.get()[USERETRY_F].asBool(); } - uint32_t getRetries() const override { return _slime.get()[RETRY_F].asLong(); } - uint64_t getRemainingTime() const override { return _slime.get()[TIMELEFT_F].asLong(); } - - Version getVersion() const override { - return Version(_slime.get()[VERSION_F].asString().make_stringref()); - } - stringref getRoute() const override { - return _slime.get()[ROUTE_F].asString().make_stringref(); - } - stringref getSession() const override { - return _slime.get()[SESSION_F].asString().make_stringref(); - } - stringref getProtocol() const override { - return _slime.get()[PROTOCOL_F].asString().make_stringref(); - } - BlobRef getPayload() const override { - Memory m = _slime.get()[BLOB_F].asData(); - return BlobRef(m.data, m.size); - } -private: - Slime _slime; -}; - -} - -std::unique_ptr<RPCSend::Params> -RPCSendV2::toParams(const FRT_Values &args) const -{ - return std::make_unique<ParamsV2>(args); -} - -std::unique_ptr<Reply> -RPCSendV2::createReply(const FRT_Values & ret, const string & serviceName, - Error & error, vespalib::TraceNode & rootTrace) const -{ - uint8_t encoding = ret[3]._intval8; - uint32_t uncompressedSize = ret[4]._intval32; - DataBuffer uncompressed(ret[5]._data._buf, ret[5]._data._len); - ConstBufferRef blob(ret[5]._data._buf, ret[5]._data._len); - decompress(CompressionConfig::toType(encoding), uncompressedSize, blob, uncompressed, true); - assert(uncompressedSize == uncompressed.getDataLen()); - Slime slime; - BinaryFormat::decode(Memory(uncompressed.getData(), uncompressed.getDataLen()), slime); - Inspector & root = slime.get(); - Version version(root[VERSION_F].asString().make_string()); - Memory payload = root[BLOB_F].asData(); - - Reply::UP reply; - if (payload.size > 0) { - reply = decode(root[PROTOCOL_F].asString().make_stringref(), version, BlobRef(payload.data, payload.size), error); - } - if ( ! reply ) { - reply.reset(new EmptyReply()); - } - reply->setRetryDelay(root[RETRYDELAY_F].asDouble()); - Inspector & errors = root[ERRORS_F]; - for (uint32_t i = 0; i < errors.entries(); ++i) { - Inspector & e = errors[i]; - Memory service = e[SERVICE_F].asString(); - reply->addError(Error(e[CODE_F].asLong(), e[MSG_F].asString().make_string(), - (service.size > 0) ? service.make_string() : serviceName)); - } - rootTrace.addChild(TraceNode::decode(root[TRACE_F].asString().make_string())); - return reply; -} - -void -RPCSendV2::createResponse(FRT_Values & ret, const string & version, Reply & reply, Blob payload) const -{ - // Place holder for auxillary data to be transfered later. - ret.AddInt8(CompressionConfig::NONE); - ret.AddInt32(0); - ret.AddData("", 0); - - Slime slime; - Cursor & root = slime.setObject(); - - root.setString(VERSION_F, version); - root.setDouble(RETRYDELAY_F, reply.getRetryDelay()); - root.setString(PROTOCOL_F, reply.getProtocol()); - root.setData(BLOB_F, vespalib::Memory(payload.data(), payload.size())); - if (reply.getTrace().getLevel() > 0) { - root.setString(TRACE_F, reply.getTrace().getRoot().encode()); - } - - if (reply.getNumErrors() > 0) { - Cursor & array = root.setArray(ERRORS_F); - for (uint32_t i = 0; i < reply.getNumErrors(); ++i) { - Cursor & error = array.addObject(); - error.setLong(CODE_F, reply.getError(i).getCode()); - error.setString(MSG_F, reply.getError(i).getMessage()); - error.setString(SERVICE_F, reply.getError(i).getService().c_str()); - } - } - - OutputBuf rBuf(8192); - BinaryFormat::encode(slime, rBuf); - ConstBufferRef toCompress(rBuf.getBuf().getData(), rBuf.getBuf().getDataLen()); - DataBuffer buf(vespalib::roundUp2inN(rBuf.getBuf().getDataLen())); - CompressionConfig::Type type = compress(_net->getCompressionConfig(), toCompress, buf, false); - - ret.AddInt8(type); - ret.AddInt32(toCompress.size()); - ret.AddData(buf.stealBuffer(), buf.getDataLen()); - -} - -} // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv2.h b/messagebus/src/vespa/messagebus/network/rpcsendv2.h deleted file mode 100644 index e793868d2aa..00000000000 --- a/messagebus/src/vespa/messagebus/network/rpcsendv2.h +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include "rpcsend.h" - -namespace mbus { - -class RPCSendV2 : public RPCSend { -public: - static bool isCompatible(vespalib::stringref method, vespalib::stringref request, vespalib::stringref response); -private: - void build(FRT_ReflectionBuilder & builder) override; - const char * getReturnSpec() const override; - std::unique_ptr<Params> toParams(const FRT_Values ¶m) const override; - void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route, - const RPCServiceAddress & address, const Message & msg, uint32_t traceLevel, - const PayLoadFiller &filler, uint64_t timeRemaining) const override; - - std::unique_ptr<Reply> createReply(const FRT_Values & response, const string & serviceName, - Error & error, vespalib::TraceNode & rootTrace) const override; - void createResponse(FRT_Values & ret, const string & version, Reply & reply, Blob payload) const override; -}; - -} // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpcserviceaddress.h b/messagebus/src/vespa/messagebus/network/rpcserviceaddress.h index 36dde19bd18..c6856057342 100644 --- a/messagebus/src/vespa/messagebus/network/rpcserviceaddress.h +++ b/messagebus/src/vespa/messagebus/network/rpcserviceaddress.h @@ -84,7 +84,7 @@ public: * * @return True if target is set. */ - bool hasTarget() const { return _target.get() != nullptr; } + bool hasTarget() const { return _target.get() != NULL; } }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp index 295814f4a8d..20e5a2eb3e3 100644 --- a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp +++ b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp @@ -36,7 +36,7 @@ RPCTargetPool::flushTargets(bool force) TargetMap::iterator it = _targets.begin(); while (it != _targets.end()) { Entry &entry = it->second; - if (entry._target.get() != nullptr) { + if (entry._target.get() != NULL) { if (entry._target.use_count() > 1) { entry._lastUse = currentTime; ++it; diff --git a/messagebus/src/vespa/messagebus/network/rpctargetpool.h b/messagebus/src/vespa/messagebus/network/rpctargetpool.h index 5f858f66993..683982de080 100644 --- a/messagebus/src/vespa/messagebus/network/rpctargetpool.h +++ b/messagebus/src/vespa/messagebus/network/rpctargetpool.h @@ -1,11 +1,11 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include "rpcserviceaddress.h" -#include "rpctarget.h" +#include <map> #include <vespa/messagebus/itimer.h> #include <vespa/vespalib/util/sync.h> -#include <map> +#include "rpcserviceaddress.h" +#include "rpctarget.h" class FRT_Supervisor; diff --git a/messagebus/src/vespa/messagebus/protocolrepository.cpp b/messagebus/src/vespa/messagebus/protocolrepository.cpp index 4e2efcfb3b9..d2661e3ef80 100644 --- a/messagebus/src/vespa/messagebus/protocolrepository.cpp +++ b/messagebus/src/vespa/messagebus/protocolrepository.cpp @@ -76,7 +76,7 @@ ProtocolRepository::getRoutingPolicy(const string &protocolName, } catch (const std::exception &e) { LOG(error, "Protocol '%s' threw an exception; %s", protocolName.c_str(), e.what()); } - if (policy.get() == nullptr) { + if (policy.get() == NULL) { LOG(error, "Protocol '%s' failed to create routing policy '%s' with parameter '%s'.", protocolName.c_str(), policyName.c_str(), policyParam.c_str()); return IRoutingPolicy::SP(); diff --git a/messagebus/src/vespa/messagebus/routing/route.cpp b/messagebus/src/vespa/messagebus/routing/route.cpp index b705847c3a5..af7b5113ac2 100644 --- a/messagebus/src/vespa/messagebus/routing/route.cpp +++ b/messagebus/src/vespa/messagebus/routing/route.cpp @@ -69,7 +69,7 @@ Route::toDebugString() const { } Route -Route::parse(vespalib::stringref route) +Route::parse(const string &route) { return RouteParser::createRoute(route); } diff --git a/messagebus/src/vespa/messagebus/routing/route.h b/messagebus/src/vespa/messagebus/routing/route.h index a2a01648cfe..d9932e17d26 100644 --- a/messagebus/src/vespa/messagebus/routing/route.h +++ b/messagebus/src/vespa/messagebus/routing/route.h @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +#include <vector> #include "hop.h" namespace mbus { @@ -32,7 +33,7 @@ public: * @param route The string to parse. * @return A route that corresponds to the string. */ - static Route parse(vespalib::stringref route); + static Route parse(const string &route); /** * Create a Route that contains no hops diff --git a/messagebus/src/vespa/messagebus/routing/routeparser.cpp b/messagebus/src/vespa/messagebus/routing/routeparser.cpp index 668f14f9801..ac52ae3b598 100644 --- a/messagebus/src/vespa/messagebus/routing/routeparser.cpp +++ b/messagebus/src/vespa/messagebus/routing/routeparser.cpp @@ -71,7 +71,7 @@ RouteParser::createDirective(const stringref &str) } Hop -RouteParser::createHop(stringref str) +RouteParser::createHop(const stringref &str) { if (str.empty()) { return Hop().addDirective(createErrorDirective("Failed to parse empty string.")); @@ -84,7 +84,7 @@ RouteParser::createHop(stringref str) } if (len > 4 && str.substr(0, 4) == "tcp/") { IHopDirective::SP tcp = createTcpDirective(str.substr(4)); - if (tcp.get() != nullptr) { + if (tcp.get() != NULL) { return Hop().addDirective(tcp); } } @@ -119,7 +119,7 @@ RouteParser::createHop(stringref str) } Route -RouteParser::createRoute(stringref str) +RouteParser::createRoute(const stringref &str) { Route ret; for (size_t from = 0, at = 0, depth = 0; at <= str.size(); ++at) { diff --git a/messagebus/src/vespa/messagebus/routing/routeparser.h b/messagebus/src/vespa/messagebus/routing/routeparser.h index 8ffba3f6e11..a3f16e49307 100644 --- a/messagebus/src/vespa/messagebus/routing/routeparser.h +++ b/messagebus/src/vespa/messagebus/routing/routeparser.h @@ -29,7 +29,7 @@ public: * @param str The string to parse as a hop. * @return The created hop. */ - static Hop createHop(vespalib::stringref str); + static Hop createHop(const vespalib::stringref &str); /** * Creates a route from a string representation. @@ -37,7 +37,7 @@ public: * @param str The string to parse as a route. * @return The created route. */ - static Route createRoute(vespalib::stringref str); + static Route createRoute(const vespalib::stringref &str); }; } // mbus diff --git a/messagebus/src/vespa/messagebus/routing/routingnode.cpp b/messagebus/src/vespa/messagebus/routing/routingnode.cpp index 6e100999e1d..62efda4aeb9 100644 --- a/messagebus/src/vespa/messagebus/routing/routingnode.cpp +++ b/messagebus/src/vespa/messagebus/routing/routingnode.cpp @@ -21,7 +21,7 @@ RoutingNode::RoutingNode(MessageBus &mbus, INetwork &net, Resender *resender, : _mbus(mbus), _net(net), _resender(resender), - _parent(nullptr), + _parent(NULL), _recipients(), _children(), _replyHandler(&replyHandler), @@ -45,8 +45,8 @@ RoutingNode::RoutingNode(RoutingNode &parent, const Route &route) _parent(&parent), _recipients(parent._recipients), _children(), - _replyHandler(nullptr), - _discardHandler(nullptr), + _replyHandler(NULL), + _discardHandler(NULL), _trace(parent._trace.getLevel()), _pending(0), _msg(parent._msg), @@ -78,8 +78,8 @@ RoutingNode::clearChildren() void RoutingNode::discard() { - assert(_parent == nullptr); - if (_discardHandler != nullptr) { + assert(_parent == NULL); + if (_discardHandler != NULL) { _discardHandler->handleDiscard(Context()); } } @@ -101,7 +101,7 @@ RoutingNode::prepareForRetry() { _shouldRetry = false; _reply.reset(); - if (_routingContext.get() != nullptr && _routingContext->getSelectOnRetry()) { + if (_routingContext.get() != NULL && _routingContext->getSelectOnRetry()) { clearChildren(); } else if (!_children.empty()) { bool retryingSome = false; @@ -109,7 +109,7 @@ RoutingNode::prepareForRetry() it != _children.end(); ++it) { RoutingNode *child= *it; - if (child->_shouldRetry || child->_reply.get() == nullptr) { + if (child->_shouldRetry || child->_reply.get() == NULL) { child->prepareForRetry(); retryingSome = true; } @@ -126,11 +126,11 @@ RoutingNode::prepareForRetry() void RoutingNode::notifyParent() { - if (_serviceAddress.get() != nullptr) { + if (_serviceAddress.get() != NULL) { _net.freeServiceAddress(*this); } tryIgnoreResult(); - if (_parent != nullptr) { + if (_parent != NULL) { _parent->notifyMerge(); return; } @@ -174,7 +174,7 @@ RoutingNode::addError(uint32_t code, const string &msg) void RoutingNode::addError(const Error &err) { - if (_reply.get() != nullptr) { + if (_reply.get() != NULL) { _reply->getTrace().swap(_trace); _reply->addError(err); _reply->getTrace().swap(_trace); @@ -186,8 +186,8 @@ RoutingNode::addError(const Error &err) void RoutingNode::setReply(Reply::UP reply) { - if (reply.get() != nullptr) { - _shouldRetry = _resender != nullptr && _resender->shouldRetry(*reply); + if (reply.get() != NULL) { + _shouldRetry = _resender != NULL && _resender->shouldRetry(*reply); _trace.getRoot().addChild(reply->getTrace().getRoot()); reply->getTrace().clear(); } @@ -211,7 +211,7 @@ RoutingNode::notifyAbort(const string &msg) mystack.pop(); if (!node->_isActive) { // reply not pending - } else if (node->_reply.get() != nullptr) { + } else if (node->_reply.get() != NULL) { node->notifyParent(); } else if (node->_children.empty()) { node->setError(ErrorCode::SEND_ABORTED, msg); @@ -240,7 +240,7 @@ RoutingNode::notifyTransmit() if (node->hasReply()) { node->notifyParent(); } else { - assert(node->_serviceAddress.get() != nullptr); + assert(node->_serviceAddress.get() != NULL); sendTo.push_back(node); } } else { @@ -296,7 +296,7 @@ RoutingNode::notifyMerge() setError(ErrorCode::POLICY_ERROR, make_string("Policy '%s' threw an exception; %s", dir.getName().c_str(), e.what())); } - if (_reply.get() == nullptr) { + if (_reply.get() == NULL) { setError(ErrorCode::APP_FATAL_ERROR, make_string("Routing policy '%s' failed to merge replies.", dir.getName().c_str())); } @@ -315,12 +315,12 @@ RoutingNode::hasUnconsumedErrors() while (!mystack.empty()) { RoutingNode *node = mystack.top(); mystack.pop(); - if (node->_reply.get() != nullptr) { + if (node->_reply.get() != NULL) { for (uint32_t i = 0; i < node->_reply->getNumErrors(); ++i) { int errorCode = node->_reply->getError(i).getCode(); RoutingNode *it = node; - while (it != nullptr) { - if (it->_routingContext.get() != nullptr && + while (it != NULL) { + if (it->_routingContext.get() != NULL && it->_routingContext->isConsumableError(errorCode)) { errorCode = ErrorCode::NONE; @@ -329,7 +329,7 @@ RoutingNode::hasUnconsumedErrors() it = it->_parent; } if (errorCode != ErrorCode::NONE) { - _shouldRetry = _resender != nullptr && _resender->canRetry(errorCode); + _shouldRetry = _resender != NULL && _resender->canRetry(errorCode); if (!_shouldRetry) { return true; // no need to continue } @@ -374,17 +374,17 @@ RoutingNode::resolve(uint32_t depth) if (executePolicySelect()) { return resolveChildren(depth + 1); } - return _reply.get() != nullptr; + return _reply.get() != NULL; } _net.allocServiceAddress(*this); - return _serviceAddress.get() != nullptr || _reply.get() != nullptr; + return _serviceAddress.get() != NULL || _reply.get() != NULL; } bool RoutingNode::lookupHop() { RoutingTable::SP table = _mbus.getRoutingTable(_msg.getProtocol()); - if (table.get() != nullptr) { + if (table.get() != NULL) { string name = _route.getHop(0).getServiceName(); if (table->hasHop(name)) { const HopBlueprint *hop = table->getHop(name); @@ -404,7 +404,7 @@ RoutingNode::lookupRoute() Hop &hop = _route.getHop(0); if (hop.getDirective(0)->getType() == IHopDirective::TYPE_ROUTE) { RouteDirective &dir = static_cast<RouteDirective&>(*hop.getDirective(0)); - if (table.get() == nullptr || !table->hasRoute(dir.getName())) { + if (table.get() == NULL || !table->hasRoute(dir.getName())) { setError(ErrorCode::ILLEGAL_ROUTE, make_string("Route '%s' does not exist.", dir.getName().c_str())); return false; @@ -415,7 +415,7 @@ RoutingNode::lookupRoute() dir.getName().c_str(), _route.toString().c_str())); return true; } - if (table.get() != nullptr) { + if (table.get() != NULL) { string name = hop.getServiceName(); if (table->hasRoute(name)) { insertRoute(*table->getRoute(name)); @@ -474,7 +474,7 @@ RoutingNode::executePolicySelect() { const PolicyDirective &dir = _routingContext->getDirective(); _policy = _mbus.getRoutingPolicy(_msg.getProtocol(), dir.getName(), dir.getParam()); - if (_policy.get() == nullptr) { + if (_policy.get() == NULL) { setError(ErrorCode::UNKNOWN_POLICY, make_string( "Protocol '%s' could not create routing policy '%s' with parameter '%s'.", _msg.getProtocol().c_str(), dir.getName().c_str(), dir.getParam().c_str())); @@ -489,7 +489,7 @@ RoutingNode::executePolicySelect() return false; } if (_children.empty()) { - if (_reply.get() == nullptr) { + if (_reply.get() == NULL) { setError(ErrorCode::NO_SERVICES_FOR_ROUTE, make_string("Policy '%s' selected no recipients for route '%s'.", dir.getName().c_str(), _route.toString().c_str())); @@ -522,7 +522,7 @@ RoutingNode::resolveChildren(uint32_t childDepth) RoutingNode *child = *it; child->_trace.trace(TraceLevel::SPLIT_MERGE, make_string("Resolving '%s'.", child->_route.toString().c_str())); - child->_isActive = (child->_reply.get() == nullptr); + child->_isActive = (child->_reply.get() == NULL); if (child->_isActive) { ++numActiveChildren; if (!child->resolve(childDepth)) { @@ -562,7 +562,7 @@ RoutingNode::tryIgnoreResult() if (!shouldIgnoreResult()) { return false; } - if (_reply.get() == nullptr || !_reply->hasErrors()) { + if (_reply.get() == NULL || !_reply->hasErrors()) { return false; } setReply(Reply::UP(new EmptyReply())); diff --git a/messagebus/src/vespa/messagebus/routing/routingnode.h b/messagebus/src/vespa/messagebus/routing/routingnode.h index 8951785c621..22ff07c26e5 100644 --- a/messagebus/src/vespa/messagebus/routing/routingnode.h +++ b/messagebus/src/vespa/messagebus/routing/routingnode.h @@ -228,7 +228,7 @@ public: */ RoutingNode(MessageBus &mbus, INetwork &net, Resender *resender, IReplyHandler &replyHandler, Message &msg, - IDiscardHandler *discardHandler = nullptr); + IDiscardHandler *discardHandler = NULL); /** * Destructor. Frees up any allocated resources, namely all child nodes of @@ -348,7 +348,6 @@ public: * @return The message being routed. */ Message &getMessage() { return _msg; } - const Message & getMessage() const { return _msg; } /** * Returns the trace object for this node. Each node has a separate trace @@ -357,7 +356,6 @@ public: * @return The trace object. */ Trace &getTrace() { return _trace; } - const Trace &getTrace() const { return _trace; } /** * Returns the route object as it exists at this point of the tree. @@ -371,7 +369,7 @@ public: * * @return True if this node has a reply. */ - bool hasReply() const { return _reply.get() != nullptr; } + bool hasReply() const { return _reply.get() != NULL; } /** * Returns the reply of this node. @@ -421,7 +419,7 @@ public: * * @return True if an address is set. */ - bool hasServiceAddress() { return _serviceAddress.get() != nullptr; } + bool hasServiceAddress() { return _serviceAddress.get() != NULL; } /** * Returns the service address of this node. This is attached by the network @@ -431,7 +429,6 @@ public: * @return The recipient address. */ IServiceAddress &getServiceAddress() { return *_serviceAddress; } - const IServiceAddress &getServiceAddress() const { return *_serviceAddress; } /** * Sets the service address of this node. This is called by the network diff --git a/messagebus/src/vespa/messagebus/routing/routingtable.cpp b/messagebus/src/vespa/messagebus/routing/routingtable.cpp index 58e1881dc90..7537605d9fa 100644 --- a/messagebus/src/vespa/messagebus/routing/routingtable.cpp +++ b/messagebus/src/vespa/messagebus/routing/routingtable.cpp @@ -54,7 +54,7 @@ const HopBlueprint * RoutingTable::getHop(const string &name) const { std::map<string, HopBlueprint>::const_iterator it = _hops.find(name); - return it != _hops.end() ? &(it->second) : nullptr; + return it != _hops.end() ? &(it->second) : NULL; } bool @@ -67,7 +67,7 @@ const Route * RoutingTable::getRoute(const string &name) const { std::map<string, Route>::const_iterator it = _routes.find(name); - return it != _routes.end() ? &(it->second) : nullptr; + return it != _routes.end() ? &(it->second) : NULL; } } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/rpcmessagebus.cpp b/messagebus/src/vespa/messagebus/rpcmessagebus.cpp index 103c21ee3aa..c093efd1106 100644 --- a/messagebus/src/vespa/messagebus/rpcmessagebus.cpp +++ b/messagebus/src/vespa/messagebus/rpcmessagebus.cpp @@ -1,6 +1,5 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "rpcmessagebus.h" -#include <vespa/config/subscription/configuri.h> namespace mbus { @@ -16,10 +15,6 @@ RPCMessageBus::RPCMessageBus(const MessageBusParams &mbusParams, _subscriber.start(); } -RPCMessageBus::RPCMessageBus(const MessageBusParams &mbusParams, const RPCNetworkParams &rpcParams) - : RPCMessageBus(mbusParams, rpcParams, config::ConfigUri("client")) -{} - RPCMessageBus::RPCMessageBus(const ProtocolSet &protocols, const RPCNetworkParams &rpcParams, const config::ConfigUri &routingCfgUri) : diff --git a/messagebus/src/vespa/messagebus/rpcmessagebus.h b/messagebus/src/vespa/messagebus/rpcmessagebus.h index e9cfed32c44..3f249965fec 100644 --- a/messagebus/src/vespa/messagebus/rpcmessagebus.h +++ b/messagebus/src/vespa/messagebus/rpcmessagebus.h @@ -7,8 +7,6 @@ #include <vespa/messagebus/network/rpcnetwork.h> #include <vespa/config/helper/legacysubscriber.h> -namespace config {class ConfigUri; } - namespace mbus { /** @@ -42,10 +40,8 @@ public: * @param routingCfgId The config id for message bus routing specs. */ RPCMessageBus(const MessageBusParams &mbusParams, - const RPCNetworkParams &rpcParams, - const config::ConfigUri & routingCfgId); - RPCMessageBus(const MessageBusParams &mbusParams, - const RPCNetworkParams &rpcParams); + const RPCNetworkParams &rpcParams = RPCNetworkParams(), + const config::ConfigUri & routingCfgId = config::ConfigUri("client")); /** @@ -59,8 +55,8 @@ public: * @param routingCfgId The config id for messagebus routing specs. */ RPCMessageBus(const ProtocolSet &protocols, - const RPCNetworkParams &rpcParams, - const config::ConfigUri & routingCfgId); + const RPCNetworkParams &rpcParams = RPCNetworkParams(), + const config::ConfigUri & routingCfgId = config::ConfigUri("client")); /** * Destruct. This will destruct the internal MessageBus and RPCNetwork diff --git a/messagebus/src/vespa/messagebus/sequencer.cpp b/messagebus/src/vespa/messagebus/sequencer.cpp index 60fb3bdd39e..79fbd346c16 100644 --- a/messagebus/src/vespa/messagebus/sequencer.cpp +++ b/messagebus/src/vespa/messagebus/sequencer.cpp @@ -19,7 +19,7 @@ Sequencer::~Sequencer() { for (QueueMap::iterator it = _seqMap.begin(); it != _seqMap.end(); ++it) { MessageQueue *queue = it->second; - if (queue != nullptr) { + if (queue != NULL) { while (queue->size() > 0) { Message *msg = queue->front(); queue->pop(); @@ -40,7 +40,7 @@ Sequencer::filter(Message::UP msg) vespalib::LockGuard guard(_lock); QueueMap::iterator it = _seqMap.find(seqId); if (it != _seqMap.end()) { - if (it->second == nullptr) { + if (it->second == NULL) { it->second = new MessageQueue(); } msg->getTrace().trace(TraceLevel::COMPONENT, @@ -49,7 +49,7 @@ Sequencer::filter(Message::UP msg) msg.release(); return Message::UP(); } - _seqMap[seqId] = nullptr; // insert empty queue + _seqMap[seqId] = NULL; // insert empty queue } return std::move(msg); } @@ -69,7 +69,7 @@ Sequencer::handleMessage(Message::UP msg) { if (msg->hasSequenceId()) { msg = filter(std::move(msg)); - if (msg.get() != nullptr) { + if (msg.get() != NULL) { sequencedSend(std::move(msg)); } } else { @@ -89,8 +89,8 @@ Sequencer::handleReply(Reply::UP reply) QueueMap::iterator it = _seqMap.find(seq); MessageQueue *que = it->second; assert(it != _seqMap.end()); - if (que == nullptr || que->size() == 0) { - if (que != nullptr) { + if (que == NULL || que->size() == 0) { + if (que != NULL) { delete que; } _seqMap.erase(it); @@ -99,7 +99,7 @@ Sequencer::handleReply(Reply::UP reply) que->pop(); } } - if (msg.get() != nullptr) { + if (msg.get() != NULL) { sequencedSend(std::move(msg)); } IReplyHandler &handler = reply->getCallStack().pop(*reply); diff --git a/messagebus/src/vespa/messagebus/sourcesession.cpp b/messagebus/src/vespa/messagebus/sourcesession.cpp index 1dbdd307e17..9a93a4aedf1 100644 --- a/messagebus/src/vespa/messagebus/sourcesession.cpp +++ b/messagebus/src/vespa/messagebus/sourcesession.cpp @@ -41,9 +41,9 @@ SourceSession::send(Message::UP msg, const string &routeName, bool parseIfNotFou { bool found = false; RoutingTable::SP rt = _mbus.getRoutingTable(msg->getProtocol()); - if (rt.get() != nullptr) { + if (rt.get() != NULL) { const Route *route = rt->getRoute(routeName); - if (route != nullptr) { + if (route != NULL) { msg->setRoute(*route); found = true; } else if (!parseIfNotFound) { @@ -79,13 +79,13 @@ SourceSession::send(Message::UP msg) if (_closed) { return Result(Error(ErrorCode::SEND_QUEUE_CLOSED, "Source session is closed."), std::move(msg)); } - if (_throttlePolicy.get() != nullptr && !_throttlePolicy->canSend(*msg, _pendingCount)) { + if (_throttlePolicy.get() != NULL && !_throttlePolicy->canSend(*msg, _pendingCount)) { return Result(Error(ErrorCode::SEND_QUEUE_FULL, make_string("Too much pending data (%d messages).", _pendingCount)), std::move(msg)); } msg->pushHandler(_replyHandler); - if (_throttlePolicy.get() != nullptr) { + if (_throttlePolicy.get() != NULL) { _throttlePolicy->processMessage(*msg); } ++_pendingCount; @@ -108,7 +108,7 @@ SourceSession::handleReply(Reply::UP reply) vespalib::MonitorGuard guard(_monitor); assert(_pendingCount > 0); --_pendingCount; - if (_throttlePolicy.get() != nullptr) { + if (_throttlePolicy.get() != NULL) { _throttlePolicy->processReply(*reply); } done = (_closed && _pendingCount == 0); diff --git a/messagebus/src/vespa/messagebus/sourcesessionparams.cpp b/messagebus/src/vespa/messagebus/sourcesessionparams.cpp index 51fe91562ae..125b2a9822f 100644 --- a/messagebus/src/vespa/messagebus/sourcesessionparams.cpp +++ b/messagebus/src/vespa/messagebus/sourcesessionparams.cpp @@ -6,7 +6,7 @@ namespace mbus { SourceSessionParams::SourceSessionParams() : - _replyHandler(nullptr), + _replyHandler(NULL), _throttlePolicy(new DynamicThrottlePolicy()), _timeout(180.0) { } @@ -40,7 +40,7 @@ SourceSessionParams::setTimeout(double timeout) bool SourceSessionParams::hasReplyHandler() const { - return _replyHandler != nullptr; + return _replyHandler != NULL; } IReplyHandler & diff --git a/messagebus/src/vespa/messagebus/testlib/testserver.cpp b/messagebus/src/vespa/messagebus/testlib/testserver.cpp index dbc741f2dd4..e7f3646c72c 100644 --- a/messagebus/src/vespa/messagebus/testlib/testserver.cpp +++ b/messagebus/src/vespa/messagebus/testlib/testserver.cpp @@ -4,7 +4,6 @@ #include "simpleprotocol.h" #include "slobrok.h" #include "slobrokstate.h" -#include <vespa/messagebus/network/oosmanager.h> #include <vespa/vespalib/component/vtag.h> namespace mbus { @@ -12,7 +11,9 @@ namespace mbus { VersionedRPCNetwork::VersionedRPCNetwork(const RPCNetworkParams ¶ms) : RPCNetwork(params), _version(vespalib::Vtag::currentVersion) -{} +{ + // empty +} void VersionedRPCNetwork::setVersion(const vespalib::Version &version) @@ -96,4 +97,4 @@ TestServer::waitState(const OOSState &oosState) return false; } -} +} // namespace mbus diff --git a/messagebus/src/vespa/messagebus/testlib/testserver.h b/messagebus/src/vespa/messagebus/testlib/testserver.h index 400e2b274c5..1e2de3c4607 100644 --- a/messagebus/src/vespa/messagebus/testlib/testserver.h +++ b/messagebus/src/vespa/messagebus/testlib/testserver.h @@ -2,10 +2,9 @@ #pragma once +#include <memory> #include <vespa/messagebus/messagebus.h> #include <vespa/messagebus/network/rpcnetwork.h> -#include <vespa/messagebus/network/rpcnetworkparams.h> -#include <vespa/fnet/frt/supervisor.h> namespace mbus { diff --git a/messagebus/src/vespa/messagebus/tracenode.h b/messagebus/src/vespa/messagebus/tracenode.h index f582a70a151..95de9e70f55 100644 --- a/messagebus/src/vespa/messagebus/tracenode.h +++ b/messagebus/src/vespa/messagebus/tracenode.h @@ -5,7 +5,7 @@ namespace mbus { - using TraceNode = vespalib::TraceNode; + typedef vespalib::TraceNode TraceNode; } // namespace mbus |