summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2022-08-25 09:00:41 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2022-08-25 09:00:41 +0000
commit3bdb9965fd11b1e7af4de05814c76f5779831047 (patch)
tree74038df024bf2b4ddd7923861e52c9b9776a09f6 /messagebus
parent216316446052bad491f74922f90254f33820aba3 (diff)
Wire RPC capability set filtering to mbus server functions
Required capability set is configured via `RPCNetworkParams` and defaults to the empty set (i.e. no filtering done).
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp7
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.h3
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp3
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.h11
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.cpp4
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.h4
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsendadapter.h6
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsendv2.cpp4
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsendv2.h2
9 files changed, 34 insertions, 10 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index 412dc29d4f2..6716ee5fe24 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -4,6 +4,7 @@
#include "rpcsendv2.h"
#include "rpctargetpool.h"
#include "rpcnetworkparams.h"
+#include <vespa/fnet/frt/require_capabilities.h>
#include <vespa/fnet/frt/supervisor.h>
#include <vespa/fnet/scheduler.h>
#include <vespa/fnet/transport.h>
@@ -138,7 +139,8 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_servicePool(std::make_unique<RPCServicePool>(*_mirror, 4_Ki)),
_sendV2(std::make_unique<RPCSendV2>()),
_sendAdapters(),
- _compressionConfig(params.getCompressionConfig())
+ _compressionConfig(params.getCompressionConfig()),
+ _required_capabilities(params.required_capabilities())
{
}
@@ -191,13 +193,14 @@ RPCNetwork::attach(INetworkOwner &owner)
LOG_ASSERT(_owner == nullptr);
_owner = &owner;
- _sendV2->attach(*this);
+ _sendV2->attach(*this, _required_capabilities);
_sendAdapters[vespalib::Version(6, 149)] = _sendV2.get();
FRT_ReflectionBuilder builder(_orb.get());
builder.DefineMethod("mbus.getVersion", "", "s", FRT_METHOD(RPCNetwork::invoke), this);
builder.MethodDesc("Retrieves the message bus version.");
builder.ReturnDesc("version", "The message bus version.");
+ builder.RequestAccessFilter(FRT_RequireCapabilities::of(_required_capabilities));
}
void
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
index b95c0c77b3c..0d2435e5dcd 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
@@ -11,6 +11,7 @@
#include <vespa/messagebus/reply.h>
#include <vespa/slobrok/imirrorapi.h>
#include <vespa/vespalib/component/versionspecification.h>
+#include <vespa/vespalib/net/tls/capability_set.h>
#include <vespa/vespalib/util/compressionconfig.h>
#include <vespa/fnet/frt/invokable.h>
@@ -36,6 +37,7 @@ class RPCServiceAddress;
class RPCNetwork : public FRT_Invokable, public INetwork {
private:
using CompressionConfig = vespalib::compression::CompressionConfig;
+ using CapabilitySet = vespalib::net::tls::CapabilitySet;
struct SendContext : public RPCTarget::IVersionHandler {
std::mutex _lock;
RPCNetwork &_net;
@@ -68,6 +70,7 @@ private:
std::unique_ptr<RPCSendAdapter> _sendV2;
SendAdapterMap _sendAdapters;
CompressionConfig _compressionConfig;
+ CapabilitySet _required_capabilities;
/**
* Resolves and assigns a service address for the given recipient using the
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
index fc060b48fc2..a0ed8d6742b 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
@@ -20,7 +20,8 @@ RPCNetworkParams::RPCNetworkParams(config::ConfigUri configUri) :
_events_before_wakeup(1),
_tcpNoDelay(true),
_connectionExpireSecs(600),
- _compressionConfig(CompressionConfig::LZ4, 6, 90, 1024)
+ _compressionConfig(CompressionConfig::LZ4, 6, 90, 1024),
+ _required_capabilities(CapabilitySet::make_empty()) // No special peer requirements by default
{ }
RPCNetworkParams::~RPCNetworkParams() = default;
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
index 4a4d92ba797..6b666bf13fb 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
@@ -3,6 +3,7 @@
#include "identity.h"
#include <vespa/slobrok/cfg.h>
+#include <vespa/vespalib/net/tls/capability_set.h>
#include <vespa/vespalib/util/compressionconfig.h>
namespace mbus {
@@ -14,6 +15,7 @@ namespace mbus {
class RPCNetworkParams {
private:
using CompressionConfig = vespalib::compression::CompressionConfig;
+ using CapabilitySet = vespalib::net::tls::CapabilitySet;
Identity _identity;
config::ConfigUri _slobrokConfig;
int _listenPort;
@@ -25,6 +27,7 @@ private:
bool _tcpNoDelay;
double _connectionExpireSecs;
CompressionConfig _compressionConfig;
+ CapabilitySet _required_capabilities;
public:
RPCNetworkParams();
@@ -166,6 +169,14 @@ public:
return *this;
}
uint32_t events_before_wakeup() const { return _events_before_wakeup; }
+
+ RPCNetworkParams& required_capabilities(CapabilitySet capabilities) noexcept {
+ _required_capabilities = capabilities;
+ return *this;
+ }
+ [[nodiscard]] CapabilitySet required_capabilities() const noexcept {
+ return _required_capabilities;
+ }
};
}
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
index ff77a1bb639..8c67424d5f2 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
@@ -64,7 +64,7 @@ RPCSend::RPCSend()
RPCSend::~RPCSend() = default;
void
-RPCSend::attach(RPCNetwork &net)
+RPCSend::attach(RPCNetwork &net, CapabilitySet required_capabilities)
{
_net = &net;
const string &prefix = _net->getIdentity().getServicePrefix();
@@ -74,7 +74,7 @@ RPCSend::attach(RPCNetwork &net)
}
FRT_ReflectionBuilder builder(&_net->getSupervisor());
- build(builder);
+ build(builder, required_capabilities);
}
void
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.h b/messagebus/src/vespa/messagebus/network/rpcsend.h
index c2bcb7dff2b..df3d5972318 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.h
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.h
@@ -54,7 +54,7 @@ protected:
string _clientIdent;
string _serverIdent;
- virtual void build(FRT_ReflectionBuilder & builder) = 0;
+ virtual void build(FRT_ReflectionBuilder & builder, CapabilitySet required_capabilities) = 0;
virtual std::unique_ptr<Reply> createReply(const FRT_Values & response, const string & serviceName,
Error & error, vespalib::Trace & trace) const = 0;
virtual void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route,
@@ -86,7 +86,7 @@ private:
void doRequest(FRT_RPCRequest *req);
void doRequestDone(FRT_RPCRequest *req);
void doHandleReply(std::unique_ptr<Reply> reply);
- void attach(RPCNetwork &net) final override;
+ void attach(RPCNetwork &net, CapabilitySet required_capabilities) final override;
void handleDiscard(Context ctx) final override;
void sendByHandover(RoutingNode &recipient, const vespalib::Version &version,
Blob payload, duration timeRemaining) final override;
diff --git a/messagebus/src/vespa/messagebus/network/rpcsendadapter.h b/messagebus/src/vespa/messagebus/network/rpcsendadapter.h
index 439049c54a8..0c0f0585ab9 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsendadapter.h
+++ b/messagebus/src/vespa/messagebus/network/rpcsendadapter.h
@@ -4,6 +4,7 @@
#include <vespa/messagebus/blobref.h>
#include <vespa/messagebus/common.h>
#include <vespa/vespalib/component/version.h>
+#include <vespa/vespalib/net/tls/capability_set.h>
namespace mbus {
@@ -20,6 +21,8 @@ class RPCSendAdapter
protected:
RPCSendAdapter() = default;
public:
+ using CapabilitySet = vespalib::net::tls::CapabilitySet;
+
RPCSendAdapter(const RPCSendAdapter &) = delete;
RPCSendAdapter & operator = (const RPCSendAdapter &) = delete;
/**
@@ -31,8 +34,9 @@ public:
* Attaches this adapter to the given network.
*
* @param net The network to attach to.
+ * @param required_capabilities capabilities required to invoke mbus on this server
*/
- virtual void attach(RPCNetwork &net) = 0;
+ virtual void attach(RPCNetwork &net, CapabilitySet required_capabilities) = 0;
/**
* Performs the actual sending to the given recipient.
diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp b/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp
index ead982d2c19..c211ff83110 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcsendv2.cpp
@@ -4,6 +4,7 @@
#include "rpcnetwork.h"
#include "rpcserviceaddress.h"
#include <vespa/fnet/frt/reflection.h>
+#include <vespa/fnet/frt/require_capabilities.h>
#include <vespa/messagebus/emptyreply.h>
#include <vespa/messagebus/error.h>
#include <vespa/vespalib/data/databuffer.h>
@@ -59,7 +60,7 @@ RPCSendV2::isCompatible(stringref method, stringref request, stringref response)
}
void
-RPCSendV2::build(FRT_ReflectionBuilder & builder)
+RPCSendV2::build(FRT_ReflectionBuilder & builder, CapabilitySet required_capabilities)
{
builder.DefineMethod(METHOD_NAME, METHOD_PARAMS, METHOD_RETURN, FRT_METHOD(RPCSendV2::invoke), this);
builder.MethodDesc("Send a message bus slime request and get a reply back.");
@@ -75,6 +76,7 @@ RPCSendV2::build(FRT_ReflectionBuilder & builder)
builder.ReturnDesc("body_encoding", "0=raw, 6=lz4");
builder.ReturnDesc("body_decoded_size", "Uncompressed body blob size");
builder.ReturnDesc("body_payload", "The reply body blob in slime.");
+ builder.RequestAccessFilter(FRT_RequireCapabilities::of(required_capabilities));
}
const char *
diff --git a/messagebus/src/vespa/messagebus/network/rpcsendv2.h b/messagebus/src/vespa/messagebus/network/rpcsendv2.h
index 1222130c1c8..c9efa7d8d06 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsendv2.h
+++ b/messagebus/src/vespa/messagebus/network/rpcsendv2.h
@@ -9,7 +9,7 @@ class RPCSendV2 : public RPCSend {
public:
static bool isCompatible(vespalib::stringref method, vespalib::stringref request, vespalib::stringref response);
private:
- void build(FRT_ReflectionBuilder & builder) override;
+ void build(FRT_ReflectionBuilder & builder, CapabilitySet required_capabilities) override;
const char * getReturnSpec() const override;
std::unique_ptr<Params> toParams(const FRT_Values &param) const override;
void encodeRequest(FRT_RPCRequest &req, const vespalib::Version &version, const Route & route,