aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus/src/vespa/messagebus/network/rpcsend.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'messagebus/src/vespa/messagebus/network/rpcsend.cpp')
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.cpp25
1 files changed, 16 insertions, 9 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
index 2422638dc05..d217c7964d6 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
@@ -148,7 +148,14 @@ RPCSend::send(RoutingNode &recipient, const vespalib::Version &version,
void
RPCSend::RequestDone(FRT_RPCRequest *req)
{
- doRequestDone(req);
+ if ( _net->allowDispatchForDecode()) {
+ auto rejected = _net->getDecodeExecutor(true).execute(makeLambdaTask([this, req]() {
+ doRequestDone(req);
+ }));
+ assert (!rejected);
+ } else {
+ doRequestDone(req);
+ }
}
void
@@ -221,13 +228,13 @@ void
RPCSend::handleReply(Reply::UP reply)
{
const IProtocol * protocol = _net->getOwner().getProtocol(reply->getProtocol());
- if (!protocol || protocol->requireSequencing() || !_net->allowDispatchForEncode()) {
- doHandleReply(protocol, std::move(reply));
- } else {
- auto rejected = _net->getExecutor().execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable {
+ if (protocol && _net->allowDispatchForEncode()) {
+ auto rejected = _net->getEncodeExecutor(protocol->requireSequencing()).execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable {
doHandleReply(protocol, std::move(reply));
}));
assert (!rejected);
+ } else {
+ doHandleReply(protocol, std::move(reply));
}
}
@@ -266,13 +273,13 @@ RPCSend::invoke(FRT_RPCRequest *req)
vespalib::string(params->getProtocol()).c_str(), _serverIdent.c_str())));
return;
}
- if (protocol->requireSequencing() || !_net->allowDispatchForDecode()) {
- doRequest(req, protocol, std::move(params));
- } else {
- auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable {
+ if (_net->allowDispatchForDecode()) {
+ auto rejected = _net->getDecodeExecutor(protocol->requireSequencing()).execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable {
doRequest(req, protocol, std::move(params));
}));
assert (!rejected);
+ } else {
+ doRequest(req, protocol, std::move(params));
}
}