summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-04-05 18:49:54 +0200
committerGitHub <noreply@github.com>2020-04-05 18:49:54 +0200
commit8dc32db0b5cc17fdd0df1f0ecbf3e7fa18b6b81e (patch)
tree14c5812f7ba21a978005e3edb7343bc9bb277cf4
parent727dc0ad691d794a988d59f59131584febbf56bb (diff)
Revert "Control mbus worker threads and network threads separately."
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp3
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.h43
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp3
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.h14
-rw-r--r--storage/src/vespa/storage/config/stor-communicationmanager.def6
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp1
6 files changed, 24 insertions, 46 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index e77664b0987..faa67b9bece 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -137,7 +137,7 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_owner(nullptr),
_ident(params.getIdentity()),
_threadPool(std::make_unique<FastOS_ThreadPool>(128000, 0)),
- _transport(std::make_unique<FNET_Transport>(params.getNumNetworkThreads())),
+ _transport(std::make_unique<FNET_Transport>(params.getNumThreads())),
_orb(std::make_unique<FRT_Supervisor>(_transport.get())),
_scheduler(*_transport->GetScheduler()),
_slobrokCfgFactory(std::make_unique<slobrok::ConfiguratorFactory>(params.getSlobrokConfig())),
@@ -149,7 +149,6 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_servicePool(std::make_unique<RPCServicePool>(*_mirror, 4096)),
_singleEncodeExecutor(createExecutor(params.getOptimizeFor())),
_singleDecodeExecutor(createExecutor(params.getOptimizeFor())),
- _executor(std::make_unique<vespalib::ThreadStackExecutor>(params.getNumThreads(), 128000)),
_sendV1(std::make_unique<RPCSendV1>()),
_sendV2(std::make_unique<RPCSendV2>()),
_sendAdapters(),
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
index a37cf1d9176..a510aae9014 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
@@ -52,28 +52,27 @@ private:
using SendAdapterMap = std::map<vespalib::Version, RPCSendAdapter*>;
- INetworkOwner *_owner;
- Identity _ident;
- std::unique_ptr<FastOS_ThreadPool> _threadPool;
- std::unique_ptr<FNET_Transport> _transport;
- std::unique_ptr<FRT_Supervisor> _orb;
- FNET_Scheduler &_scheduler;
- std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory;
- std::unique_ptr<slobrok::api::IMirrorAPI> _mirror;
- std::unique_ptr<slobrok::api::RegisterAPI> _regAPI;
- int _requestedPort;
- std::unique_ptr<RPCTargetPool> _targetPool;
- std::unique_ptr<FNET_Task> _targetPoolTask;
- std::unique_ptr<RPCServicePool> _servicePool;
+ INetworkOwner *_owner;
+ Identity _ident;
+ std::unique_ptr<FastOS_ThreadPool> _threadPool;
+ std::unique_ptr<FNET_Transport> _transport;
+ std::unique_ptr<FRT_Supervisor> _orb;
+ FNET_Scheduler &_scheduler;
+ std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory;
+ std::unique_ptr<slobrok::api::IMirrorAPI> _mirror;
+ std::unique_ptr<slobrok::api::RegisterAPI> _regAPI;
+ int _requestedPort;
+ std::unique_ptr<RPCTargetPool> _targetPool;
+ std::unique_ptr<FNET_Task> _targetPoolTask;
+ std::unique_ptr<RPCServicePool> _servicePool;
std::unique_ptr<vespalib::SyncableThreadExecutor> _singleEncodeExecutor;
std::unique_ptr<vespalib::SyncableThreadExecutor> _singleDecodeExecutor;
- std::unique_ptr<vespalib::SyncableThreadExecutor> _executor;
- std::unique_ptr<RPCSendAdapter> _sendV1;
- std::unique_ptr<RPCSendAdapter> _sendV2;
- SendAdapterMap _sendAdapters;
- CompressionConfig _compressionConfig;
- bool _allowDispatchForEncode;
- bool _allowDispatchForDecode;
+ std::unique_ptr<RPCSendAdapter> _sendV1;
+ std::unique_ptr<RPCSendAdapter> _sendV2;
+ SendAdapterMap _sendAdapters;
+ CompressionConfig _compressionConfig;
+ bool _allowDispatchForEncode;
+ bool _allowDispatchForDecode;
/**
* Resolves and assigns a service address for the given recipient using the
@@ -225,8 +224,8 @@ public:
const slobrok::api::IMirrorAPI &getMirror() const override;
CompressionConfig getCompressionConfig() { return _compressionConfig; }
void invoke(FRT_RPCRequest *req);
- vespalib::Executor & getEncodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleEncodeExecutor : *_executor; }
- vespalib::Executor & getDecodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleDecodeExecutor : *_executor; }
+ vespalib::Executor & getEncodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleEncodeExecutor : *_singleEncodeExecutor; }
+ vespalib::Executor & getDecodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleDecodeExecutor : *_singleDecodeExecutor; }
bool allowDispatchForEncode() const { return _allowDispatchForEncode; }
bool allowDispatchForDecode() const { return _allowDispatchForDecode; }
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
index e166a410a61..482a46b2564 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
@@ -14,8 +14,7 @@ RPCNetworkParams::RPCNetworkParams(config::ConfigUri configUri) :
_listenPort(0),
_maxInputBufferSize(256*1024),
_maxOutputBufferSize(256*1024),
- _numThreads(2),
- _numNetworkThreads(1),
+ _numThreads(1),
_optimizeFor(OptimizeFor::LATENCY),
_dispatchOnEncode(true),
_dispatchOnDecode(false),
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
index fb53af82900..a4b752f46d4 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
@@ -96,19 +96,6 @@ public:
uint32_t getNumThreads() const { return _numThreads; }
- /**
- * Sets number of threads for the network.
- *
- * @param numNetworkThreads number of threads for the network
- * @return This, to allow chaining.
- */
- RPCNetworkParams &setNumNetworkThreads(uint32_t numNetworkThreads) {
- _numNetworkThreads = numNetworkThreads;
- return *this;
- }
-
- uint32_t getNumNetworkThreads() const { return _numNetworkThreads; }
-
RPCNetworkParams &setOptimizeFor(OptimizeFor tcpNoDelay) {
_optimizeFor = tcpNoDelay;
return *this;
@@ -207,7 +194,6 @@ private:
uint32_t _maxInputBufferSize;
uint32_t _maxOutputBufferSize;
uint32_t _numThreads;
- uint32_t _numNetworkThreads;
OptimizeFor _optimizeFor;
bool _dispatchOnEncode;
bool _dispatchOnDecode;
diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def
index c58a1a8ebfc..4536ea97855 100644
--- a/storage/src/vespa/storage/config/stor-communicationmanager.def
+++ b/storage/src/vespa/storage/config/stor-communicationmanager.def
@@ -31,11 +31,7 @@ mbus.rpctargetcache.ttl double default = 600
## Number of threads for network.
## Any value below 1 will be 1.
-mbus.num_network_threads int default=1
-
-## Number of workers threads for messagebus.
-## Any value below 1 will be 1.
-mbus.num_threads int default=2
+mbus.num_threads int default=1
mbus.optimize_for enum {LATENCY, THROUGHPUT} default = LATENCY
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 19c157ffbd2..aff2b0f624f 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -422,7 +422,6 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig>
mbus::RPCNetworkParams params(_configUri);
params.setConnectionExpireSecs(config->mbus.rpctargetcache.ttl);
params.setNumThreads(std::max(1, config->mbus.numThreads));
- params.setNumNetworkThreads(std::max(1, config->mbus.numNetworkThreads));
params.setDispatchOnDecode(config->mbus.dispatchOnDecode);
params.setDispatchOnEncode(config->mbus.dispatchOnEncode);
params.setOptimizeFor(convert(config->mbus.optimizeFor));