diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-03-26 21:48:09 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-03-26 21:48:09 +0000 |
commit | b8aedd3efb3a1bffcfbd46c12e245d19c7d6a8c5 (patch) | |
tree | 9f1bf54cf1acb44e338b71bdd2d528bd727af768 /messagebus | |
parent | 6781a374c14072b03a3cd0dd4e6037108cfa04ef (diff) |
Check allow for dispatch
Diffstat (limited to 'messagebus')
-rw-r--r-- | messagebus/src/vespa/messagebus/network/rpcnetwork.cpp | 18 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/network/rpcnetwork.h | 43 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/network/rpcsend.cpp | 25 |
3 files changed, 54 insertions, 32 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index 4b498c4c014..2cd6b4c408d 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -17,6 +17,8 @@ #include <vespa/fnet/scheduler.h> #include <vespa/fnet/transport.h> #include <vespa/fnet/frt/supervisor.h> +#include <vespa/vespalib/util/singleexecutor.h> +#include <vespa/vespalib/util/lambdatask.h> #include <vespa/fastos/thread.h> #include <thread> @@ -87,8 +89,16 @@ RPCNetwork::SendContext::handleVersion(const vespalib::Version *version) } } if (shouldSend) { - _net.send(*this); - delete this; + if (_net.allowDispatchForEncode()) { + auto rejected = _net.getExecutor(true).execute(vespalib::makeLambdaTask([this]() { + _net.send(*this); + delete this; + })); + assert (!rejected); + } else { + _net.send(*this); + delete this; + } } } @@ -121,6 +131,7 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _regAPI(std::make_unique<slobrok::api::RegisterAPI>(*_orb, *_slobrokCfgFactory)), _requestedPort(params.getListenPort()), _executor(std::make_unique<vespalib::ThreadStackExecutor>(params.getNumThreads(), 65536)), + _singleExecutor(std::make_unique<vespalib::SingleExecutor>(1000)), _sendV1(std::make_unique<RPCSendV1>()), _sendV2(std::make_unique<RPCSendV2>()), _sendAdapters(), @@ -401,6 +412,7 @@ RPCNetwork::sync() { SyncTask task(_scheduler); _executor->sync(); + _singleExecutor->sync(); task.await(); } @@ -410,7 +422,9 @@ RPCNetwork::shutdown() _transport->ShutDown(true); _threadPool->Close(); _executor->shutdown(); + _singleExecutor->shutdown(); _executor->sync(); + _singleExecutor->sync(); } void diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h index a6c2724929d..d32473c66cf 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h @@ -59,26 +59,27 @@ private: using SendAdapterMap = std::map<vespalib::Version, RPCSendAdapter*>; - INetworkOwner *_owner; - Identity _ident; - std::unique_ptr<FastOS_ThreadPool> _threadPool; - std::unique_ptr<FNET_Transport> _transport; - std::unique_ptr<FRT_Supervisor> _orb; - FNET_Scheduler &_scheduler; - std::unique_ptr<RPCTargetPool> _targetPool; - TargetPoolTask _targetPoolTask; - std::unique_ptr<RPCServicePool> _servicePool; - std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory; - std::unique_ptr<slobrok::api::IMirrorAPI> _mirror; - std::unique_ptr<slobrok::api::RegisterAPI> _regAPI; - int _requestedPort; - std::unique_ptr<vespalib::ThreadStackExecutor> _executor; - std::unique_ptr<RPCSendAdapter> _sendV1; - std::unique_ptr<RPCSendAdapter> _sendV2; - SendAdapterMap _sendAdapters; - CompressionConfig _compressionConfig; - bool _allowDispatchForEncode; - bool _allowDispatchForDecode; + INetworkOwner *_owner; + Identity _ident; + std::unique_ptr<FastOS_ThreadPool> _threadPool; + std::unique_ptr<FNET_Transport> _transport; + std::unique_ptr<FRT_Supervisor> _orb; + FNET_Scheduler &_scheduler; + std::unique_ptr<RPCTargetPool> _targetPool; + TargetPoolTask _targetPoolTask; + std::unique_ptr<RPCServicePool> _servicePool; + std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory; + std::unique_ptr<slobrok::api::IMirrorAPI> _mirror; + std::unique_ptr<slobrok::api::RegisterAPI> _regAPI; + int _requestedPort; + std::unique_ptr<vespalib::SyncableThreadExecutor> _executor; + std::unique_ptr<vespalib::SyncableThreadExecutor> _singleExecutor; + std::unique_ptr<RPCSendAdapter> _sendV1; + std::unique_ptr<RPCSendAdapter> _sendV2; + SendAdapterMap _sendAdapters; + CompressionConfig _compressionConfig; + bool _allowDispatchForEncode; + bool _allowDispatchForDecode; /** @@ -231,7 +232,7 @@ public: const slobrok::api::IMirrorAPI &getMirror() const override; CompressionConfig getCompressionConfig() { return _compressionConfig; } void invoke(FRT_RPCRequest *req); - vespalib::Executor & getExecutor() const { return *_executor; } + vespalib::Executor & getExecutor(bool requireSequencing) const { return requireSequencing ? *_singleExecutor : *_executor; } bool allowDispatchForEncode() const { return _allowDispatchForEncode; } bool allowDispatchForDecode() const { return _allowDispatchForDecode; } diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp index 2422638dc05..820f058300f 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp @@ -148,7 +148,14 @@ RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, void RPCSend::RequestDone(FRT_RPCRequest *req) { - doRequestDone(req); + if ( _net->allowDispatchForDecode()) { + auto rejected = _net->getExecutor(true).execute(makeLambdaTask([this, req]() { + doRequestDone(req); + })); + assert (!rejected); + } else { + doRequestDone(req); + } } void @@ -221,13 +228,13 @@ void RPCSend::handleReply(Reply::UP reply) { const IProtocol * protocol = _net->getOwner().getProtocol(reply->getProtocol()); - if (!protocol || protocol->requireSequencing() || !_net->allowDispatchForEncode()) { - doHandleReply(protocol, std::move(reply)); - } else { - auto rejected = _net->getExecutor().execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable { + if (_net->allowDispatchForEncode()) { + auto rejected = _net->getExecutor(protocol->requireSequencing()).execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable { doHandleReply(protocol, std::move(reply)); })); assert (!rejected); + } else { + doHandleReply(protocol, std::move(reply)); } } @@ -266,13 +273,13 @@ RPCSend::invoke(FRT_RPCRequest *req) vespalib::string(params->getProtocol()).c_str(), _serverIdent.c_str()))); return; } - if (protocol->requireSequencing() || !_net->allowDispatchForDecode()) { - doRequest(req, protocol, std::move(params)); - } else { - auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable { + if (_net->allowDispatchForDecode()) { + auto rejected = _net->getExecutor(protocol->requireSequencing()).execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable { doRequest(req, protocol, std::move(params)); })); assert (!rejected); + } else { + doRequest(req, protocol, std::move(params)); } } |