summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-07-02 07:37:01 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-07-02 07:37:01 +0000
commit33a30b2c11e7857ff5c29344a9c90cd260843e5d (patch)
treeac0d43afc25ff5701e9c596bab25d42a1f83c7cf
parent644e1e158f842e858a557871d58414b01e335b9b (diff)
Support multiple network threads in mbus.
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp2
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.h1
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp1
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.h14
-rw-r--r--storage/src/vespa/storage/config/stor-communicationmanager.def5
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp6
6 files changed, 22 insertions, 7 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index 34ba486d0c9..8d0733c4cf1 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -82,7 +82,7 @@ struct TargetPoolTask : public FNET_Task {
TransportConfig
toFNETConfig(const RPCNetworkParams & params) {
- return TransportConfig()
+ return TransportConfig(params.getNumNetworkThreads())
.maxInputBufferSize(params.getMaxInputBufferSize())
.maxOutputBufferSize(params.getMaxOutputBufferSize())
.tcpNoDelay(params.getTcpNoDelay());
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
index d701358fc84..a3722376086 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
@@ -74,7 +74,6 @@ private:
bool _allowDispatchForEncode;
bool _allowDispatchForDecode;
-
/**
* Resolves and assigns a service address for the given recipient using the
* given address. This is called by the {@link
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
index d440d3b012b..ed104dc6cd5 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
@@ -16,6 +16,7 @@ RPCNetworkParams::RPCNetworkParams(config::ConfigUri configUri) :
_maxInputBufferSize(256_Ki),
_maxOutputBufferSize(256_Ki),
_numThreads(4),
+ _numNetworkThreads(1),
_tcpNoDelay(true),
_dispatchOnEncode(true),
_dispatchOnDecode(false),
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
index ddb4df1a3a3..193bc013c0a 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
@@ -20,6 +20,7 @@ private:
uint32_t _maxInputBufferSize;
uint32_t _maxOutputBufferSize;
uint32_t _numThreads;
+ uint32_t _numNetworkThreads;
bool _tcpNoDelay;
bool _dispatchOnEncode;
bool _dispatchOnDecode;
@@ -34,6 +35,19 @@ public:
~RPCNetworkParams();
/**
+ * Sets number of threads for the network.
+ *
+ * @param numNetworkThreads number of threads for the network
+ * @return This, to allow chaining.
+ */
+ RPCNetworkParams &setNumNetworkThreads(uint32_t numNetworkThreads) {
+ _numNetworkThreads = numNetworkThreads;
+ return *this;
+ }
+
+ uint32_t getNumNetworkThreads() const { return _numNetworkThreads; }
+
+ /**
* Returns the identity to use for the network.
*
* @return The identity.
diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def
index 799490ba114..534be7cb6fe 100644
--- a/storage/src/vespa/storage/config/stor-communicationmanager.def
+++ b/storage/src/vespa/storage/config/stor-communicationmanager.def
@@ -29,7 +29,10 @@ mbus.compress.type enum {NONE, LZ4, ZSTD} default=LZ4 restart
## TTL for rpc target cache
mbus.rpctargetcache.ttl double default = 600 restart
-## Number of threads for mbus threadpool
+## Number of threads for network.
+mbus.num_network_threads int default=1
+
+## Number of workers threads for messagebus
## Any value below 1 will be 1.
mbus.num_threads int default=4 restart
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 25942926155..39f058d1085 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -17,14 +17,11 @@
#include <vespa/storageapi/message/state.h>
#include <vespa/storageframework/generic/clock/timer.h>
#include <vespa/vespalib/stllike/asciistream.h>
-#include <vespa/vespalib/stllike/hash_map.hpp>
#include <vespa/vespalib/util/stringfmt.h>
-
-#include <vespa/log/bufferedlogger.h>
#include <vespa/document/bucket/fixed_bucket_spaces.h>
-#include <vespa/documentapi/messagebus/messages/getdocumentreply.h>
#include <string_view>
+#include <vespa/log/bufferedlogger.h>
LOG_SETUP(".communication.manager");
using vespalib::make_string;
@@ -396,6 +393,7 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig>
mbus::RPCNetworkParams params(_configUri);
params.setConnectionExpireSecs(config->mbus.rpctargetcache.ttl);
params.setNumThreads(std::max(1, config->mbus.numThreads));
+ params.setNumNetworkThreads(std::max(1, config->mbus.numNetworkThreads));
params.setDispatchOnDecode(config->mbus.dispatchOnDecode);
params.setDispatchOnEncode(config->mbus.dispatchOnEncode);
params.setTcpNoDelay(config->mbus.optimizeFor == CommunicationManagerConfig::Mbus::OptimizeFor::LATENCY);