diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-04-05 18:50:27 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-04-05 18:50:27 +0200 |
commit | 0f877b0d6f72f734e17230969da7ec399ba2ca28 (patch) | |
tree | 14c5812f7ba21a978005e3edb7343bc9bb277cf4 | |
parent | 727dc0ad691d794a988d59f59131584febbf56bb (diff) | |
parent | 8dc32db0b5cc17fdd0df1f0ecbf3e7fa18b6b81e (diff) |
Merge pull request #12840 from vespa-engine/revert-12821-balder/control-net-and-worker-threads-independent
Revert "Control mbus worker threads and network threads separately."
6 files changed, 24 insertions, 46 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index e77664b0987..faa67b9bece 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -137,7 +137,7 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _owner(nullptr), _ident(params.getIdentity()), _threadPool(std::make_unique<FastOS_ThreadPool>(128000, 0)), - _transport(std::make_unique<FNET_Transport>(params.getNumNetworkThreads())), + _transport(std::make_unique<FNET_Transport>(params.getNumThreads())), _orb(std::make_unique<FRT_Supervisor>(_transport.get())), _scheduler(*_transport->GetScheduler()), _slobrokCfgFactory(std::make_unique<slobrok::ConfiguratorFactory>(params.getSlobrokConfig())), @@ -149,7 +149,6 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _servicePool(std::make_unique<RPCServicePool>(*_mirror, 4096)), _singleEncodeExecutor(createExecutor(params.getOptimizeFor())), _singleDecodeExecutor(createExecutor(params.getOptimizeFor())), - _executor(std::make_unique<vespalib::ThreadStackExecutor>(params.getNumThreads(), 128000)), _sendV1(std::make_unique<RPCSendV1>()), _sendV2(std::make_unique<RPCSendV2>()), _sendAdapters(), diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h index a37cf1d9176..a510aae9014 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h @@ -52,28 +52,27 @@ private: using SendAdapterMap = std::map<vespalib::Version, RPCSendAdapter*>; - INetworkOwner *_owner; - Identity _ident; - std::unique_ptr<FastOS_ThreadPool> _threadPool; - std::unique_ptr<FNET_Transport> _transport; - std::unique_ptr<FRT_Supervisor> _orb; - FNET_Scheduler &_scheduler; - std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory; - std::unique_ptr<slobrok::api::IMirrorAPI> _mirror; - std::unique_ptr<slobrok::api::RegisterAPI> _regAPI; - int _requestedPort; - std::unique_ptr<RPCTargetPool> _targetPool; - std::unique_ptr<FNET_Task> _targetPoolTask; - std::unique_ptr<RPCServicePool> _servicePool; + INetworkOwner *_owner; + Identity _ident; + std::unique_ptr<FastOS_ThreadPool> _threadPool; + std::unique_ptr<FNET_Transport> _transport; + std::unique_ptr<FRT_Supervisor> _orb; + FNET_Scheduler &_scheduler; + std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory; + std::unique_ptr<slobrok::api::IMirrorAPI> _mirror; + std::unique_ptr<slobrok::api::RegisterAPI> _regAPI; + int _requestedPort; + std::unique_ptr<RPCTargetPool> _targetPool; + std::unique_ptr<FNET_Task> _targetPoolTask; + std::unique_ptr<RPCServicePool> _servicePool; std::unique_ptr<vespalib::SyncableThreadExecutor> _singleEncodeExecutor; std::unique_ptr<vespalib::SyncableThreadExecutor> _singleDecodeExecutor; - std::unique_ptr<vespalib::SyncableThreadExecutor> _executor; - std::unique_ptr<RPCSendAdapter> _sendV1; - std::unique_ptr<RPCSendAdapter> _sendV2; - SendAdapterMap _sendAdapters; - CompressionConfig _compressionConfig; - bool _allowDispatchForEncode; - bool _allowDispatchForDecode; + std::unique_ptr<RPCSendAdapter> _sendV1; + std::unique_ptr<RPCSendAdapter> _sendV2; + SendAdapterMap _sendAdapters; + CompressionConfig _compressionConfig; + bool _allowDispatchForEncode; + bool _allowDispatchForDecode; /** * Resolves and assigns a service address for the given recipient using the @@ -225,8 +224,8 @@ public: const slobrok::api::IMirrorAPI &getMirror() const override; CompressionConfig getCompressionConfig() { return _compressionConfig; } void invoke(FRT_RPCRequest *req); - vespalib::Executor & getEncodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleEncodeExecutor : *_executor; } - vespalib::Executor & getDecodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleDecodeExecutor : *_executor; } + vespalib::Executor & getEncodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleEncodeExecutor : *_singleEncodeExecutor; } + vespalib::Executor & getDecodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleDecodeExecutor : *_singleDecodeExecutor; } bool allowDispatchForEncode() const { return _allowDispatchForEncode; } bool allowDispatchForDecode() const { return _allowDispatchForDecode; } diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp index e166a410a61..482a46b2564 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp @@ -14,8 +14,7 @@ RPCNetworkParams::RPCNetworkParams(config::ConfigUri configUri) : _listenPort(0), _maxInputBufferSize(256*1024), _maxOutputBufferSize(256*1024), - _numThreads(2), - _numNetworkThreads(1), + _numThreads(1), _optimizeFor(OptimizeFor::LATENCY), _dispatchOnEncode(true), _dispatchOnDecode(false), diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h index fb53af82900..a4b752f46d4 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h @@ -96,19 +96,6 @@ public: uint32_t getNumThreads() const { return _numThreads; } - /** - * Sets number of threads for the network. - * - * @param numNetworkThreads number of threads for the network - * @return This, to allow chaining. - */ - RPCNetworkParams &setNumNetworkThreads(uint32_t numNetworkThreads) { - _numNetworkThreads = numNetworkThreads; - return *this; - } - - uint32_t getNumNetworkThreads() const { return _numNetworkThreads; } - RPCNetworkParams &setOptimizeFor(OptimizeFor tcpNoDelay) { _optimizeFor = tcpNoDelay; return *this; @@ -207,7 +194,6 @@ private: uint32_t _maxInputBufferSize; uint32_t _maxOutputBufferSize; uint32_t _numThreads; - uint32_t _numNetworkThreads; OptimizeFor _optimizeFor; bool _dispatchOnEncode; bool _dispatchOnDecode; diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def index c58a1a8ebfc..4536ea97855 100644 --- a/storage/src/vespa/storage/config/stor-communicationmanager.def +++ b/storage/src/vespa/storage/config/stor-communicationmanager.def @@ -31,11 +31,7 @@ mbus.rpctargetcache.ttl double default = 600 ## Number of threads for network. ## Any value below 1 will be 1. -mbus.num_network_threads int default=1 - -## Number of workers threads for messagebus. -## Any value below 1 will be 1. -mbus.num_threads int default=2 +mbus.num_threads int default=1 mbus.optimize_for enum {LATENCY, THROUGHPUT} default = LATENCY diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 19c157ffbd2..aff2b0f624f 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -422,7 +422,6 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> mbus::RPCNetworkParams params(_configUri); params.setConnectionExpireSecs(config->mbus.rpctargetcache.ttl); params.setNumThreads(std::max(1, config->mbus.numThreads)); - params.setNumNetworkThreads(std::max(1, config->mbus.numNetworkThreads)); params.setDispatchOnDecode(config->mbus.dispatchOnDecode); params.setDispatchOnEncode(config->mbus.dispatchOnEncode); params.setOptimizeFor(convert(config->mbus.optimizeFor)); |