summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-07-04 17:07:18 +0200
committerGitHub <noreply@github.com>2022-07-04 17:07:18 +0200
commitbdca3da122423f78c7dace3ab10c27c575b9fa65 (patch)
treee86f6b362fa03219c2fddfaee5fb8943f42b13fa
parentb8e236091b22552fc0dea58ff1a038831b42a81d (diff)
parentf88ba160a85a3c326f3e89a0d31f10c9a27ca728 (diff)
Merge pull request #23349 from vespa-engine/balder/control-events-before-wakeup
Control events-before-wakeup for mbus too.
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp3
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp3
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.h35
-rw-r--r--storage/src/vespa/storage/config/stor-communicationmanager.def3
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp1
5 files changed, 14 insertions, 31 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index 77d8ca24cfc..8f9037f70dd 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -84,7 +84,8 @@ toFNETConfig(const RPCNetworkParams & params) {
return fnet::TransportConfig(params.getNumNetworkThreads())
.maxInputBufferSize(params.getMaxInputBufferSize())
.maxOutputBufferSize(params.getMaxOutputBufferSize())
- .tcpNoDelay(params.getTcpNoDelay());
+ .tcpNoDelay(params.getTcpNoDelay())
+ .events_before_wakeup(params.events_before_wakeup());
}
}
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
index a4cdd25040b..8fcadaa64c6 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
@@ -18,11 +18,10 @@ RPCNetworkParams::RPCNetworkParams(config::ConfigUri configUri) :
_numThreads(4),
_numNetworkThreads(1),
_numRpcTargets(1),
+ _events_before_wakeup(1),
_tcpNoDelay(true),
_dispatchOnEncode(true),
_dispatchOnDecode(false),
- _skip_request_thread(false),
- _skip_reply_thread(false),
_connectionExpireSecs(600),
_compressionConfig(CompressionConfig::LZ4, 6, 90, 1024)
{ }
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
index 16b8b9b1570..a8d611df653 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
@@ -22,11 +22,10 @@ private:
uint32_t _numThreads;
uint32_t _numNetworkThreads;
uint32_t _numRpcTargets;
+ uint32_t _events_before_wakeup;
bool _tcpNoDelay;
bool _dispatchOnEncode;
bool _dispatchOnDecode;
- bool _skip_request_thread;
- bool _skip_reply_thread;
double _connectionExpireSecs;
CompressionConfig _compressionConfig;
@@ -164,19 +163,6 @@ public:
}
/**
- * Sets the maximum input buffer size allowed for the underlying FNET connection. Using the value 0 means that there
- * is no limit; the connection will not free any allocated memory until it is cleaned up. This might potentially
- * save alot of allocation time.
- *
- * @param maxInputBufferSize The maximum number of bytes.
- * @return This, to allow chaining.
- */
- RPCNetworkParams &setMaxInputBufferSize(uint32_t maxInputBufferSize) {
- _maxInputBufferSize = maxInputBufferSize;
- return *this;
- }
-
- /**
* Returns the maximum output buffer size allowed for the underlying FNET connection.
*
* @return The maximum number of bytes.
@@ -185,19 +171,6 @@ public:
return _maxOutputBufferSize;
}
- /**
- * Sets the maximum output buffer size allowed for the underlying FNET connection. Using the value 0 means that there
- * is no limit; the connection will not free any allocated memory until it is cleaned up. This might potentially
- * save alot of allocation time.
- *
- * @param maxOutputBufferSize The maximum number of bytes.
- * @return This, to allow chaining.
- */
- RPCNetworkParams &setMaxOutputBufferSize(uint32_t maxOutputBufferSize) {
- _maxOutputBufferSize = maxOutputBufferSize;
- return *this;
- }
-
RPCNetworkParams &setCompressionConfig(CompressionConfig compressionConfig) {
_compressionConfig = compressionConfig;
return *this;
@@ -218,6 +191,12 @@ public:
}
bool getDispatchOnEncode() const { return _dispatchOnEncode; }
+
+ RPCNetworkParams &events_before_wakeup(uint32_t value) {
+ _events_before_wakeup = value;
+ return *this;
+ }
+ uint32_t events_before_wakeup() const { return _events_before_wakeup; }
};
}
diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def
index 93156c367d9..75a3344b618 100644
--- a/storage/src/vespa/storage/config/stor-communicationmanager.def
+++ b/storage/src/vespa/storage/config/stor-communicationmanager.def
@@ -42,6 +42,9 @@ mbus.num_network_threads int default=1 restart
## Any value below 1 will be 1.
mbus.num_threads int default=4 restart
+## The number of events in the queue of a network (FNET) thread before it is woken up.
+mbus.events_before_wakeup int default=1 restart
+
## Enable to use above thread pool for encoding replies
## False will use network(fnet) thread
mbus.dispatch_on_encode bool default=true restart
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index ab1f72e85d2..a88c339052d 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -355,6 +355,7 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig>
params.setNumThreads(std::max(1, config->mbus.numThreads));
params.setNumNetworkThreads(std::max(1, config->mbus.numNetworkThreads));
params.setNumRpcTargets(std::max(1, config->mbus.numRpcTargets));
+ params.events_before_wakeup(std::max(1, config->mbus.eventsBeforeWakeup));
params.setDispatchOnDecode(config->mbus.dispatchOnDecode);
params.setDispatchOnEncode(config->mbus.dispatchOnEncode);
params.setTcpNoDelay(config->mbus.tcpNoDelay);