diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-02-05 13:19:16 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-02-05 13:51:20 +0000 |
commit | aee4bb48d6afd708dad62a10cc58c72d28b6d81c (patch) | |
tree | 278fc7235b217c8b61147a7909ff57d89002db04 /messagebus | |
parent | d4ffb1cb379d3212f9d7a2ed7bd10cae9e5152f9 (diff) |
Augment messagebus error messages with recipients and/or local identity
Remove deprecated C++ implementation fallback to 4.1 protocol version if
remote RPC endpoint does not understand `mbus.getVersion` call. Will now
fail as expected with a handshake error instead.
Diffstat (limited to 'messagebus')
7 files changed, 69 insertions, 15 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java index c169bed3fa9..ab741b36a05 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java @@ -45,6 +45,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; +import java.util.stream.Collectors; /** * An RPC implementation of the Network interface. @@ -232,6 +233,16 @@ public class RPCNetwork implements Network, MethodHandler { } } + private static String buildRecipientListString(SendContext ctx) { + return ctx.recipients.stream().map(r -> { + if (!(r.getServiceAddress() instanceof RPCServiceAddress)) { + return "<non-RPC service address>"; + } + RPCServiceAddress addr = (RPCServiceAddress)r.getServiceAddress(); + return String.format("%s at %s", addr.getServiceName(), addr.getConnectionSpec()); + }).collect(Collectors.joining(", ")); + } + /** * This method is a callback invoked after {@link #send(Message, List)} once the version of all recipients have been * resolved. If all versions were resolved ahead of time, this method is invoked by the same thread as the former. @@ -243,7 +254,9 @@ public class RPCNetwork implements Network, MethodHandler { if (destroyed.get()) { replyError(ctx, ErrorCode.NETWORK_SHUTDOWN, "Network layer has performed shutdown."); } else if (ctx.hasError) { - replyError(ctx, ErrorCode.HANDSHAKE_FAILED, "An error occured while resolving version."); + replyError(ctx, ErrorCode.HANDSHAKE_FAILED, + String.format("An error occurred while resolving version of recipient(s) [%s] from host '%s'.", + buildRecipientListString(ctx), identity.getHostname())); } else { executor.execute(new SendTask(owner.getProtocol(ctx.msg.getProtocol()), ctx)); } @@ -295,14 +308,16 @@ public class RPCNetwork implements Network, MethodHandler { RPCServiceAddress ret = servicePool.resolve(serviceName); if (ret == null) { return new Error(ErrorCode.NO_ADDRESS_FOR_SERVICE, - "The address of service '" + serviceName + "' could not be resolved. It is not currently " + - "registered with the Vespa name server. " + - "The service must be having problems, or the routing configuration is wrong."); + String.format("The address of service '%s' could not be resolved. It is not currently " + + "registered with the Vespa name server. " + + "The service must be having problems, or the routing configuration is wrong. " + + "Address resolution attempted from host '%s'", serviceName, identity.getHostname())); } RPCTarget target = targetPool.getTarget(orb, ret); if (target == null) { return new Error(ErrorCode.CONNECTION_ERROR, - "Failed to connect to service '" + serviceName + "'."); + String.format("Failed to connect to service '%s' from host '%s'.", + serviceName, identity.getHostname())); } ret.setTarget(target); // free by freeServiceAddress() recipient.setServiceAddress(ret); diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java index d2d8031a689..64b7c4cb12e 100755 --- a/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java @@ -183,7 +183,7 @@ public class RoutingNode implements ReplyHandler { } /** - * This method markss this node as ready for merge. If it has a parent routing node, its pending member is + * This method marks this node as ready for merge. If it has a parent routing node, its pending member is * decremented. If this causes the parent's pending count to reach zero, its {@link #notifyMerge()} method is * invoked. A special flag is used to make sure that failed resending avoids notifying parents of previously * resolved branches of the tree. diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index b72416f51d2..358c9ebdeac 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -13,6 +13,7 @@ #include <vespa/slobrok/sbregister.h> #include <vespa/slobrok/sbmirror.h> #include <vespa/vespalib/component/vtag.h> +#include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/util/lambdatask.h> #include <vespa/fnet/scheduler.h> @@ -296,13 +297,15 @@ RPCNetwork::resolveServiceAddress(RoutingNode &recipient, const string &serviceN return Error(ErrorCode::NO_ADDRESS_FOR_SERVICE, make_string("The address of service '%s' could not be resolved. It is not currently " "registered with the Vespa name server. " - "The service must be having problems, or the routing configuration is wrong.", - serviceName.c_str())); + "The service must be having problems, or the routing configuration is wrong. " + "Address resolution attempted from host '%s'", + serviceName.c_str(), getIdentity().getHostname().c_str())); } RPCTarget::SP target = _targetPool->getTarget(*_orb, *ret); if ( ! target) { return Error(ErrorCode::CONNECTION_ERROR, - make_string("Failed to connect to service '%s'.", serviceName.c_str())); + make_string("Failed to connect to service '%s' from host '%s'.", + serviceName.c_str(), getIdentity().getHostname().c_str())); } ret->setTarget(target); // free by freeServiceAddress() recipient.setServiceAddress(IServiceAddress::UP(ret.release())); @@ -330,11 +333,45 @@ RPCNetwork::send(const Message &msg, const std::vector<RoutingNode*> &recipients } } +namespace { + +void emit_recipient_endpoint(vespalib::asciistream& stream, const RoutingNode& recipient) { + if (recipient.hasServiceAddress()) { + // At this point the service addresses _should_ be RPCServiceAddress instances, + // but stay on the safe side of the tracks anyway. + const auto* rpc_addr = dynamic_cast<const RPCServiceAddress*>(&recipient.getServiceAddress()); + if (rpc_addr) { + stream << rpc_addr->getServiceName() << " at " << rpc_addr->getConnectionSpec(); + } else { + stream << "<non-RPC service address>"; + } + } else { + stream << "<unknown service address>"; + } +} + +} + +vespalib::string RPCNetwork::buildRecipientListString(const SendContext& ctx) { + vespalib::asciistream s; + bool first = true; + for (const auto* recipient : ctx._recipients) { + if (!first) { + s << ", "; + } + first = false; + emit_recipient_endpoint(s, *recipient); + } + return s.str(); +} + void RPCNetwork::send(RPCNetwork::SendContext &ctx) { if (ctx._hasError) { - replyError(ctx, ErrorCode::HANDSHAKE_FAILED, "An error occured while resolving version."); + replyError(ctx, ErrorCode::HANDSHAKE_FAILED, + make_string("An error occurred while resolving version of recipient(s) [%s] from host '%s'.", + buildRecipientListString(ctx).c_str(), getIdentity().getHostname().c_str())); } else { uint64_t timeRemaining = ctx._msg.getTimeRemainingNow(); Blob payload = _owner->getProtocol(ctx._msg.getProtocol())->encode(ctx._version, ctx._msg); diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h index 9c6516eced7..123987fea68 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h @@ -105,6 +105,8 @@ private: */ void send(SendContext &ctx); + static vespalib::string buildRecipientListString(const SendContext& ctx); + protected: /** * Returns the version of this network. This gets called when the diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.cpp b/messagebus/src/vespa/messagebus/network/rpctarget.cpp index a1c80fdc475..04423b5e90b 100644 --- a/messagebus/src/vespa/messagebus/network/rpctarget.cpp +++ b/messagebus/src/vespa/messagebus/network/rpctarget.cpp @@ -74,12 +74,12 @@ RPCTarget::RequestDone(FRT_RPCRequest *req) if (req->CheckReturnTypes("s")) { FRT_Values &val = *req->GetReturn(); try { - _version.reset(new vespalib::Version(val[0]._string._str)); + _version = std::make_unique<vespalib::Version>(val[0]._string._str); } catch (vespalib::IllegalArgumentException &e) { (void)e; } } else if (req->GetErrorCode() == FRTE_RPC_NO_SUCH_METHOD) { - _version.reset(new vespalib::Version("4.1")); + // Talking to a non-messagebus RPC endpoint. _version remains nullptr. } _versionHandlers.swap(handlers); _state = PROCESSING_HANDLERS; diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.h b/messagebus/src/vespa/messagebus/network/rpctarget.h index 706d994ceaf..cd1028d6df4 100644 --- a/messagebus/src/vespa/messagebus/network/rpctarget.h +++ b/messagebus/src/vespa/messagebus/network/rpctarget.h @@ -31,7 +31,7 @@ public: /** * This method is invoked once the version of the corresponding {@link - * RPCTarget} becomes available. If a problem occured while retrieving + * RPCTarget} becomes available. If a problem occurred while retrieving * the version, this method is invoked with a null argument. * * @param ver The version of corresponding target, or null. diff --git a/messagebus/src/vespa/messagebus/routing/routingnode.h b/messagebus/src/vespa/messagebus/routing/routingnode.h index dfcd30d06bc..3902e4ef699 100644 --- a/messagebus/src/vespa/messagebus/routing/routingnode.h +++ b/messagebus/src/vespa/messagebus/routing/routingnode.h @@ -74,7 +74,7 @@ private: /** * This method merges the content of all its children, and invokes itself on - * the parent node. If not all children are ready for merg, this method does + * the parent node. If not all children are ready for merge, this method does * nothing. The rationale for this is that the last child to receive a reply * will propagate the merge upwards. Once this method reaches the root node, * the reply is either scheduled for resending or passed to the owning reply @@ -421,7 +421,7 @@ public: * * @return True if an address is set. */ - bool hasServiceAddress() { return _serviceAddress.get() != nullptr; } + bool hasServiceAddress() const { return _serviceAddress.get() != nullptr; } /** * Returns the service address of this node. This is attached by the network |