diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-07-04 17:07:18 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-07-04 17:07:18 +0200 |
commit | bdca3da122423f78c7dace3ab10c27c575b9fa65 (patch) | |
tree | e86f6b362fa03219c2fddfaee5fb8943f42b13fa | |
parent | b8e236091b22552fc0dea58ff1a038831b42a81d (diff) | |
parent | f88ba160a85a3c326f3e89a0d31f10c9a27ca728 (diff) |
Merge pull request #23349 from vespa-engine/balder/control-events-before-wakeup
Control events-before-wakeup for mbus too.
5 files changed, 14 insertions, 31 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index 77d8ca24cfc..8f9037f70dd 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -84,7 +84,8 @@ toFNETConfig(const RPCNetworkParams & params) { return fnet::TransportConfig(params.getNumNetworkThreads()) .maxInputBufferSize(params.getMaxInputBufferSize()) .maxOutputBufferSize(params.getMaxOutputBufferSize()) - .tcpNoDelay(params.getTcpNoDelay()); + .tcpNoDelay(params.getTcpNoDelay()) + .events_before_wakeup(params.events_before_wakeup()); } } diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp index a4cdd25040b..8fcadaa64c6 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp @@ -18,11 +18,10 @@ RPCNetworkParams::RPCNetworkParams(config::ConfigUri configUri) : _numThreads(4), _numNetworkThreads(1), _numRpcTargets(1), + _events_before_wakeup(1), _tcpNoDelay(true), _dispatchOnEncode(true), _dispatchOnDecode(false), - _skip_request_thread(false), - _skip_reply_thread(false), _connectionExpireSecs(600), _compressionConfig(CompressionConfig::LZ4, 6, 90, 1024) { } diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h index 16b8b9b1570..a8d611df653 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h @@ -22,11 +22,10 @@ private: uint32_t _numThreads; uint32_t _numNetworkThreads; uint32_t _numRpcTargets; + uint32_t _events_before_wakeup; bool _tcpNoDelay; bool _dispatchOnEncode; bool _dispatchOnDecode; - bool _skip_request_thread; - bool _skip_reply_thread; double _connectionExpireSecs; CompressionConfig _compressionConfig; @@ -164,19 +163,6 @@ public: } /** - * Sets the maximum input buffer size allowed for the underlying FNET connection. Using the value 0 means that there - * is no limit; the connection will not free any allocated memory until it is cleaned up. This might potentially - * save alot of allocation time. - * - * @param maxInputBufferSize The maximum number of bytes. - * @return This, to allow chaining. - */ - RPCNetworkParams &setMaxInputBufferSize(uint32_t maxInputBufferSize) { - _maxInputBufferSize = maxInputBufferSize; - return *this; - } - - /** * Returns the maximum output buffer size allowed for the underlying FNET connection. * * @return The maximum number of bytes. @@ -185,19 +171,6 @@ public: return _maxOutputBufferSize; } - /** - * Sets the maximum output buffer size allowed for the underlying FNET connection. Using the value 0 means that there - * is no limit; the connection will not free any allocated memory until it is cleaned up. This might potentially - * save alot of allocation time. - * - * @param maxOutputBufferSize The maximum number of bytes. - * @return This, to allow chaining. - */ - RPCNetworkParams &setMaxOutputBufferSize(uint32_t maxOutputBufferSize) { - _maxOutputBufferSize = maxOutputBufferSize; - return *this; - } - RPCNetworkParams &setCompressionConfig(CompressionConfig compressionConfig) { _compressionConfig = compressionConfig; return *this; @@ -218,6 +191,12 @@ public: } bool getDispatchOnEncode() const { return _dispatchOnEncode; } + + RPCNetworkParams &events_before_wakeup(uint32_t value) { + _events_before_wakeup = value; + return *this; + } + uint32_t events_before_wakeup() const { return _events_before_wakeup; } }; } diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def index 93156c367d9..75a3344b618 100644 --- a/storage/src/vespa/storage/config/stor-communicationmanager.def +++ b/storage/src/vespa/storage/config/stor-communicationmanager.def @@ -42,6 +42,9 @@ mbus.num_network_threads int default=1 restart ## Any value below 1 will be 1. mbus.num_threads int default=4 restart +## The number of events in the queue of a network (FNET) thread before it is woken up. +mbus.events_before_wakeup int default=1 restart + ## Enable to use above thread pool for encoding replies ## False will use network(fnet) thread mbus.dispatch_on_encode bool default=true restart diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index ab1f72e85d2..a88c339052d 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -355,6 +355,7 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> params.setNumThreads(std::max(1, config->mbus.numThreads)); params.setNumNetworkThreads(std::max(1, config->mbus.numNetworkThreads)); params.setNumRpcTargets(std::max(1, config->mbus.numRpcTargets)); + params.events_before_wakeup(std::max(1, config->mbus.eventsBeforeWakeup)); params.setDispatchOnDecode(config->mbus.dispatchOnDecode); params.setDispatchOnEncode(config->mbus.dispatchOnEncode); params.setTcpNoDelay(config->mbus.tcpNoDelay); |