summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2019-02-05 13:19:16 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2019-02-05 13:51:20 +0000
commitaee4bb48d6afd708dad62a10cc58c72d28b6d81c (patch)
tree278fc7235b217c8b61147a7909ff57d89002db04 /messagebus
parentd4ffb1cb379d3212f9d7a2ed7bd10cae9e5152f9 (diff)
Augment messagebus error messages with recipients and/or local identity
Remove deprecated C++ implementation fallback to 4.1 protocol version if remote RPC endpoint does not understand `mbus.getVersion` call. Will now fail as expected with a handshake error instead.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java25
-rwxr-xr-xmessagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java2
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp45
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.h2
-rw-r--r--messagebus/src/vespa/messagebus/network/rpctarget.cpp4
-rw-r--r--messagebus/src/vespa/messagebus/network/rpctarget.h2
-rw-r--r--messagebus/src/vespa/messagebus/routing/routingnode.h4
7 files changed, 69 insertions, 15 deletions
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
index c169bed3fa9..ab741b36a05 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/network/rpc/RPCNetwork.java
@@ -45,6 +45,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
+import java.util.stream.Collectors;
/**
* An RPC implementation of the Network interface.
@@ -232,6 +233,16 @@ public class RPCNetwork implements Network, MethodHandler {
}
}
+ private static String buildRecipientListString(SendContext ctx) {
+ return ctx.recipients.stream().map(r -> {
+ if (!(r.getServiceAddress() instanceof RPCServiceAddress)) {
+ return "<non-RPC service address>";
+ }
+ RPCServiceAddress addr = (RPCServiceAddress)r.getServiceAddress();
+ return String.format("%s at %s", addr.getServiceName(), addr.getConnectionSpec());
+ }).collect(Collectors.joining(", "));
+ }
+
/**
* This method is a callback invoked after {@link #send(Message, List)} once the version of all recipients have been
* resolved. If all versions were resolved ahead of time, this method is invoked by the same thread as the former.
@@ -243,7 +254,9 @@ public class RPCNetwork implements Network, MethodHandler {
if (destroyed.get()) {
replyError(ctx, ErrorCode.NETWORK_SHUTDOWN, "Network layer has performed shutdown.");
} else if (ctx.hasError) {
- replyError(ctx, ErrorCode.HANDSHAKE_FAILED, "An error occured while resolving version.");
+ replyError(ctx, ErrorCode.HANDSHAKE_FAILED,
+ String.format("An error occurred while resolving version of recipient(s) [%s] from host '%s'.",
+ buildRecipientListString(ctx), identity.getHostname()));
} else {
executor.execute(new SendTask(owner.getProtocol(ctx.msg.getProtocol()), ctx));
}
@@ -295,14 +308,16 @@ public class RPCNetwork implements Network, MethodHandler {
RPCServiceAddress ret = servicePool.resolve(serviceName);
if (ret == null) {
return new Error(ErrorCode.NO_ADDRESS_FOR_SERVICE,
- "The address of service '" + serviceName + "' could not be resolved. It is not currently " +
- "registered with the Vespa name server. " +
- "The service must be having problems, or the routing configuration is wrong.");
+ String.format("The address of service '%s' could not be resolved. It is not currently " +
+ "registered with the Vespa name server. " +
+ "The service must be having problems, or the routing configuration is wrong. " +
+ "Address resolution attempted from host '%s'", serviceName, identity.getHostname()));
}
RPCTarget target = targetPool.getTarget(orb, ret);
if (target == null) {
return new Error(ErrorCode.CONNECTION_ERROR,
- "Failed to connect to service '" + serviceName + "'.");
+ String.format("Failed to connect to service '%s' from host '%s'.",
+ serviceName, identity.getHostname()));
}
ret.setTarget(target); // free by freeServiceAddress()
recipient.setServiceAddress(ret);
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java
index d2d8031a689..64b7c4cb12e 100755
--- a/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingNode.java
@@ -183,7 +183,7 @@ public class RoutingNode implements ReplyHandler {
}
/**
- * This method markss this node as ready for merge. If it has a parent routing node, its pending member is
+ * This method marks this node as ready for merge. If it has a parent routing node, its pending member is
* decremented. If this causes the parent's pending count to reach zero, its {@link #notifyMerge()} method is
* invoked. A special flag is used to make sure that failed resending avoids notifying parents of previously
* resolved branches of the tree.
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index b72416f51d2..358c9ebdeac 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -13,6 +13,7 @@
#include <vespa/slobrok/sbregister.h>
#include <vespa/slobrok/sbmirror.h>
#include <vespa/vespalib/component/vtag.h>
+#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/fnet/scheduler.h>
@@ -296,13 +297,15 @@ RPCNetwork::resolveServiceAddress(RoutingNode &recipient, const string &serviceN
return Error(ErrorCode::NO_ADDRESS_FOR_SERVICE,
make_string("The address of service '%s' could not be resolved. It is not currently "
"registered with the Vespa name server. "
- "The service must be having problems, or the routing configuration is wrong.",
- serviceName.c_str()));
+ "The service must be having problems, or the routing configuration is wrong. "
+ "Address resolution attempted from host '%s'",
+ serviceName.c_str(), getIdentity().getHostname().c_str()));
}
RPCTarget::SP target = _targetPool->getTarget(*_orb, *ret);
if ( ! target) {
return Error(ErrorCode::CONNECTION_ERROR,
- make_string("Failed to connect to service '%s'.", serviceName.c_str()));
+ make_string("Failed to connect to service '%s' from host '%s'.",
+ serviceName.c_str(), getIdentity().getHostname().c_str()));
}
ret->setTarget(target); // free by freeServiceAddress()
recipient.setServiceAddress(IServiceAddress::UP(ret.release()));
@@ -330,11 +333,45 @@ RPCNetwork::send(const Message &msg, const std::vector<RoutingNode*> &recipients
}
}
+namespace {
+
+void emit_recipient_endpoint(vespalib::asciistream& stream, const RoutingNode& recipient) {
+ if (recipient.hasServiceAddress()) {
+ // At this point the service addresses _should_ be RPCServiceAddress instances,
+ // but stay on the safe side of the tracks anyway.
+ const auto* rpc_addr = dynamic_cast<const RPCServiceAddress*>(&recipient.getServiceAddress());
+ if (rpc_addr) {
+ stream << rpc_addr->getServiceName() << " at " << rpc_addr->getConnectionSpec();
+ } else {
+ stream << "<non-RPC service address>";
+ }
+ } else {
+ stream << "<unknown service address>";
+ }
+}
+
+}
+
+vespalib::string RPCNetwork::buildRecipientListString(const SendContext& ctx) {
+ vespalib::asciistream s;
+ bool first = true;
+ for (const auto* recipient : ctx._recipients) {
+ if (!first) {
+ s << ", ";
+ }
+ first = false;
+ emit_recipient_endpoint(s, *recipient);
+ }
+ return s.str();
+}
+
void
RPCNetwork::send(RPCNetwork::SendContext &ctx)
{
if (ctx._hasError) {
- replyError(ctx, ErrorCode::HANDSHAKE_FAILED, "An error occured while resolving version.");
+ replyError(ctx, ErrorCode::HANDSHAKE_FAILED,
+ make_string("An error occurred while resolving version of recipient(s) [%s] from host '%s'.",
+ buildRecipientListString(ctx).c_str(), getIdentity().getHostname().c_str()));
} else {
uint64_t timeRemaining = ctx._msg.getTimeRemainingNow();
Blob payload = _owner->getProtocol(ctx._msg.getProtocol())->encode(ctx._version, ctx._msg);
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
index 9c6516eced7..123987fea68 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
@@ -105,6 +105,8 @@ private:
*/
void send(SendContext &ctx);
+ static vespalib::string buildRecipientListString(const SendContext& ctx);
+
protected:
/**
* Returns the version of this network. This gets called when the
diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.cpp b/messagebus/src/vespa/messagebus/network/rpctarget.cpp
index a1c80fdc475..04423b5e90b 100644
--- a/messagebus/src/vespa/messagebus/network/rpctarget.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpctarget.cpp
@@ -74,12 +74,12 @@ RPCTarget::RequestDone(FRT_RPCRequest *req)
if (req->CheckReturnTypes("s")) {
FRT_Values &val = *req->GetReturn();
try {
- _version.reset(new vespalib::Version(val[0]._string._str));
+ _version = std::make_unique<vespalib::Version>(val[0]._string._str);
} catch (vespalib::IllegalArgumentException &e) {
(void)e;
}
} else if (req->GetErrorCode() == FRTE_RPC_NO_SUCH_METHOD) {
- _version.reset(new vespalib::Version("4.1"));
+ // Talking to a non-messagebus RPC endpoint. _version remains nullptr.
}
_versionHandlers.swap(handlers);
_state = PROCESSING_HANDLERS;
diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.h b/messagebus/src/vespa/messagebus/network/rpctarget.h
index 706d994ceaf..cd1028d6df4 100644
--- a/messagebus/src/vespa/messagebus/network/rpctarget.h
+++ b/messagebus/src/vespa/messagebus/network/rpctarget.h
@@ -31,7 +31,7 @@ public:
/**
* This method is invoked once the version of the corresponding {@link
- * RPCTarget} becomes available. If a problem occured while retrieving
+ * RPCTarget} becomes available. If a problem occurred while retrieving
* the version, this method is invoked with a null argument.
*
* @param ver The version of corresponding target, or null.
diff --git a/messagebus/src/vespa/messagebus/routing/routingnode.h b/messagebus/src/vespa/messagebus/routing/routingnode.h
index dfcd30d06bc..3902e4ef699 100644
--- a/messagebus/src/vespa/messagebus/routing/routingnode.h
+++ b/messagebus/src/vespa/messagebus/routing/routingnode.h
@@ -74,7 +74,7 @@ private:
/**
* This method merges the content of all its children, and invokes itself on
- * the parent node. If not all children are ready for merg, this method does
+ * the parent node. If not all children are ready for merge, this method does
* nothing. The rationale for this is that the last child to receive a reply
* will propagate the merge upwards. Once this method reaches the root node,
* the reply is either scheduled for resending or passed to the owning reply
@@ -421,7 +421,7 @@ public:
*
* @return True if an address is set.
*/
- bool hasServiceAddress() { return _serviceAddress.get() != nullptr; }
+ bool hasServiceAddress() const { return _serviceAddress.get() != nullptr; }
/**
* Returns the service address of this node. This is attached by the network