aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-06-26 20:29:36 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2022-06-26 20:29:36 +0000
commit9cd0f66be8245318198588a1b1dec9bfe917b49e (patch)
treeb31aa3641f5167908f2bd51dce6ce99eb9549158 /messagebus
parentb659a529c013d6ff0ac21c0a54f49d6b38dc5c67 (diff)
Remove 'requireSequencing' as an option.
As a consequence move protocol and params resolution to common code so that it is always handled in the decoding/encoding thread.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/tests/protocolrepository/protocolrepository.cpp41
-rw-r--r--messagebus/src/vespa/messagebus/iprotocol.h3
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.cpp48
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.h4
-rw-r--r--messagebus/src/vespa/messagebus/testlib/simpleprotocol.h1
5 files changed, 37 insertions, 60 deletions
diff --git a/messagebus/src/tests/protocolrepository/protocolrepository.cpp b/messagebus/src/tests/protocolrepository/protocolrepository.cpp
index a9a268262ee..20649d56d13 100644
--- a/messagebus/src/tests/protocolrepository/protocolrepository.cpp
+++ b/messagebus/src/tests/protocolrepository/protocolrepository.cpp
@@ -13,42 +13,23 @@ private:
public:
- TestProtocol(const string &name)
+ TestProtocol(const string &name) noexcept
: _name(name)
- {
- // empty
- }
+ { }
- const string &
- getName() const override
- {
- return _name;
- }
+ const string & getName() const override { return _name; }
- IRoutingPolicy::UP
- createPolicy(const string &name, const string &param) const override
- {
- (void)name;
- (void)param;
+ IRoutingPolicy::UP createPolicy(const string &, const string &) const override {
throw std::exception();
}
- Blob
- encode(const vespalib::Version &version, const Routable &routable) const override
- {
- (void)version;
- (void)routable;
+ Blob encode(const vespalib::Version &, const Routable &) const override {
throw std::exception();
}
- Routable::UP
- decode(const vespalib::Version &version, BlobRef data) const override
- {
- (void)version;
- (void)data;
+ Routable::UP decode(const vespalib::Version &, BlobRef ) const override {
throw std::exception();
}
- bool requireSequencing() const override { return false; }
};
int
@@ -58,16 +39,16 @@ Test::Main()
ProtocolRepository repo;
IProtocol::SP prev;
- prev = repo.putProtocol(IProtocol::SP(new TestProtocol("foo")));
- ASSERT_TRUE(prev.get() == NULL);
+ prev = repo.putProtocol(std::make_shared<TestProtocol>("foo"));
+ ASSERT_FALSE(prev);
IRoutingPolicy::SP policy = repo.getRoutingPolicy("foo", "bar", "baz");
- prev = repo.putProtocol(IProtocol::SP(new TestProtocol("foo")));
- ASSERT_TRUE(prev.get() != NULL);
+ prev = repo.putProtocol(std::make_shared<TestProtocol>("foo"));
+ ASSERT_TRUE(prev);
ASSERT_NOT_EQUAL(prev.get(), repo.getProtocol("foo"));
policy = repo.getRoutingPolicy("foo", "bar", "baz");
- ASSERT_TRUE(policy.get() == NULL);
+ ASSERT_FALSE(policy);
TEST_DONE();
}
diff --git a/messagebus/src/vespa/messagebus/iprotocol.h b/messagebus/src/vespa/messagebus/iprotocol.h
index 20726ced71b..e46a5600471 100644
--- a/messagebus/src/vespa/messagebus/iprotocol.h
+++ b/messagebus/src/vespa/messagebus/iprotocol.h
@@ -79,9 +79,6 @@ public:
* @return The decoded routable.
*/
virtual Routable::UP decode(const vespalib::Version &version, BlobRef data) const = 0; // throw()
-
-
- virtual bool requireSequencing() const = 0;
};
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
index 485c8e3e911..7627aa876b3 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
@@ -55,10 +55,10 @@ private:
}
-RPCSend::RPCSend() :
- _net(nullptr),
- _clientIdent("client"),
- _serverIdent("server")
+RPCSend::RPCSend()
+ : _net(nullptr),
+ _clientIdent("client"),
+ _serverIdent("server")
{ }
RPCSend::~RPCSend() = default;
@@ -220,19 +220,19 @@ RPCSend::decode(vespalib::stringref protocolName, const vespalib::Version & vers
void
RPCSend::handleReply(Reply::UP reply)
{
- const IProtocol * protocol = _net->getOwner().getProtocol(reply->getProtocol());
- if (!protocol || protocol->requireSequencing() || !_net->allowDispatchForEncode()) {
- doHandleReply(protocol, std::move(reply));
+ if (!_net->allowDispatchForEncode()) {
+ doHandleReply(std::move(reply));
} else {
- auto rejected = _net->getExecutor().execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable {
- doHandleReply(protocol, std::move(reply));
+ auto rejected = _net->getExecutor().execute(makeLambdaTask([this, reply = std::move(reply)]() mutable {
+ doHandleReply(std::move(reply));
}));
assert (!rejected);
}
}
void
-RPCSend::doHandleReply(const IProtocol * protocol, Reply::UP reply) {
+RPCSend::doHandleReply(Reply::UP reply) {
+ const IProtocol * protocol = _net->getOwner().getProtocol(reply->getProtocol());
ReplyContext::UP ctx(static_cast<ReplyContext*>(reply->getContext().value.PTR));
FRT_RPCRequest &req = ctx->getRequest();
string version = ctx->getVersion().toString();
@@ -256,29 +256,29 @@ void
RPCSend::invoke(FRT_RPCRequest *req)
{
req->Detach();
- FRT_Values &args = *req->GetParams();
- std::unique_ptr<Params> params = toParams(args);
- IProtocol * protocol = _net->getOwner().getProtocol(params->getProtocol());
- if (protocol == nullptr) {
- replyError(req, params->getVersion(), params->getTraceLevel(),
- Error(ErrorCode::UNKNOWN_PROTOCOL, make_string("Protocol '%s' is not known by %s.",
- vespalib::string(params->getProtocol()).c_str(), _serverIdent.c_str())));
- return;
- }
- if (protocol->requireSequencing() || !_net->allowDispatchForDecode()) {
- doRequest(req, protocol, std::move(params));
+ if (!_net->allowDispatchForDecode()) {
+ doRequest(req);
} else {
- auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable {
- doRequest(req, protocol, std::move(params));
+ auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req]() {
+ doRequest(req);
}));
assert (!rejected);
}
}
void
-RPCSend::doRequest(FRT_RPCRequest *req, const IProtocol * protocol, std::unique_ptr<Params> params)
+RPCSend::doRequest(FRT_RPCRequest *req)
{
+ FRT_Values &args = *req->GetParams();
+ std::unique_ptr<Params> params = toParams(args);
+ IProtocol * protocol = _net->getOwner().getProtocol(params->getProtocol());
+ if (protocol == nullptr) {
+ replyError(req, params->getVersion(), params->getTraceLevel(),
+ Error(ErrorCode::UNKNOWN_PROTOCOL, make_string("Protocol '%s' is not known by %s.",
+ vespalib::string(params->getProtocol()).c_str(), _serverIdent.c_str())));
+ return;
+ }
Routable::UP routable = protocol->decode(params->getVersion(), params->getPayload());
req->DiscardBlobs();
if ( ! routable ) {
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.h b/messagebus/src/vespa/messagebus/network/rpcsend.h
index e548c4adca2..c2bcb7dff2b 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.h
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.h
@@ -83,9 +83,9 @@ public:
void invoke(FRT_RPCRequest *req);
private:
- void doRequest(FRT_RPCRequest *req, const IProtocol * protocol, std::unique_ptr<Params> params);
+ void doRequest(FRT_RPCRequest *req);
void doRequestDone(FRT_RPCRequest *req);
- void doHandleReply(const IProtocol * protocol, std::unique_ptr<Reply> reply);
+ void doHandleReply(std::unique_ptr<Reply> reply);
void attach(RPCNetwork &net) final override;
void handleDiscard(Context ctx) final override;
void sendByHandover(RoutingNode &recipient, const vespalib::Version &version,
diff --git a/messagebus/src/vespa/messagebus/testlib/simpleprotocol.h b/messagebus/src/vespa/messagebus/testlib/simpleprotocol.h
index 57a6e901b90..0486f058217 100644
--- a/messagebus/src/vespa/messagebus/testlib/simpleprotocol.h
+++ b/messagebus/src/vespa/messagebus/testlib/simpleprotocol.h
@@ -72,7 +72,6 @@ public:
IRoutingPolicy::UP createPolicy(const string &name, const string &param) const override;
Blob encode(const vespalib::Version &version, const Routable &routable) const override;
Routable::UP decode(const vespalib::Version &version, BlobRef data) const override;
- virtual bool requireSequencing() const override { return false; }
};
} // namespace mbus