diff options
author | Henning Baldersheim <balder@oath.com> | 2018-06-14 15:48:11 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@oath.com> | 2018-06-15 14:51:38 +0200 |
commit | f82b045d1ad8d032c5074d95b33aa0b0f3eb43b6 (patch) | |
tree | bc93945d71a5fce0ce8ce221b849ae035ecad4e6 | |
parent | f94ea4ac173c3477bb3c1069df97b666c36065db (diff) |
Control threadpool and dispatch of encode/decode by config.
7 files changed, 70 insertions, 18 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index f3e3359fabe..1372db14259 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -120,11 +120,13 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _mirror(std::make_unique<slobrok::api::MirrorAPI>(*_orb, *_slobrokCfgFactory)), _regAPI(std::make_unique<slobrok::api::RegisterAPI>(*_orb, *_slobrokCfgFactory)), _requestedPort(params.getListenPort()), - _executor(std::make_unique<vespalib::ThreadStackExecutor>(4,65536)), + _executor(std::make_unique<vespalib::ThreadStackExecutor>(std::min(1u, params.getNumThreads()), 65536)), _sendV1(std::make_unique<RPCSendV1>()), _sendV2(std::make_unique<RPCSendV2>()), _sendAdapters(), - _compressionConfig(params.getCompressionConfig()) + _compressionConfig(params.getCompressionConfig()), + _allowDispatchForEncode(params.getDispatchOnEncode()), + _allowDispatchForDecode(params.getDispatchOnDecode()) { _transport->SetDirectWrite(false); _transport->SetMaxInputBufferSize(params.getMaxInputBufferSize()); @@ -224,11 +226,6 @@ RPCNetwork::start() return true; } -vespalib::Executor & -RPCNetwork::getExecutor() { - return *_executor; -} - bool RPCNetwork::waitUntilReady(double seconds) const { diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h index e29d01c8b04..9c6516eced7 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h @@ -77,6 +77,9 @@ private: std::unique_ptr<RPCSendAdapter> _sendV2; SendAdapterMap _sendAdapters; CompressionConfig _compressionConfig; + bool _allowDispatchForEncode; + bool _allowDispatchForDecode; + /** * Resolves and assigns a service address for the given recipient using the @@ -135,7 +138,7 @@ public: /** * Destruct **/ - virtual ~RPCNetwork(); + ~RPCNetwork() override; /** * Obtain the owner of this network. This method may only be invoked after @@ -226,7 +229,10 @@ public: const slobrok::api::IMirrorAPI &getMirror() const override; CompressionConfig getCompressionConfig() { return _compressionConfig; } void invoke(FRT_RPCRequest *req); - vespalib::Executor & getExecutor(); + vespalib::Executor & getExecutor() const { return *_executor; } + bool allowDispatchForEncode() const { return _allowDispatchForEncode; } + bool allowDispatchForDecode() const { return _allowDispatchForDecode; } + }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp index 5e54de1bce6..b6f0231e619 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp @@ -10,6 +10,9 @@ RPCNetworkParams::RPCNetworkParams() : _listenPort(0), _maxInputBufferSize(256*1024), _maxOutputBufferSize(256*1024), + _numThreads(4), + _dispatchOnEncode(true), + _dispatchOnDecode(false), _connectionExpireSecs(600), _compressionConfig(CompressionConfig::LZ4, 6, 90, 1024) { } diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h index 0a4ed806c27..1dcc8178e68 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h @@ -19,6 +19,9 @@ private: int _listenPort; uint32_t _maxInputBufferSize; uint32_t _maxOutputBufferSize; + uint32_t _numThreads; + bool _dispatchOnEncode; + bool _dispatchOnDecode; double _connectionExpireSecs; CompressionConfig _compressionConfig; @@ -97,6 +100,19 @@ public: } /** + * Sets number of threads for the thread pool. + * + * @param numThreads number of threads for thread pool + * @return This, to allow chaining. + */ + RPCNetworkParams &setNumThreads(uint32_t numThreads) { + _numThreads = numThreads; + return *this; + } + + uint32_t getNumThreads() const { return _numThreads; } + + /** * Returns the number of seconds before an idle network connection expires. * * @return The number of seconds. @@ -165,6 +181,21 @@ public: return *this; } CompressionConfig getCompressionConfig() const { return _compressionConfig; } + + + RPCNetworkParams &setDispatchOnDecode(bool dispatchOnDecode) { + _dispatchOnDecode = dispatchOnDecode; + return *this; + } + + uint32_t getDispatchOnDecode() const { return _dispatchOnDecode; } + + RPCNetworkParams &setDispatchOnEncode(bool dispatchOnEncode) { + _dispatchOnEncode = dispatchOnEncode; + return *this; + } + + uint32_t getDispatchOnEncode() const { return _dispatchOnEncode; } }; } diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp index ba6523da340..04cccd59903 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp @@ -221,7 +221,7 @@ void RPCSend::handleReply(Reply::UP reply) { const IProtocol * protocol = _net->getOwner().getProtocol(reply->getProtocol()); - if (!protocol || protocol->requireSequencing()) { + if (!protocol || protocol->requireSequencing() || !_net->allowDispatchForEncode()) { doHandleReply(protocol, std::move(reply)); } else { auto rejected = _net->getExecutor().execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable { @@ -262,11 +262,11 @@ RPCSend::invoke(FRT_RPCRequest *req) IProtocol * protocol = _net->getOwner().getProtocol(params->getProtocol()); if (protocol == nullptr) { replyError(req, params->getVersion(), params->getTraceLevel(), - Error(ErrorCode::UNKNOWN_PROTOCOL, - make_string("Protocol '%s' is not known by %s.", params->getProtocol().c_str(), _serverIdent.c_str()))); + Error(ErrorCode::UNKNOWN_PROTOCOL, make_string("Protocol '%s' is not known by %s.", + params->getProtocol().c_str(), _serverIdent.c_str()))); return; } - if (protocol->requireSequencing()) { + if (protocol->requireSequencing() || !_net->allowDispatchForDecode()) { doRequest(req, protocol, std::move(params)); } else { auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable { @@ -279,7 +279,6 @@ RPCSend::invoke(FRT_RPCRequest *req) void RPCSend::doRequest(FRT_RPCRequest *req, const IProtocol * protocol, std::unique_ptr<Params> params) { - Routable::UP routable = protocol->decode(params->getVersion(), params->getPayload()); req->DiscardBlobs(); if ( ! routable ) { diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def index e29540de064..2a2a840dd4e 100644 --- a/storage/src/vespa/storage/config/stor-communicationmanager.def +++ b/storage/src/vespa/storage/config/stor-communicationmanager.def @@ -28,3 +28,16 @@ mbus.compress.type enum {NONE, LZ4, ZSTD} default=LZ4 ## TTL for rpc target cache mbus.rpctargetcache.ttl double default = 600 + +## Number of threads for mbus threadpool +## Any value below 1 will be 1. +mbus.num_threads int default=4 + +## Enable to use above thread pool for encoding replies +## False will use network(fnet) thread +mbus.dispatch_on_encode bool default=true + +## Enable to use above thread pool for decoding replies +## False will use network(fnet) thread +## Todo: Change default once verified in large scale deployment. +mbus.dispatch_on_decode bool default=false diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 94a151bcdc1..1704b97ccc7 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -79,7 +79,7 @@ StorageTransportContext::StorageTransportContext(std::unique_ptr<RPCRequestWrapp : _request(std::move(request)) { } -StorageTransportContext::~StorageTransportContext() { } +StorageTransportContext::~StorageTransportContext() = default; void CommunicationManager::receiveStorageReply(const std::shared_ptr<api::StorageReply>& reply) @@ -278,13 +278,13 @@ void CommunicationManager::fail_with_unresolvable_bucket_space( namespace { struct PlaceHolderBucketResolver : public BucketResolver { - virtual document::Bucket bucketFromId(const document::DocumentId &) const override { + document::Bucket bucketFromId(const document::DocumentId &) const override { return document::Bucket(FixedBucketSpaces::default_space(), document::BucketId(0)); } - virtual document::BucketSpace bucketSpaceFromName(const vespalib::string &) const override { + document::BucketSpace bucketSpaceFromName(const vespalib::string &) const override { return FixedBucketSpaces::default_space(); } - virtual vespalib::string nameFromBucketSpace(const document::BucketSpace &bucketSpace) const override { + vespalib::string nameFromBucketSpace(const document::BucketSpace &bucketSpace) const override { assert(bucketSpace == FixedBucketSpaces::default_space()); return FixedBucketSpaces::to_string(bucketSpace); } @@ -423,6 +423,9 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> params.setSlobrokConfig(_configUri); params.setConnectionExpireSecs(config->mbus.rpctargetcache.ttl); + params.setNumThreads(config->mbus.numThreads); + params.setDispatchOnDecode(config->mbus.dispatchOnDecode); + params.setDispatchOnEncode(config->mbus.dispatchOnEncode); params.setIdentity(mbus::Identity(_component.getIdentity())); if (config->mbusport != -1) { |