diff options
Diffstat (limited to 'messagebus/src/vespa/messagebus/network/rpcsend.cpp')
-rw-r--r-- | messagebus/src/vespa/messagebus/network/rpcsend.cpp | 25 |
1 files changed, 16 insertions, 9 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp index 2422638dc05..d217c7964d6 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp @@ -148,7 +148,14 @@ RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, void RPCSend::RequestDone(FRT_RPCRequest *req) { - doRequestDone(req); + if ( _net->allowDispatchForDecode()) { + auto rejected = _net->getDecodeExecutor(true).execute(makeLambdaTask([this, req]() { + doRequestDone(req); + })); + assert (!rejected); + } else { + doRequestDone(req); + } } void @@ -221,13 +228,13 @@ void RPCSend::handleReply(Reply::UP reply) { const IProtocol * protocol = _net->getOwner().getProtocol(reply->getProtocol()); - if (!protocol || protocol->requireSequencing() || !_net->allowDispatchForEncode()) { - doHandleReply(protocol, std::move(reply)); - } else { - auto rejected = _net->getExecutor().execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable { + if (protocol && _net->allowDispatchForEncode()) { + auto rejected = _net->getEncodeExecutor(protocol->requireSequencing()).execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable { doHandleReply(protocol, std::move(reply)); })); assert (!rejected); + } else { + doHandleReply(protocol, std::move(reply)); } } @@ -266,13 +273,13 @@ RPCSend::invoke(FRT_RPCRequest *req) vespalib::string(params->getProtocol()).c_str(), _serverIdent.c_str()))); return; } - if (protocol->requireSequencing() || !_net->allowDispatchForDecode()) { - doRequest(req, protocol, std::move(params)); - } else { - auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable { + if (_net->allowDispatchForDecode()) { + auto rejected = _net->getDecodeExecutor(protocol->requireSequencing()).execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable { doRequest(req, protocol, std::move(params)); })); assert (!rejected); + } else { + doRequest(req, protocol, std::move(params)); } } |