diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2018-06-15 17:52:20 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-06-15 17:52:20 +0200 |
commit | 45ffc87ad5313f975205d9f05ebaca8b813c2a37 (patch) | |
tree | 98728aa2fe6098c979bea36766a81de0d533669a | |
parent | 1b0d6f4961f2d6cc8a8fa2a2a76b48f9b32f7685 (diff) | |
parent | 6a45f38abdfe39a8d08fb679ec2be378699294b9 (diff) |
Merge pull request #6217 from vespa-engine/balder/multiple-threads-in-mbus
Balder/multiple threads in mbus
8 files changed, 84 insertions, 24 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index f3e3359fabe..108b94070bf 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -120,11 +120,13 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _mirror(std::make_unique<slobrok::api::MirrorAPI>(*_orb, *_slobrokCfgFactory)), _regAPI(std::make_unique<slobrok::api::RegisterAPI>(*_orb, *_slobrokCfgFactory)), _requestedPort(params.getListenPort()), - _executor(std::make_unique<vespalib::ThreadStackExecutor>(4,65536)), + _executor(std::make_unique<vespalib::ThreadStackExecutor>(params.getNumThreads(), 65536)), _sendV1(std::make_unique<RPCSendV1>()), _sendV2(std::make_unique<RPCSendV2>()), _sendAdapters(), - _compressionConfig(params.getCompressionConfig()) + _compressionConfig(params.getCompressionConfig()), + _allowDispatchForEncode(params.getDispatchOnEncode()), + _allowDispatchForDecode(params.getDispatchOnDecode()) { _transport->SetDirectWrite(false); _transport->SetMaxInputBufferSize(params.getMaxInputBufferSize()); @@ -224,11 +226,6 @@ RPCNetwork::start() return true; } -vespalib::Executor & -RPCNetwork::getExecutor() { - return *_executor; -} - bool RPCNetwork::waitUntilReady(double seconds) const { diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h index e29d01c8b04..9c6516eced7 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h @@ -77,6 +77,9 @@ private: std::unique_ptr<RPCSendAdapter> _sendV2; SendAdapterMap _sendAdapters; CompressionConfig _compressionConfig; + bool _allowDispatchForEncode; + bool _allowDispatchForDecode; + /** * Resolves and assigns a service address for the given recipient using the @@ -135,7 +138,7 @@ public: /** * Destruct **/ - virtual ~RPCNetwork(); + ~RPCNetwork() override; /** * Obtain the owner of this network. This method may only be invoked after @@ -226,7 +229,10 @@ public: const slobrok::api::IMirrorAPI &getMirror() const override; CompressionConfig getCompressionConfig() { return _compressionConfig; } void invoke(FRT_RPCRequest *req); - vespalib::Executor & getExecutor(); + vespalib::Executor & getExecutor() const { return *_executor; } + bool allowDispatchForEncode() const { return _allowDispatchForEncode; } + bool allowDispatchForDecode() const { return _allowDispatchForDecode; } + }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp index 5e54de1bce6..b6f0231e619 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp @@ -10,6 +10,9 @@ RPCNetworkParams::RPCNetworkParams() : _listenPort(0), _maxInputBufferSize(256*1024), _maxOutputBufferSize(256*1024), + _numThreads(4), + _dispatchOnEncode(true), + _dispatchOnDecode(false), _connectionExpireSecs(600), _compressionConfig(CompressionConfig::LZ4, 6, 90, 1024) { } diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h index 0a4ed806c27..1dcc8178e68 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h @@ -19,6 +19,9 @@ private: int _listenPort; uint32_t _maxInputBufferSize; uint32_t _maxOutputBufferSize; + uint32_t _numThreads; + bool _dispatchOnEncode; + bool _dispatchOnDecode; double _connectionExpireSecs; CompressionConfig _compressionConfig; @@ -97,6 +100,19 @@ public: } /** + * Sets number of threads for the thread pool. + * + * @param numThreads number of threads for thread pool + * @return This, to allow chaining. + */ + RPCNetworkParams &setNumThreads(uint32_t numThreads) { + _numThreads = numThreads; + return *this; + } + + uint32_t getNumThreads() const { return _numThreads; } + + /** * Returns the number of seconds before an idle network connection expires. * * @return The number of seconds. @@ -165,6 +181,21 @@ public: return *this; } CompressionConfig getCompressionConfig() const { return _compressionConfig; } + + + RPCNetworkParams &setDispatchOnDecode(bool dispatchOnDecode) { + _dispatchOnDecode = dispatchOnDecode; + return *this; + } + + uint32_t getDispatchOnDecode() const { return _dispatchOnDecode; } + + RPCNetworkParams &setDispatchOnEncode(bool dispatchOnEncode) { + _dispatchOnEncode = dispatchOnEncode; + return *this; + } + + uint32_t getDispatchOnEncode() const { return _dispatchOnEncode; } }; } diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp index e23f4dc29d9..04cccd59903 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp @@ -61,7 +61,7 @@ RPCSend::RPCSend() : _serverIdent("server") { } -RPCSend::~RPCSend() {} +RPCSend::~RPCSend() = default; void RPCSend::attach(RPCNetwork &net) @@ -221,7 +221,7 @@ void RPCSend::handleReply(Reply::UP reply) { const IProtocol * protocol = _net->getOwner().getProtocol(reply->getProtocol()); - if (!protocol || protocol->requireSequencing()) { + if (!protocol || protocol->requireSequencing() || !_net->allowDispatchForEncode()) { doHandleReply(protocol, std::move(reply)); } else { auto rejected = _net->getExecutor().execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable { @@ -256,22 +256,29 @@ void RPCSend::invoke(FRT_RPCRequest *req) { req->Detach(); - doRequest(req); -} - -void -RPCSend::doRequest(FRT_RPCRequest *req) -{ FRT_Values &args = *req->GetParams(); std::unique_ptr<Params> params = toParams(args); IProtocol * protocol = _net->getOwner().getProtocol(params->getProtocol()); if (protocol == nullptr) { replyError(req, params->getVersion(), params->getTraceLevel(), - Error(ErrorCode::UNKNOWN_PROTOCOL, - make_string("Protocol '%s' is not known by %s.", params->getProtocol().c_str(), _serverIdent.c_str()))); + Error(ErrorCode::UNKNOWN_PROTOCOL, make_string("Protocol '%s' is not known by %s.", + params->getProtocol().c_str(), _serverIdent.c_str()))); return; } + if (protocol->requireSequencing() || !_net->allowDispatchForDecode()) { + doRequest(req, protocol, std::move(params)); + } else { + auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable { + doRequest(req, protocol, std::move(params)); + })); + assert (!rejected); + } +} + +void +RPCSend::doRequest(FRT_RPCRequest *req, const IProtocol * protocol, std::unique_ptr<Params> params) +{ Routable::UP routable = protocol->decode(params->getVersion(), params->getPayload()); req->DiscardBlobs(); if ( ! routable ) { diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.h b/messagebus/src/vespa/messagebus/network/rpcsend.h index 11a042b91c0..cfc1d72418a 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.h +++ b/messagebus/src/vespa/messagebus/network/rpcsend.h @@ -83,7 +83,7 @@ public: void invoke(FRT_RPCRequest *req); private: - void doRequest(FRT_RPCRequest *req); + void doRequest(FRT_RPCRequest *req, const IProtocol * protocol, std::unique_ptr<Params> params); void doRequestDone(FRT_RPCRequest *req); void doHandleReply(const IProtocol * protocol, std::unique_ptr<Reply> reply); void attach(RPCNetwork &net) final override; diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def index e29540de064..2a2a840dd4e 100644 --- a/storage/src/vespa/storage/config/stor-communicationmanager.def +++ b/storage/src/vespa/storage/config/stor-communicationmanager.def @@ -28,3 +28,16 @@ mbus.compress.type enum {NONE, LZ4, ZSTD} default=LZ4 ## TTL for rpc target cache mbus.rpctargetcache.ttl double default = 600 + +## Number of threads for mbus threadpool +## Any value below 1 will be 1. +mbus.num_threads int default=4 + +## Enable to use above thread pool for encoding replies +## False will use network(fnet) thread +mbus.dispatch_on_encode bool default=true + +## Enable to use above thread pool for decoding replies +## False will use network(fnet) thread +## Todo: Change default once verified in large scale deployment. +mbus.dispatch_on_decode bool default=false diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 94a151bcdc1..65523b62c59 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -79,7 +79,7 @@ StorageTransportContext::StorageTransportContext(std::unique_ptr<RPCRequestWrapp : _request(std::move(request)) { } -StorageTransportContext::~StorageTransportContext() { } +StorageTransportContext::~StorageTransportContext() = default; void CommunicationManager::receiveStorageReply(const std::shared_ptr<api::StorageReply>& reply) @@ -278,13 +278,13 @@ void CommunicationManager::fail_with_unresolvable_bucket_space( namespace { struct PlaceHolderBucketResolver : public BucketResolver { - virtual document::Bucket bucketFromId(const document::DocumentId &) const override { + document::Bucket bucketFromId(const document::DocumentId &) const override { return document::Bucket(FixedBucketSpaces::default_space(), document::BucketId(0)); } - virtual document::BucketSpace bucketSpaceFromName(const vespalib::string &) const override { + document::BucketSpace bucketSpaceFromName(const vespalib::string &) const override { return FixedBucketSpaces::default_space(); } - virtual vespalib::string nameFromBucketSpace(const document::BucketSpace &bucketSpace) const override { + vespalib::string nameFromBucketSpace(const document::BucketSpace &bucketSpace) const override { assert(bucketSpace == FixedBucketSpaces::default_space()); return FixedBucketSpaces::to_string(bucketSpace); } @@ -423,6 +423,9 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> params.setSlobrokConfig(_configUri); params.setConnectionExpireSecs(config->mbus.rpctargetcache.ttl); + params.setNumThreads(std::max(1, config->mbus.numThreads)); + params.setDispatchOnDecode(config->mbus.dispatchOnDecode); + params.setDispatchOnEncode(config->mbus.dispatchOnEncode); params.setIdentity(mbus::Identity(_component.getIdentity())); if (config->mbusport != -1) { |