diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-09-26 15:08:21 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-09-26 15:08:21 +0200 |
commit | fc1b0fd518fe5e37757690ce05e26fb4025d0059 (patch) | |
tree | eedbcdf9253fd5a6e5cba2b9aa8712457e232e5a /messagebus | |
parent | 6796195b37f351f843eea4992d2d45b7ba4eb771 (diff) |
Use multiple threads for mbus invoke
Diffstat (limited to 'messagebus')
4 files changed, 37 insertions, 20 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index e5c292fedd9..95f3679f45c 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -15,6 +15,7 @@ #include <vespa/slobrok/sbmirror.h> #include <vespa/vespalib/component/vtag.h> #include <vespa/vespalib/util/stringfmt.h> +#include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/fnet/scheduler.h> #include <vespa/fnet/transport.h> #include <vespa/fnet/frt/supervisor.h> @@ -119,6 +120,7 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _regAPI(std::make_unique<slobrok::api::RegisterAPI>(*_orb, *_slobrokCfgFactory)), _oosManager(std::make_unique<OOSManager>(*_orb, *_mirror, params.getOOSServerPattern())), _requestedPort(params.getListenPort()), + _executor(std::make_unique<vespalib::ThreadStackExecutor>(8,65536)), _sendV1(std::make_unique<RPCSendV1>()), _sendV2(std::make_unique<RPCSendV2>()), _sendAdapters(), @@ -222,7 +224,10 @@ RPCNetwork::start() return true; } - +vespalib::Executor & +RPCNetwork::getExecutor() { + return *_executor; +} bool RPCNetwork::waitUntilReady(double seconds) const @@ -377,6 +382,8 @@ RPCNetwork::sync() void RPCNetwork::shutdown() { + _executor->shutdown(); + _executor->sync(); _transport->ShutDown(false); _threadPool->Close(); } diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h index 5e762f1a2a9..13fab018c3b 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h @@ -60,24 +60,25 @@ private: using SendAdapterMap = std::map<vespalib::Version, RPCSendAdapter*>; - INetworkOwner *_owner; - Identity _ident; - std::unique_ptr<FastOS_ThreadPool> _threadPool; - std::unique_ptr<FNET_Transport> _transport; - std::unique_ptr<FRT_Supervisor> _orb; - FNET_Scheduler &_scheduler; - std::unique_ptr<RPCTargetPool> _targetPool; - TargetPoolTask _targetPoolTask; - std::unique_ptr<RPCServicePool> _servicePool; - std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory; - std::unique_ptr<slobrok::api::IMirrorAPI> _mirror; - std::unique_ptr<slobrok::api::RegisterAPI> _regAPI; - std::unique_ptr<OOSManager> _oosManager; - int _requestedPort; - std::unique_ptr<RPCSendAdapter> _sendV1; - std::unique_ptr<RPCSendAdapter> _sendV2; - SendAdapterMap _sendAdapters; - CompressionConfig _compressionConfig; + INetworkOwner *_owner; + Identity _ident; + std::unique_ptr<FastOS_ThreadPool> _threadPool; + std::unique_ptr<FNET_Transport> _transport; + std::unique_ptr<FRT_Supervisor> _orb; + FNET_Scheduler &_scheduler; + std::unique_ptr<RPCTargetPool> _targetPool; + TargetPoolTask _targetPoolTask; + std::unique_ptr<RPCServicePool> _servicePool; + std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory; + std::unique_ptr<slobrok::api::IMirrorAPI> _mirror; + std::unique_ptr<slobrok::api::RegisterAPI> _regAPI; + std::unique_ptr<OOSManager> _oosManager; + int _requestedPort; + std::unique_ptr<vespalib::ThreadStackExecutor> _executor; + std::unique_ptr<RPCSendAdapter> _sendV1; + std::unique_ptr<RPCSendAdapter> _sendV2; + SendAdapterMap _sendAdapters; + CompressionConfig _compressionConfig; /** * Resolves and assigns a service address for the given recipient using the @@ -235,6 +236,7 @@ public: const slobrok::api::IMirrorAPI &getMirror() const override; CompressionConfig getCompressionConfig() { return _compressionConfig; } void invoke(FRT_RPCRequest *req); + vespalib::Executor & getExecutor(); }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp index 705b8648442..f47855340cf 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp @@ -7,9 +7,10 @@ #include <vespa/messagebus/tracelevel.h> #include <vespa/messagebus/emptyreply.h> #include <vespa/messagebus/errorcode.h> -#include <vespa/vespalib/util/stringfmt.h> #include <vespa/fnet/channel.h> #include <vespa/fnet/frt/reflection.h> +#include <vespa/vespalib/util/stringfmt.h> +#include <vespa/vespalib/util/lambdatask.h> #include <vespa/vespalib/data/slime/cursor.h> @@ -236,6 +237,12 @@ void RPCSend::invoke(FRT_RPCRequest *req) { req->Detach(); + _net->getExecutor().execute(vespalib::makeLambdaTask([&, req]() { doRequest(req);})); +} + +void +RPCSend::doRequest(FRT_RPCRequest *req) +{ FRT_Values &args = *req->GetParams(); std::unique_ptr<Params> params = toParams(args); diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.h b/messagebus/src/vespa/messagebus/network/rpcsend.h index c707b47f548..5002a3914f8 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.h +++ b/messagebus/src/vespa/messagebus/network/rpcsend.h @@ -82,6 +82,7 @@ public: void invoke(FRT_RPCRequest *req); private: + void doRequest(FRT_RPCRequest *req); void attach(RPCNetwork &net) final override; void handleDiscard(Context ctx) final override; void sendByHandover(RoutingNode &recipient, const vespalib::Version &version, |