diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-09-29 16:56:15 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-09-29 16:56:15 +0200 |
commit | 40c97101a8219ec09b71e29014760cb953114edd (patch) | |
tree | 4471f404fd2888881de8955efa57c5d5d8c74918 /messagebus | |
parent | db0b5402657e33a7737ad7490473a7fd12850de7 (diff) |
Return reply in different thread if protocol does not require sequencing
Diffstat (limited to 'messagebus')
-rw-r--r-- | messagebus/src/vespa/messagebus/iprotocol.h | 3 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/network/rpcsend.cpp | 14 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/network/rpcsend.h | 3 |
3 files changed, 14 insertions, 6 deletions
diff --git a/messagebus/src/vespa/messagebus/iprotocol.h b/messagebus/src/vespa/messagebus/iprotocol.h index 514c5ea676f..8a4129d1976 100644 --- a/messagebus/src/vespa/messagebus/iprotocol.h +++ b/messagebus/src/vespa/messagebus/iprotocol.h @@ -52,8 +52,7 @@ public: * @param param Ppolicy specific parameter. * @return A newly created routing policy. */ - virtual IRoutingPolicy::UP createPolicy(const string &name, - const string ¶m) const = 0; + virtual IRoutingPolicy::UP createPolicy(const string &name, const string ¶m) const = 0; /** * Encodes the protocol specific data of a routable into a byte array. diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp index cc6b7086756..1ee791d5454 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp @@ -220,11 +220,19 @@ RPCSend::decode(vespalib::stringref protocolName, const vespalib::Version & vers void RPCSend::handleReply(Reply::UP reply) { - doHandleReply(std::move(reply)); + const IProtocol * protocol = _net->getOwner().getProtocol(reply->getProtocol()); + if (protocol->requireSequencing()) { + doHandleReply(protocol, std::move(reply)); + } else { + auto rejected = _net->getExecutor().execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable { + doHandleReply(protocol, std::move(reply)); + })); + assert (!rejected); + } } void -RPCSend::doHandleReply(Reply::UP reply) { +RPCSend::doHandleReply(const IProtocol * protocol, Reply::UP reply) { ReplyContext::UP ctx(static_cast<ReplyContext*>(reply->getContext().value.PTR)); FRT_RPCRequest &req = ctx->getRequest(); string version = ctx->getVersion().toString(); @@ -234,7 +242,7 @@ RPCSend::doHandleReply(Reply::UP reply) { } Blob payload(0); if (reply->getType() != 0) { - payload = _net->getOwner().getProtocol(reply->getProtocol())->encode(ctx->getVersion(), *reply); + payload = protocol->encode(ctx->getVersion(), *reply); if (payload.size() == 0) { reply->addError(Error(ErrorCode::ENCODE_ERROR, "An error occured while encoding the reply, see log.")); } diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.h b/messagebus/src/vespa/messagebus/network/rpcsend.h index ec455aea7bd..11a042b91c0 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.h +++ b/messagebus/src/vespa/messagebus/network/rpcsend.h @@ -19,6 +19,7 @@ class Error; class Route; class Message; class RPCServiceAddress; +class IProtocol; class PayLoadFiller { @@ -84,7 +85,7 @@ public: private: void doRequest(FRT_RPCRequest *req); void doRequestDone(FRT_RPCRequest *req); - void doHandleReply(std::unique_ptr<Reply> reply); + void doHandleReply(const IProtocol * protocol, std::unique_ptr<Reply> reply); void attach(RPCNetwork &net) final override; void handleDiscard(Context ctx) final override; void sendByHandover(RoutingNode &recipient, const vespalib::Version &version, |