diff options
8 files changed, 38 insertions, 63 deletions
diff --git a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h index ffbb63e54f1..f55c05f72cd 100644 --- a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h +++ b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.h @@ -292,7 +292,6 @@ public: mbus::IRoutingPolicy::UP createPolicy(const mbus::string &name, const mbus::string ¶m) const override; mbus::Blob encode(const vespalib::Version &version, const mbus::Routable &routable) const override; mbus::Routable::UP decode(const vespalib::Version &version, mbus::BlobRef data) const override; - bool requireSequencing() const override { return false; } }; } diff --git a/messagebus/src/tests/protocolrepository/protocolrepository.cpp b/messagebus/src/tests/protocolrepository/protocolrepository.cpp index a9a268262ee..20649d56d13 100644 --- a/messagebus/src/tests/protocolrepository/protocolrepository.cpp +++ b/messagebus/src/tests/protocolrepository/protocolrepository.cpp @@ -13,42 +13,23 @@ private: public: - TestProtocol(const string &name) + TestProtocol(const string &name) noexcept : _name(name) - { - // empty - } + { } - const string & - getName() const override - { - return _name; - } + const string & getName() const override { return _name; } - IRoutingPolicy::UP - createPolicy(const string &name, const string ¶m) const override - { - (void)name; - (void)param; + IRoutingPolicy::UP createPolicy(const string &, const string &) const override { throw std::exception(); } - Blob - encode(const vespalib::Version &version, const Routable &routable) const override - { - (void)version; - (void)routable; + Blob encode(const vespalib::Version &, const Routable &) const override { throw std::exception(); } - Routable::UP - decode(const vespalib::Version &version, BlobRef data) const override - { - (void)version; - (void)data; + Routable::UP decode(const vespalib::Version &, BlobRef ) const override { throw std::exception(); } - bool requireSequencing() const override { return false; } }; int @@ -58,16 +39,16 @@ Test::Main() ProtocolRepository repo; IProtocol::SP prev; - prev = repo.putProtocol(IProtocol::SP(new TestProtocol("foo"))); - ASSERT_TRUE(prev.get() == NULL); + prev = repo.putProtocol(std::make_shared<TestProtocol>("foo")); + ASSERT_FALSE(prev); IRoutingPolicy::SP policy = repo.getRoutingPolicy("foo", "bar", "baz"); - prev = repo.putProtocol(IProtocol::SP(new TestProtocol("foo"))); - ASSERT_TRUE(prev.get() != NULL); + prev = repo.putProtocol(std::make_shared<TestProtocol>("foo")); + ASSERT_TRUE(prev); ASSERT_NOT_EQUAL(prev.get(), repo.getProtocol("foo")); policy = repo.getRoutingPolicy("foo", "bar", "baz"); - ASSERT_TRUE(policy.get() == NULL); + ASSERT_FALSE(policy); TEST_DONE(); } diff --git a/messagebus/src/vespa/messagebus/iprotocol.h b/messagebus/src/vespa/messagebus/iprotocol.h index 20726ced71b..e46a5600471 100644 --- a/messagebus/src/vespa/messagebus/iprotocol.h +++ b/messagebus/src/vespa/messagebus/iprotocol.h @@ -79,9 +79,6 @@ public: * @return The decoded routable. */ virtual Routable::UP decode(const vespalib::Version &version, BlobRef data) const = 0; // throw() - - - virtual bool requireSequencing() const = 0; }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp index 485c8e3e911..7627aa876b3 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp @@ -55,10 +55,10 @@ private: } -RPCSend::RPCSend() : - _net(nullptr), - _clientIdent("client"), - _serverIdent("server") +RPCSend::RPCSend() + : _net(nullptr), + _clientIdent("client"), + _serverIdent("server") { } RPCSend::~RPCSend() = default; @@ -220,19 +220,19 @@ RPCSend::decode(vespalib::stringref protocolName, const vespalib::Version & vers void RPCSend::handleReply(Reply::UP reply) { - const IProtocol * protocol = _net->getOwner().getProtocol(reply->getProtocol()); - if (!protocol || protocol->requireSequencing() || !_net->allowDispatchForEncode()) { - doHandleReply(protocol, std::move(reply)); + if (!_net->allowDispatchForEncode()) { + doHandleReply(std::move(reply)); } else { - auto rejected = _net->getExecutor().execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable { - doHandleReply(protocol, std::move(reply)); + auto rejected = _net->getExecutor().execute(makeLambdaTask([this, reply = std::move(reply)]() mutable { + doHandleReply(std::move(reply)); })); assert (!rejected); } } void -RPCSend::doHandleReply(const IProtocol * protocol, Reply::UP reply) { +RPCSend::doHandleReply(Reply::UP reply) { + const IProtocol * protocol = _net->getOwner().getProtocol(reply->getProtocol()); ReplyContext::UP ctx(static_cast<ReplyContext*>(reply->getContext().value.PTR)); FRT_RPCRequest &req = ctx->getRequest(); string version = ctx->getVersion().toString(); @@ -256,29 +256,29 @@ void RPCSend::invoke(FRT_RPCRequest *req) { req->Detach(); - FRT_Values &args = *req->GetParams(); - std::unique_ptr<Params> params = toParams(args); - IProtocol * protocol = _net->getOwner().getProtocol(params->getProtocol()); - if (protocol == nullptr) { - replyError(req, params->getVersion(), params->getTraceLevel(), - Error(ErrorCode::UNKNOWN_PROTOCOL, make_string("Protocol '%s' is not known by %s.", - vespalib::string(params->getProtocol()).c_str(), _serverIdent.c_str()))); - return; - } - if (protocol->requireSequencing() || !_net->allowDispatchForDecode()) { - doRequest(req, protocol, std::move(params)); + if (!_net->allowDispatchForDecode()) { + doRequest(req); } else { - auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable { - doRequest(req, protocol, std::move(params)); + auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req]() { + doRequest(req); })); assert (!rejected); } } void -RPCSend::doRequest(FRT_RPCRequest *req, const IProtocol * protocol, std::unique_ptr<Params> params) +RPCSend::doRequest(FRT_RPCRequest *req) { + FRT_Values &args = *req->GetParams(); + std::unique_ptr<Params> params = toParams(args); + IProtocol * protocol = _net->getOwner().getProtocol(params->getProtocol()); + if (protocol == nullptr) { + replyError(req, params->getVersion(), params->getTraceLevel(), + Error(ErrorCode::UNKNOWN_PROTOCOL, make_string("Protocol '%s' is not known by %s.", + vespalib::string(params->getProtocol()).c_str(), _serverIdent.c_str()))); + return; + } Routable::UP routable = protocol->decode(params->getVersion(), params->getPayload()); req->DiscardBlobs(); if ( ! routable ) { diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.h b/messagebus/src/vespa/messagebus/network/rpcsend.h index e548c4adca2..c2bcb7dff2b 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.h +++ b/messagebus/src/vespa/messagebus/network/rpcsend.h @@ -83,9 +83,9 @@ public: void invoke(FRT_RPCRequest *req); private: - void doRequest(FRT_RPCRequest *req, const IProtocol * protocol, std::unique_ptr<Params> params); + void doRequest(FRT_RPCRequest *req); void doRequestDone(FRT_RPCRequest *req); - void doHandleReply(const IProtocol * protocol, std::unique_ptr<Reply> reply); + void doHandleReply(std::unique_ptr<Reply> reply); void attach(RPCNetwork &net) final override; void handleDiscard(Context ctx) final override; void sendByHandover(RoutingNode &recipient, const vespalib::Version &version, diff --git a/messagebus/src/vespa/messagebus/testlib/simpleprotocol.h b/messagebus/src/vespa/messagebus/testlib/simpleprotocol.h index 57a6e901b90..0486f058217 100644 --- a/messagebus/src/vespa/messagebus/testlib/simpleprotocol.h +++ b/messagebus/src/vespa/messagebus/testlib/simpleprotocol.h @@ -72,7 +72,6 @@ public: IRoutingPolicy::UP createPolicy(const string &name, const string ¶m) const override; Blob encode(const vespalib::Version &version, const Routable &routable) const override; Routable::UP decode(const vespalib::Version &version, BlobRef data) const override; - virtual bool requireSequencing() const override { return false; } }; } // namespace mbus diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def index 65a3cd718ec..6090e6bcd08 100644 --- a/storage/src/vespa/storage/config/stor-communicationmanager.def +++ b/storage/src/vespa/storage/config/stor-communicationmanager.def @@ -51,7 +51,7 @@ mbus.dispatch_on_encode bool default=true restart ## Enable to use above thread pool for decoding replies ## False will use network(fnet) thread ## Todo: Change default once verified in large scale deployment. -mbus.dispatch_on_decode bool default=false restart +mbus.dispatch_on_decode bool default=true restart ## Skip messenger thread on reply ## Experimental diff --git a/storage/src/vespa/storageapi/mbusprot/storageprotocol.h b/storage/src/vespa/storageapi/mbusprot/storageprotocol.h index 3f36ac42117..591009c9832 100644 --- a/storage/src/vespa/storageapi/mbusprot/storageprotocol.h +++ b/storage/src/vespa/storageapi/mbusprot/storageprotocol.h @@ -22,7 +22,6 @@ public: mbus::IRoutingPolicy::UP createPolicy(const mbus::string& name, const mbus::string& param) const override; mbus::Blob encode(const vespalib::Version&, const mbus::Routable&) const override; mbus::Routable::UP decode(const vespalib::Version&, mbus::BlobRef) const override; - bool requireSequencing() const override { return true; } private: ProtocolSerialization5_0 _serializer5_0; ProtocolSerialization5_1 _serializer5_1; |