From fc1b0fd518fe5e37757690ce05e26fb4025d0059 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Tue, 26 Sep 2017 15:08:21 +0200 Subject: Use multiple threads for mbus invoke --- .../src/vespa/messagebus/network/rpcnetwork.cpp | 9 ++++- .../src/vespa/messagebus/network/rpcnetwork.h | 38 ++++++++++++---------- .../src/vespa/messagebus/network/rpcsend.cpp | 9 ++++- messagebus/src/vespa/messagebus/network/rpcsend.h | 1 + 4 files changed, 37 insertions(+), 20 deletions(-) (limited to 'messagebus') diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index e5c292fedd9..95f3679f45c 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -119,6 +120,7 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _regAPI(std::make_unique(*_orb, *_slobrokCfgFactory)), _oosManager(std::make_unique(*_orb, *_mirror, params.getOOSServerPattern())), _requestedPort(params.getListenPort()), + _executor(std::make_unique(8,65536)), _sendV1(std::make_unique()), _sendV2(std::make_unique()), _sendAdapters(), @@ -222,7 +224,10 @@ RPCNetwork::start() return true; } - +vespalib::Executor & +RPCNetwork::getExecutor() { + return *_executor; +} bool RPCNetwork::waitUntilReady(double seconds) const @@ -377,6 +382,8 @@ RPCNetwork::sync() void RPCNetwork::shutdown() { + _executor->shutdown(); + _executor->sync(); _transport->ShutDown(false); _threadPool->Close(); } diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h index 5e762f1a2a9..13fab018c3b 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h @@ -60,24 +60,25 @@ private: using SendAdapterMap = std::map; - INetworkOwner *_owner; - Identity _ident; - std::unique_ptr _threadPool; - std::unique_ptr _transport; - std::unique_ptr _orb; - FNET_Scheduler &_scheduler; - std::unique_ptr _targetPool; - TargetPoolTask _targetPoolTask; - std::unique_ptr _servicePool; - std::unique_ptr _slobrokCfgFactory; - std::unique_ptr _mirror; - std::unique_ptr _regAPI; - std::unique_ptr _oosManager; - int _requestedPort; - std::unique_ptr _sendV1; - std::unique_ptr _sendV2; - SendAdapterMap _sendAdapters; - CompressionConfig _compressionConfig; + INetworkOwner *_owner; + Identity _ident; + std::unique_ptr _threadPool; + std::unique_ptr _transport; + std::unique_ptr _orb; + FNET_Scheduler &_scheduler; + std::unique_ptr _targetPool; + TargetPoolTask _targetPoolTask; + std::unique_ptr _servicePool; + std::unique_ptr _slobrokCfgFactory; + std::unique_ptr _mirror; + std::unique_ptr _regAPI; + std::unique_ptr _oosManager; + int _requestedPort; + std::unique_ptr _executor; + std::unique_ptr _sendV1; + std::unique_ptr _sendV2; + SendAdapterMap _sendAdapters; + CompressionConfig _compressionConfig; /** * Resolves and assigns a service address for the given recipient using the @@ -235,6 +236,7 @@ public: const slobrok::api::IMirrorAPI &getMirror() const override; CompressionConfig getCompressionConfig() { return _compressionConfig; } void invoke(FRT_RPCRequest *req); + vespalib::Executor & getExecutor(); }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp index 705b8648442..f47855340cf 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp @@ -7,9 +7,10 @@ #include #include #include -#include #include #include +#include +#include #include @@ -236,6 +237,12 @@ void RPCSend::invoke(FRT_RPCRequest *req) { req->Detach(); + _net->getExecutor().execute(vespalib::makeLambdaTask([&, req]() { doRequest(req);})); +} + +void +RPCSend::doRequest(FRT_RPCRequest *req) +{ FRT_Values &args = *req->GetParams(); std::unique_ptr params = toParams(args); diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.h b/messagebus/src/vespa/messagebus/network/rpcsend.h index c707b47f548..5002a3914f8 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.h +++ b/messagebus/src/vespa/messagebus/network/rpcsend.h @@ -82,6 +82,7 @@ public: void invoke(FRT_RPCRequest *req); private: + void doRequest(FRT_RPCRequest *req); void attach(RPCNetwork &net) final override; void handleDiscard(Context ctx) final override; void sendByHandover(RoutingNode &recipient, const vespalib::Version &version, -- cgit v1.2.3