From 9cd0f66be8245318198588a1b1dec9bfe917b49e Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Sun, 26 Jun 2022 20:29:36 +0000 Subject: Remove 'requireSequencing' as an option. As a consequence move protocol and params resolution to common code so that it is always handled in the decoding/encoding thread. --- .../protocolrepository/protocolrepository.cpp | 41 +++++------------- messagebus/src/vespa/messagebus/iprotocol.h | 3 -- .../src/vespa/messagebus/network/rpcsend.cpp | 48 +++++++++++----------- messagebus/src/vespa/messagebus/network/rpcsend.h | 4 +- .../src/vespa/messagebus/testlib/simpleprotocol.h | 1 - 5 files changed, 37 insertions(+), 60 deletions(-) (limited to 'messagebus') diff --git a/messagebus/src/tests/protocolrepository/protocolrepository.cpp b/messagebus/src/tests/protocolrepository/protocolrepository.cpp index a9a268262ee..20649d56d13 100644 --- a/messagebus/src/tests/protocolrepository/protocolrepository.cpp +++ b/messagebus/src/tests/protocolrepository/protocolrepository.cpp @@ -13,42 +13,23 @@ private: public: - TestProtocol(const string &name) + TestProtocol(const string &name) noexcept : _name(name) - { - // empty - } + { } - const string & - getName() const override - { - return _name; - } + const string & getName() const override { return _name; } - IRoutingPolicy::UP - createPolicy(const string &name, const string ¶m) const override - { - (void)name; - (void)param; + IRoutingPolicy::UP createPolicy(const string &, const string &) const override { throw std::exception(); } - Blob - encode(const vespalib::Version &version, const Routable &routable) const override - { - (void)version; - (void)routable; + Blob encode(const vespalib::Version &, const Routable &) const override { throw std::exception(); } - Routable::UP - decode(const vespalib::Version &version, BlobRef data) const override - { - (void)version; - (void)data; + Routable::UP decode(const vespalib::Version &, BlobRef ) const override { throw std::exception(); } - bool requireSequencing() const override { return false; } }; int @@ -58,16 +39,16 @@ Test::Main() ProtocolRepository repo; IProtocol::SP prev; - prev = repo.putProtocol(IProtocol::SP(new TestProtocol("foo"))); - ASSERT_TRUE(prev.get() == NULL); + prev = repo.putProtocol(std::make_shared("foo")); + ASSERT_FALSE(prev); IRoutingPolicy::SP policy = repo.getRoutingPolicy("foo", "bar", "baz"); - prev = repo.putProtocol(IProtocol::SP(new TestProtocol("foo"))); - ASSERT_TRUE(prev.get() != NULL); + prev = repo.putProtocol(std::make_shared("foo")); + ASSERT_TRUE(prev); ASSERT_NOT_EQUAL(prev.get(), repo.getProtocol("foo")); policy = repo.getRoutingPolicy("foo", "bar", "baz"); - ASSERT_TRUE(policy.get() == NULL); + ASSERT_FALSE(policy); TEST_DONE(); } diff --git a/messagebus/src/vespa/messagebus/iprotocol.h b/messagebus/src/vespa/messagebus/iprotocol.h index 20726ced71b..e46a5600471 100644 --- a/messagebus/src/vespa/messagebus/iprotocol.h +++ b/messagebus/src/vespa/messagebus/iprotocol.h @@ -79,9 +79,6 @@ public: * @return The decoded routable. */ virtual Routable::UP decode(const vespalib::Version &version, BlobRef data) const = 0; // throw() - - - virtual bool requireSequencing() const = 0; }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp index 485c8e3e911..7627aa876b3 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp @@ -55,10 +55,10 @@ private: } -RPCSend::RPCSend() : - _net(nullptr), - _clientIdent("client"), - _serverIdent("server") +RPCSend::RPCSend() + : _net(nullptr), + _clientIdent("client"), + _serverIdent("server") { } RPCSend::~RPCSend() = default; @@ -220,19 +220,19 @@ RPCSend::decode(vespalib::stringref protocolName, const vespalib::Version & vers void RPCSend::handleReply(Reply::UP reply) { - const IProtocol * protocol = _net->getOwner().getProtocol(reply->getProtocol()); - if (!protocol || protocol->requireSequencing() || !_net->allowDispatchForEncode()) { - doHandleReply(protocol, std::move(reply)); + if (!_net->allowDispatchForEncode()) { + doHandleReply(std::move(reply)); } else { - auto rejected = _net->getExecutor().execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable { - doHandleReply(protocol, std::move(reply)); + auto rejected = _net->getExecutor().execute(makeLambdaTask([this, reply = std::move(reply)]() mutable { + doHandleReply(std::move(reply)); })); assert (!rejected); } } void -RPCSend::doHandleReply(const IProtocol * protocol, Reply::UP reply) { +RPCSend::doHandleReply(Reply::UP reply) { + const IProtocol * protocol = _net->getOwner().getProtocol(reply->getProtocol()); ReplyContext::UP ctx(static_cast(reply->getContext().value.PTR)); FRT_RPCRequest &req = ctx->getRequest(); string version = ctx->getVersion().toString(); @@ -256,29 +256,29 @@ void RPCSend::invoke(FRT_RPCRequest *req) { req->Detach(); - FRT_Values &args = *req->GetParams(); - std::unique_ptr params = toParams(args); - IProtocol * protocol = _net->getOwner().getProtocol(params->getProtocol()); - if (protocol == nullptr) { - replyError(req, params->getVersion(), params->getTraceLevel(), - Error(ErrorCode::UNKNOWN_PROTOCOL, make_string("Protocol '%s' is not known by %s.", - vespalib::string(params->getProtocol()).c_str(), _serverIdent.c_str()))); - return; - } - if (protocol->requireSequencing() || !_net->allowDispatchForDecode()) { - doRequest(req, protocol, std::move(params)); + if (!_net->allowDispatchForDecode()) { + doRequest(req); } else { - auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable { - doRequest(req, protocol, std::move(params)); + auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req]() { + doRequest(req); })); assert (!rejected); } } void -RPCSend::doRequest(FRT_RPCRequest *req, const IProtocol * protocol, std::unique_ptr params) +RPCSend::doRequest(FRT_RPCRequest *req) { + FRT_Values &args = *req->GetParams(); + std::unique_ptr params = toParams(args); + IProtocol * protocol = _net->getOwner().getProtocol(params->getProtocol()); + if (protocol == nullptr) { + replyError(req, params->getVersion(), params->getTraceLevel(), + Error(ErrorCode::UNKNOWN_PROTOCOL, make_string("Protocol '%s' is not known by %s.", + vespalib::string(params->getProtocol()).c_str(), _serverIdent.c_str()))); + return; + } Routable::UP routable = protocol->decode(params->getVersion(), params->getPayload()); req->DiscardBlobs(); if ( ! routable ) { diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.h b/messagebus/src/vespa/messagebus/network/rpcsend.h index e548c4adca2..c2bcb7dff2b 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.h +++ b/messagebus/src/vespa/messagebus/network/rpcsend.h @@ -83,9 +83,9 @@ public: void invoke(FRT_RPCRequest *req); private: - void doRequest(FRT_RPCRequest *req, const IProtocol * protocol, std::unique_ptr params); + void doRequest(FRT_RPCRequest *req); void doRequestDone(FRT_RPCRequest *req); - void doHandleReply(const IProtocol * protocol, std::unique_ptr reply); + void doHandleReply(std::unique_ptr reply); void attach(RPCNetwork &net) final override; void handleDiscard(Context ctx) final override; void sendByHandover(RoutingNode &recipient, const vespalib::Version &version, diff --git a/messagebus/src/vespa/messagebus/testlib/simpleprotocol.h b/messagebus/src/vespa/messagebus/testlib/simpleprotocol.h index 57a6e901b90..0486f058217 100644 --- a/messagebus/src/vespa/messagebus/testlib/simpleprotocol.h +++ b/messagebus/src/vespa/messagebus/testlib/simpleprotocol.h @@ -72,7 +72,6 @@ public: IRoutingPolicy::UP createPolicy(const string &name, const string ¶m) const override; Blob encode(const vespalib::Version &version, const Routable &routable) const override; Routable::UP decode(const vespalib::Version &version, BlobRef data) const override; - virtual bool requireSequencing() const override { return false; } }; } // namespace mbus -- cgit v1.2.3