summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-09-26 15:08:21 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2017-09-26 15:08:21 +0200
commitfc1b0fd518fe5e37757690ce05e26fb4025d0059 (patch)
treeeedbcdf9253fd5a6e5cba2b9aa8712457e232e5a /messagebus
parent6796195b37f351f843eea4992d2d45b7ba4eb771 (diff)
Use multiple threads for mbus invoke
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp9
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.h38
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.cpp9
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.h1
4 files changed, 37 insertions, 20 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index e5c292fedd9..95f3679f45c 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -15,6 +15,7 @@
#include <vespa/slobrok/sbmirror.h>
#include <vespa/vespalib/component/vtag.h>
#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/fnet/scheduler.h>
#include <vespa/fnet/transport.h>
#include <vespa/fnet/frt/supervisor.h>
@@ -119,6 +120,7 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_regAPI(std::make_unique<slobrok::api::RegisterAPI>(*_orb, *_slobrokCfgFactory)),
_oosManager(std::make_unique<OOSManager>(*_orb, *_mirror, params.getOOSServerPattern())),
_requestedPort(params.getListenPort()),
+ _executor(std::make_unique<vespalib::ThreadStackExecutor>(8,65536)),
_sendV1(std::make_unique<RPCSendV1>()),
_sendV2(std::make_unique<RPCSendV2>()),
_sendAdapters(),
@@ -222,7 +224,10 @@ RPCNetwork::start()
return true;
}
-
+vespalib::Executor &
+RPCNetwork::getExecutor() {
+ return *_executor;
+}
bool
RPCNetwork::waitUntilReady(double seconds) const
@@ -377,6 +382,8 @@ RPCNetwork::sync()
void
RPCNetwork::shutdown()
{
+ _executor->shutdown();
+ _executor->sync();
_transport->ShutDown(false);
_threadPool->Close();
}
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
index 5e762f1a2a9..13fab018c3b 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
@@ -60,24 +60,25 @@ private:
using SendAdapterMap = std::map<vespalib::Version, RPCSendAdapter*>;
- INetworkOwner *_owner;
- Identity _ident;
- std::unique_ptr<FastOS_ThreadPool> _threadPool;
- std::unique_ptr<FNET_Transport> _transport;
- std::unique_ptr<FRT_Supervisor> _orb;
- FNET_Scheduler &_scheduler;
- std::unique_ptr<RPCTargetPool> _targetPool;
- TargetPoolTask _targetPoolTask;
- std::unique_ptr<RPCServicePool> _servicePool;
- std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory;
- std::unique_ptr<slobrok::api::IMirrorAPI> _mirror;
- std::unique_ptr<slobrok::api::RegisterAPI> _regAPI;
- std::unique_ptr<OOSManager> _oosManager;
- int _requestedPort;
- std::unique_ptr<RPCSendAdapter> _sendV1;
- std::unique_ptr<RPCSendAdapter> _sendV2;
- SendAdapterMap _sendAdapters;
- CompressionConfig _compressionConfig;
+ INetworkOwner *_owner;
+ Identity _ident;
+ std::unique_ptr<FastOS_ThreadPool> _threadPool;
+ std::unique_ptr<FNET_Transport> _transport;
+ std::unique_ptr<FRT_Supervisor> _orb;
+ FNET_Scheduler &_scheduler;
+ std::unique_ptr<RPCTargetPool> _targetPool;
+ TargetPoolTask _targetPoolTask;
+ std::unique_ptr<RPCServicePool> _servicePool;
+ std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory;
+ std::unique_ptr<slobrok::api::IMirrorAPI> _mirror;
+ std::unique_ptr<slobrok::api::RegisterAPI> _regAPI;
+ std::unique_ptr<OOSManager> _oosManager;
+ int _requestedPort;
+ std::unique_ptr<vespalib::ThreadStackExecutor> _executor;
+ std::unique_ptr<RPCSendAdapter> _sendV1;
+ std::unique_ptr<RPCSendAdapter> _sendV2;
+ SendAdapterMap _sendAdapters;
+ CompressionConfig _compressionConfig;
/**
* Resolves and assigns a service address for the given recipient using the
@@ -235,6 +236,7 @@ public:
const slobrok::api::IMirrorAPI &getMirror() const override;
CompressionConfig getCompressionConfig() { return _compressionConfig; }
void invoke(FRT_RPCRequest *req);
+ vespalib::Executor & getExecutor();
};
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
index 705b8648442..f47855340cf 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
@@ -7,9 +7,10 @@
#include <vespa/messagebus/tracelevel.h>
#include <vespa/messagebus/emptyreply.h>
#include <vespa/messagebus/errorcode.h>
-#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/fnet/channel.h>
#include <vespa/fnet/frt/reflection.h>
+#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/vespalib/data/slime/cursor.h>
@@ -236,6 +237,12 @@ void
RPCSend::invoke(FRT_RPCRequest *req)
{
req->Detach();
+ _net->getExecutor().execute(vespalib::makeLambdaTask([&, req]() { doRequest(req);}));
+}
+
+void
+RPCSend::doRequest(FRT_RPCRequest *req)
+{
FRT_Values &args = *req->GetParams();
std::unique_ptr<Params> params = toParams(args);
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.h b/messagebus/src/vespa/messagebus/network/rpcsend.h
index c707b47f548..5002a3914f8 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.h
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.h
@@ -82,6 +82,7 @@ public:
void invoke(FRT_RPCRequest *req);
private:
+ void doRequest(FRT_RPCRequest *req);
void attach(RPCNetwork &net) final override;
void handleDiscard(Context ctx) final override;
void sendByHandover(RoutingNode &recipient, const vespalib::Version &version,