summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-04-03 16:11:53 +0200
committerGitHub <noreply@github.com>2020-04-03 16:11:53 +0200
commita7a668ac285df3f5bacf263ca103a25ad0ff5ed6 (patch)
treefbd95b4175f54319ec83b54ba3e618aea716002c
parent6299f2bf521a233f47d76fc6efa30d46aa417910 (diff)
parent55787c4169eb0f30695101c228a76e9a1992ba63 (diff)
Merge pull request #12821 from vespa-engine/balder/control-net-and-worker-threads-independent
Control mbus worker threads and network threads separately.
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp3
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.h43
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp3
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.h14
-rw-r--r--storage/src/vespa/storage/config/stor-communicationmanager.def6
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp1
6 files changed, 46 insertions, 24 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index faa67b9bece..e77664b0987 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -137,7 +137,7 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_owner(nullptr),
_ident(params.getIdentity()),
_threadPool(std::make_unique<FastOS_ThreadPool>(128000, 0)),
- _transport(std::make_unique<FNET_Transport>(params.getNumThreads())),
+ _transport(std::make_unique<FNET_Transport>(params.getNumNetworkThreads())),
_orb(std::make_unique<FRT_Supervisor>(_transport.get())),
_scheduler(*_transport->GetScheduler()),
_slobrokCfgFactory(std::make_unique<slobrok::ConfiguratorFactory>(params.getSlobrokConfig())),
@@ -149,6 +149,7 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_servicePool(std::make_unique<RPCServicePool>(*_mirror, 4096)),
_singleEncodeExecutor(createExecutor(params.getOptimizeFor())),
_singleDecodeExecutor(createExecutor(params.getOptimizeFor())),
+ _executor(std::make_unique<vespalib::ThreadStackExecutor>(params.getNumThreads(), 128000)),
_sendV1(std::make_unique<RPCSendV1>()),
_sendV2(std::make_unique<RPCSendV2>()),
_sendAdapters(),
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
index a510aae9014..a37cf1d9176 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
@@ -52,27 +52,28 @@ private:
using SendAdapterMap = std::map<vespalib::Version, RPCSendAdapter*>;
- INetworkOwner *_owner;
- Identity _ident;
- std::unique_ptr<FastOS_ThreadPool> _threadPool;
- std::unique_ptr<FNET_Transport> _transport;
- std::unique_ptr<FRT_Supervisor> _orb;
- FNET_Scheduler &_scheduler;
- std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory;
- std::unique_ptr<slobrok::api::IMirrorAPI> _mirror;
- std::unique_ptr<slobrok::api::RegisterAPI> _regAPI;
- int _requestedPort;
- std::unique_ptr<RPCTargetPool> _targetPool;
- std::unique_ptr<FNET_Task> _targetPoolTask;
- std::unique_ptr<RPCServicePool> _servicePool;
+ INetworkOwner *_owner;
+ Identity _ident;
+ std::unique_ptr<FastOS_ThreadPool> _threadPool;
+ std::unique_ptr<FNET_Transport> _transport;
+ std::unique_ptr<FRT_Supervisor> _orb;
+ FNET_Scheduler &_scheduler;
+ std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory;
+ std::unique_ptr<slobrok::api::IMirrorAPI> _mirror;
+ std::unique_ptr<slobrok::api::RegisterAPI> _regAPI;
+ int _requestedPort;
+ std::unique_ptr<RPCTargetPool> _targetPool;
+ std::unique_ptr<FNET_Task> _targetPoolTask;
+ std::unique_ptr<RPCServicePool> _servicePool;
std::unique_ptr<vespalib::SyncableThreadExecutor> _singleEncodeExecutor;
std::unique_ptr<vespalib::SyncableThreadExecutor> _singleDecodeExecutor;
- std::unique_ptr<RPCSendAdapter> _sendV1;
- std::unique_ptr<RPCSendAdapter> _sendV2;
- SendAdapterMap _sendAdapters;
- CompressionConfig _compressionConfig;
- bool _allowDispatchForEncode;
- bool _allowDispatchForDecode;
+ std::unique_ptr<vespalib::SyncableThreadExecutor> _executor;
+ std::unique_ptr<RPCSendAdapter> _sendV1;
+ std::unique_ptr<RPCSendAdapter> _sendV2;
+ SendAdapterMap _sendAdapters;
+ CompressionConfig _compressionConfig;
+ bool _allowDispatchForEncode;
+ bool _allowDispatchForDecode;
/**
* Resolves and assigns a service address for the given recipient using the
@@ -224,8 +225,8 @@ public:
const slobrok::api::IMirrorAPI &getMirror() const override;
CompressionConfig getCompressionConfig() { return _compressionConfig; }
void invoke(FRT_RPCRequest *req);
- vespalib::Executor & getEncodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleEncodeExecutor : *_singleEncodeExecutor; }
- vespalib::Executor & getDecodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleDecodeExecutor : *_singleDecodeExecutor; }
+ vespalib::Executor & getEncodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleEncodeExecutor : *_executor; }
+ vespalib::Executor & getDecodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleDecodeExecutor : *_executor; }
bool allowDispatchForEncode() const { return _allowDispatchForEncode; }
bool allowDispatchForDecode() const { return _allowDispatchForDecode; }
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
index 482a46b2564..e166a410a61 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
@@ -14,7 +14,8 @@ RPCNetworkParams::RPCNetworkParams(config::ConfigUri configUri) :
_listenPort(0),
_maxInputBufferSize(256*1024),
_maxOutputBufferSize(256*1024),
- _numThreads(1),
+ _numThreads(2),
+ _numNetworkThreads(1),
_optimizeFor(OptimizeFor::LATENCY),
_dispatchOnEncode(true),
_dispatchOnDecode(false),
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
index a4b752f46d4..fb53af82900 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
@@ -96,6 +96,19 @@ public:
uint32_t getNumThreads() const { return _numThreads; }
+ /**
+ * Sets number of threads for the network.
+ *
+ * @param numNetworkThreads number of threads for the network
+ * @return This, to allow chaining.
+ */
+ RPCNetworkParams &setNumNetworkThreads(uint32_t numNetworkThreads) {
+ _numNetworkThreads = numNetworkThreads;
+ return *this;
+ }
+
+ uint32_t getNumNetworkThreads() const { return _numNetworkThreads; }
+
RPCNetworkParams &setOptimizeFor(OptimizeFor tcpNoDelay) {
_optimizeFor = tcpNoDelay;
return *this;
@@ -194,6 +207,7 @@ private:
uint32_t _maxInputBufferSize;
uint32_t _maxOutputBufferSize;
uint32_t _numThreads;
+ uint32_t _numNetworkThreads;
OptimizeFor _optimizeFor;
bool _dispatchOnEncode;
bool _dispatchOnDecode;
diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def
index 4536ea97855..c58a1a8ebfc 100644
--- a/storage/src/vespa/storage/config/stor-communicationmanager.def
+++ b/storage/src/vespa/storage/config/stor-communicationmanager.def
@@ -31,7 +31,11 @@ mbus.rpctargetcache.ttl double default = 600
## Number of threads for network.
## Any value below 1 will be 1.
-mbus.num_threads int default=1
+mbus.num_network_threads int default=1
+
+## Number of workers threads for messagebus.
+## Any value below 1 will be 1.
+mbus.num_threads int default=2
mbus.optimize_for enum {LATENCY, THROUGHPUT} default = LATENCY
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index aff2b0f624f..19c157ffbd2 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -422,6 +422,7 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig>
mbus::RPCNetworkParams params(_configUri);
params.setConnectionExpireSecs(config->mbus.rpctargetcache.ttl);
params.setNumThreads(std::max(1, config->mbus.numThreads));
+ params.setNumNetworkThreads(std::max(1, config->mbus.numNetworkThreads));
params.setDispatchOnDecode(config->mbus.dispatchOnDecode);
params.setDispatchOnEncode(config->mbus.dispatchOnEncode);
params.setOptimizeFor(convert(config->mbus.optimizeFor));