diff options
author | Henning Baldersheim <balder@oath.com> | 2018-06-14 15:48:11 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@oath.com> | 2018-06-15 14:51:38 +0200 |
commit | f82b045d1ad8d032c5074d95b33aa0b0f3eb43b6 (patch) | |
tree | bc93945d71a5fce0ce8ce221b849ae035ecad4e6 /messagebus | |
parent | f94ea4ac173c3477bb3c1069df97b666c36065db (diff) |
Control threadpool and dispatch of encode/decode by config.
Diffstat (limited to 'messagebus')
5 files changed, 50 insertions, 14 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index f3e3359fabe..1372db14259 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -120,11 +120,13 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _mirror(std::make_unique<slobrok::api::MirrorAPI>(*_orb, *_slobrokCfgFactory)), _regAPI(std::make_unique<slobrok::api::RegisterAPI>(*_orb, *_slobrokCfgFactory)), _requestedPort(params.getListenPort()), - _executor(std::make_unique<vespalib::ThreadStackExecutor>(4,65536)), + _executor(std::make_unique<vespalib::ThreadStackExecutor>(std::min(1u, params.getNumThreads()), 65536)), _sendV1(std::make_unique<RPCSendV1>()), _sendV2(std::make_unique<RPCSendV2>()), _sendAdapters(), - _compressionConfig(params.getCompressionConfig()) + _compressionConfig(params.getCompressionConfig()), + _allowDispatchForEncode(params.getDispatchOnEncode()), + _allowDispatchForDecode(params.getDispatchOnDecode()) { _transport->SetDirectWrite(false); _transport->SetMaxInputBufferSize(params.getMaxInputBufferSize()); @@ -224,11 +226,6 @@ RPCNetwork::start() return true; } -vespalib::Executor & -RPCNetwork::getExecutor() { - return *_executor; -} - bool RPCNetwork::waitUntilReady(double seconds) const { diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h index e29d01c8b04..9c6516eced7 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h @@ -77,6 +77,9 @@ private: std::unique_ptr<RPCSendAdapter> _sendV2; SendAdapterMap _sendAdapters; CompressionConfig _compressionConfig; + bool _allowDispatchForEncode; + bool _allowDispatchForDecode; + /** * Resolves and assigns a service address for the given recipient using the @@ -135,7 +138,7 @@ public: /** * Destruct **/ - virtual ~RPCNetwork(); + ~RPCNetwork() override; /** * Obtain the owner of this network. This method may only be invoked after @@ -226,7 +229,10 @@ public: const slobrok::api::IMirrorAPI &getMirror() const override; CompressionConfig getCompressionConfig() { return _compressionConfig; } void invoke(FRT_RPCRequest *req); - vespalib::Executor & getExecutor(); + vespalib::Executor & getExecutor() const { return *_executor; } + bool allowDispatchForEncode() const { return _allowDispatchForEncode; } + bool allowDispatchForDecode() const { return _allowDispatchForDecode; } + }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp index 5e54de1bce6..b6f0231e619 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp @@ -10,6 +10,9 @@ RPCNetworkParams::RPCNetworkParams() : _listenPort(0), _maxInputBufferSize(256*1024), _maxOutputBufferSize(256*1024), + _numThreads(4), + _dispatchOnEncode(true), + _dispatchOnDecode(false), _connectionExpireSecs(600), _compressionConfig(CompressionConfig::LZ4, 6, 90, 1024) { } diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h index 0a4ed806c27..1dcc8178e68 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h @@ -19,6 +19,9 @@ private: int _listenPort; uint32_t _maxInputBufferSize; uint32_t _maxOutputBufferSize; + uint32_t _numThreads; + bool _dispatchOnEncode; + bool _dispatchOnDecode; double _connectionExpireSecs; CompressionConfig _compressionConfig; @@ -97,6 +100,19 @@ public: } /** + * Sets number of threads for the thread pool. + * + * @param numThreads number of threads for thread pool + * @return This, to allow chaining. + */ + RPCNetworkParams &setNumThreads(uint32_t numThreads) { + _numThreads = numThreads; + return *this; + } + + uint32_t getNumThreads() const { return _numThreads; } + + /** * Returns the number of seconds before an idle network connection expires. * * @return The number of seconds. @@ -165,6 +181,21 @@ public: return *this; } CompressionConfig getCompressionConfig() const { return _compressionConfig; } + + + RPCNetworkParams &setDispatchOnDecode(bool dispatchOnDecode) { + _dispatchOnDecode = dispatchOnDecode; + return *this; + } + + uint32_t getDispatchOnDecode() const { return _dispatchOnDecode; } + + RPCNetworkParams &setDispatchOnEncode(bool dispatchOnEncode) { + _dispatchOnEncode = dispatchOnEncode; + return *this; + } + + uint32_t getDispatchOnEncode() const { return _dispatchOnEncode; } }; } diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp index ba6523da340..04cccd59903 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp @@ -221,7 +221,7 @@ void RPCSend::handleReply(Reply::UP reply) { const IProtocol * protocol = _net->getOwner().getProtocol(reply->getProtocol()); - if (!protocol || protocol->requireSequencing()) { + if (!protocol || protocol->requireSequencing() || !_net->allowDispatchForEncode()) { doHandleReply(protocol, std::move(reply)); } else { auto rejected = _net->getExecutor().execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable { @@ -262,11 +262,11 @@ RPCSend::invoke(FRT_RPCRequest *req) IProtocol * protocol = _net->getOwner().getProtocol(params->getProtocol()); if (protocol == nullptr) { replyError(req, params->getVersion(), params->getTraceLevel(), - Error(ErrorCode::UNKNOWN_PROTOCOL, - make_string("Protocol '%s' is not known by %s.", params->getProtocol().c_str(), _serverIdent.c_str()))); + Error(ErrorCode::UNKNOWN_PROTOCOL, make_string("Protocol '%s' is not known by %s.", + params->getProtocol().c_str(), _serverIdent.c_str()))); return; } - if (protocol->requireSequencing()) { + if (protocol->requireSequencing() || !_net->allowDispatchForDecode()) { doRequest(req, protocol, std::move(params)); } else { auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable { @@ -279,7 +279,6 @@ RPCSend::invoke(FRT_RPCRequest *req) void RPCSend::doRequest(FRT_RPCRequest *req, const IProtocol * protocol, std::unique_ptr<Params> params) { - Routable::UP routable = protocol->decode(params->getVersion(), params->getPayload()); req->DiscardBlobs(); if ( ! routable ) { |