diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-07-02 07:37:01 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-07-02 07:37:01 +0000 |
commit | 33a30b2c11e7857ff5c29344a9c90cd260843e5d (patch) | |
tree | ac0d43afc25ff5701e9c596bab25d42a1f83c7cf | |
parent | 644e1e158f842e858a557871d58414b01e335b9b (diff) |
Support multiple network threads in mbus.
6 files changed, 22 insertions, 7 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index 34ba486d0c9..8d0733c4cf1 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -82,7 +82,7 @@ struct TargetPoolTask : public FNET_Task { TransportConfig toFNETConfig(const RPCNetworkParams & params) { - return TransportConfig() + return TransportConfig(params.getNumNetworkThreads()) .maxInputBufferSize(params.getMaxInputBufferSize()) .maxOutputBufferSize(params.getMaxOutputBufferSize()) .tcpNoDelay(params.getTcpNoDelay()); diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h index d701358fc84..a3722376086 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h @@ -74,7 +74,6 @@ private: bool _allowDispatchForEncode; bool _allowDispatchForDecode; - /** * Resolves and assigns a service address for the given recipient using the * given address. This is called by the {@link diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp index d440d3b012b..ed104dc6cd5 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp @@ -16,6 +16,7 @@ RPCNetworkParams::RPCNetworkParams(config::ConfigUri configUri) : _maxInputBufferSize(256_Ki), _maxOutputBufferSize(256_Ki), _numThreads(4), + _numNetworkThreads(1), _tcpNoDelay(true), _dispatchOnEncode(true), _dispatchOnDecode(false), diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h index ddb4df1a3a3..193bc013c0a 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h @@ -20,6 +20,7 @@ private: uint32_t _maxInputBufferSize; uint32_t _maxOutputBufferSize; uint32_t _numThreads; + uint32_t _numNetworkThreads; bool _tcpNoDelay; bool _dispatchOnEncode; bool _dispatchOnDecode; @@ -34,6 +35,19 @@ public: ~RPCNetworkParams(); /** + * Sets number of threads for the network. + * + * @param numNetworkThreads number of threads for the network + * @return This, to allow chaining. + */ + RPCNetworkParams &setNumNetworkThreads(uint32_t numNetworkThreads) { + _numNetworkThreads = numNetworkThreads; + return *this; + } + + uint32_t getNumNetworkThreads() const { return _numNetworkThreads; } + + /** * Returns the identity to use for the network. * * @return The identity. diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def index 799490ba114..534be7cb6fe 100644 --- a/storage/src/vespa/storage/config/stor-communicationmanager.def +++ b/storage/src/vespa/storage/config/stor-communicationmanager.def @@ -29,7 +29,10 @@ mbus.compress.type enum {NONE, LZ4, ZSTD} default=LZ4 restart ## TTL for rpc target cache mbus.rpctargetcache.ttl double default = 600 restart -## Number of threads for mbus threadpool +## Number of threads for network. +mbus.num_network_threads int default=1 + +## Number of workers threads for messagebus ## Any value below 1 will be 1. mbus.num_threads int default=4 restart diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 25942926155..39f058d1085 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -17,14 +17,11 @@ #include <vespa/storageapi/message/state.h> #include <vespa/storageframework/generic/clock/timer.h> #include <vespa/vespalib/stllike/asciistream.h> -#include <vespa/vespalib/stllike/hash_map.hpp> #include <vespa/vespalib/util/stringfmt.h> - -#include <vespa/log/bufferedlogger.h> #include <vespa/document/bucket/fixed_bucket_spaces.h> -#include <vespa/documentapi/messagebus/messages/getdocumentreply.h> #include <string_view> +#include <vespa/log/bufferedlogger.h> LOG_SETUP(".communication.manager"); using vespalib::make_string; @@ -396,6 +393,7 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> mbus::RPCNetworkParams params(_configUri); params.setConnectionExpireSecs(config->mbus.rpctargetcache.ttl); params.setNumThreads(std::max(1, config->mbus.numThreads)); + params.setNumNetworkThreads(std::max(1, config->mbus.numNetworkThreads)); params.setDispatchOnDecode(config->mbus.dispatchOnDecode); params.setDispatchOnEncode(config->mbus.dispatchOnEncode); params.setTcpNoDelay(config->mbus.optimizeFor == CommunicationManagerConfig::Mbus::OptimizeFor::LATENCY); |