From 42d5ce33431512375f946ef91d45a451d3cc8a6b Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Sat, 28 Mar 2020 00:15:37 +0100 Subject: Revert "Revert "Balder/rearrange threads"" --- messagebus/src/vespa/messagebus/messenger.cpp | 4 +-- .../src/vespa/messagebus/network/rpcnetwork.cpp | 40 +++++++++++++++++----- .../src/vespa/messagebus/network/rpcnetwork.h | 7 ++-- .../vespa/messagebus/network/rpcnetworkparams.cpp | 4 +-- .../vespa/messagebus/network/rpcnetworkparams.h | 33 +++++++++--------- .../src/vespa/messagebus/network/rpcsend.cpp | 25 +++++++++----- .../storage/config/stor-communicationmanager.def | 8 ++--- .../storage/storageserver/communicationmanager.cpp | 14 +++++++- .../storage/storageserver/communicationmanager.h | 2 +- 9 files changed, 91 insertions(+), 46 deletions(-) diff --git a/messagebus/src/vespa/messagebus/messenger.cpp b/messagebus/src/vespa/messagebus/messenger.cpp index 5313c4adcbb..36211d8ec38 100644 --- a/messagebus/src/vespa/messagebus/messenger.cpp +++ b/messagebus/src/vespa/messagebus/messenger.cpp @@ -246,13 +246,13 @@ Messenger::start() void Messenger::deliverMessage(Message::UP msg, IMessageHandler &handler) { - enqueue(std::make_unique(std::move(msg), handler)); + handler.handleMessage(std::move(msg)); } void Messenger::deliverReply(Reply::UP reply, IReplyHandler &handler) { - enqueue(std::make_unique(std::move(reply), handler)); + handler.handleReply(std::move(reply)); } void diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index de3be2ffa01..faa67b9bece 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -17,6 +17,8 @@ #include #include #include +#include +#include #include #include @@ -78,6 +80,17 @@ struct TargetPoolTask : public FNET_Task { } }; +std::unique_ptr +createExecutor(RPCNetworkParams::OptimizeFor optimizeFor) { + switch (optimizeFor) { + case RPCNetworkParams::OptimizeFor::LATENCY: + return std::make_unique(1, 0x10000); + case RPCNetworkParams::OptimizeFor::THROUGHPUT: + default: + return std::make_unique(100); + } +} + } RPCNetwork::SendContext::SendContext(RPCNetwork &net, const Message &msg, @@ -107,8 +120,16 @@ RPCNetwork::SendContext::handleVersion(const vespalib::Version *version) } } if (shouldSend) { - _net.send(*this); - delete this; + if (_net.allowDispatchForEncode()) { + auto rejected = _net.getEncodeExecutor(true).execute(vespalib::makeLambdaTask([this]() { + _net.send(*this); + delete this; + })); + assert (!rejected); + } else { + _net.send(*this); + delete this; + } } } @@ -116,7 +137,7 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _owner(nullptr), _ident(params.getIdentity()), _threadPool(std::make_unique(128000, 0)), - _transport(std::make_unique()), + _transport(std::make_unique(params.getNumThreads())), _orb(std::make_unique(_transport.get())), _scheduler(*_transport->GetScheduler()), _slobrokCfgFactory(std::make_unique(params.getSlobrokConfig())), @@ -126,7 +147,8 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _targetPool(std::make_unique(params.getConnectionExpireSecs())), _targetPoolTask(std::make_unique(_scheduler, *_targetPool)), _servicePool(std::make_unique(*_mirror, 4096)), - _executor(std::make_unique(params.getNumThreads(), 65536)), + _singleEncodeExecutor(createExecutor(params.getOptimizeFor())), + _singleDecodeExecutor(createExecutor(params.getOptimizeFor())), _sendV1(std::make_unique()), _sendV2(std::make_unique()), _sendAdapters(), @@ -136,7 +158,6 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : { _transport->SetMaxInputBufferSize(params.getMaxInputBufferSize()); _transport->SetMaxOutputBufferSize(params.getMaxOutputBufferSize()); - _transport->SetTCPNoDelay(params.getTcpNoDelay()); } RPCNetwork::~RPCNetwork() @@ -406,7 +427,8 @@ void RPCNetwork::sync() { SyncTask task(_scheduler); - _executor->sync(); + _singleEncodeExecutor->sync(); + _singleDecodeExecutor->sync(); task.await(); } @@ -415,8 +437,10 @@ RPCNetwork::shutdown() { _transport->ShutDown(true); _threadPool->Close(); - _executor->shutdown(); - _executor->sync(); + _singleEncodeExecutor->shutdown(); + _singleDecodeExecutor->shutdown(); + _singleEncodeExecutor->sync(); + _singleDecodeExecutor->sync(); } void diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h index a8eb514387c..a510aae9014 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h @@ -65,7 +65,8 @@ private: std::unique_ptr _targetPool; std::unique_ptr _targetPoolTask; std::unique_ptr _servicePool; - std::unique_ptr _executor; + std::unique_ptr _singleEncodeExecutor; + std::unique_ptr _singleDecodeExecutor; std::unique_ptr _sendV1; std::unique_ptr _sendV2; SendAdapterMap _sendAdapters; @@ -73,7 +74,6 @@ private: bool _allowDispatchForEncode; bool _allowDispatchForDecode; - /** * Resolves and assigns a service address for the given recipient using the * given address. This is called by the {@link @@ -224,7 +224,8 @@ public: const slobrok::api::IMirrorAPI &getMirror() const override; CompressionConfig getCompressionConfig() { return _compressionConfig; } void invoke(FRT_RPCRequest *req); - vespalib::Executor & getExecutor() const { return *_executor; } + vespalib::Executor & getEncodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleEncodeExecutor : *_singleEncodeExecutor; } + vespalib::Executor & getDecodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleDecodeExecutor : *_singleDecodeExecutor; } bool allowDispatchForEncode() const { return _allowDispatchForEncode; } bool allowDispatchForDecode() const { return _allowDispatchForDecode; } diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp index 5bf277a8ee6..482a46b2564 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp @@ -14,8 +14,8 @@ RPCNetworkParams::RPCNetworkParams(config::ConfigUri configUri) : _listenPort(0), _maxInputBufferSize(256*1024), _maxOutputBufferSize(256*1024), - _numThreads(4), - _tcpNoDelay(true), + _numThreads(1), + _optimizeFor(OptimizeFor::LATENCY), _dispatchOnEncode(true), _dispatchOnDecode(false), _connectionExpireSecs(600), diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h index 140f81c611c..a4b752f46d4 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h @@ -12,21 +12,10 @@ namespace mbus { * held by this class. This class has reasonable default values for each parameter. */ class RPCNetworkParams { -private: +public: + enum class OptimizeFor { LATENCY, THROUGHPUT}; using CompressionConfig = vespalib::compression::CompressionConfig; - Identity _identity; - config::ConfigUri _slobrokConfig; - int _listenPort; - uint32_t _maxInputBufferSize; - uint32_t _maxOutputBufferSize; - uint32_t _numThreads; - bool _tcpNoDelay; - bool _dispatchOnEncode; - bool _dispatchOnDecode; - double _connectionExpireSecs; - CompressionConfig _compressionConfig; -public: RPCNetworkParams(); RPCNetworkParams(config::ConfigUri configUri); ~RPCNetworkParams(); @@ -107,12 +96,12 @@ public: uint32_t getNumThreads() const { return _numThreads; } - RPCNetworkParams &setTcpNoDelay(bool tcpNoDelay) { - _tcpNoDelay = tcpNoDelay; + RPCNetworkParams &setOptimizeFor(OptimizeFor tcpNoDelay) { + _optimizeFor = tcpNoDelay; return *this; } - bool getTcpNoDelay() const { return _tcpNoDelay; } + OptimizeFor getOptimizeFor() const { return _optimizeFor; } /** * Returns the number of seconds before an idle network connection expires. @@ -198,6 +187,18 @@ public: } uint32_t getDispatchOnEncode() const { return _dispatchOnEncode; } +private: + Identity _identity; + config::ConfigUri _slobrokConfig; + int _listenPort; + uint32_t _maxInputBufferSize; + uint32_t _maxOutputBufferSize; + uint32_t _numThreads; + OptimizeFor _optimizeFor; + bool _dispatchOnEncode; + bool _dispatchOnDecode; + double _connectionExpireSecs; + CompressionConfig _compressionConfig; }; } diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp index 2422638dc05..d217c7964d6 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp @@ -148,7 +148,14 @@ RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, void RPCSend::RequestDone(FRT_RPCRequest *req) { - doRequestDone(req); + if ( _net->allowDispatchForDecode()) { + auto rejected = _net->getDecodeExecutor(true).execute(makeLambdaTask([this, req]() { + doRequestDone(req); + })); + assert (!rejected); + } else { + doRequestDone(req); + } } void @@ -221,13 +228,13 @@ void RPCSend::handleReply(Reply::UP reply) { const IProtocol * protocol = _net->getOwner().getProtocol(reply->getProtocol()); - if (!protocol || protocol->requireSequencing() || !_net->allowDispatchForEncode()) { - doHandleReply(protocol, std::move(reply)); - } else { - auto rejected = _net->getExecutor().execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable { + if (protocol && _net->allowDispatchForEncode()) { + auto rejected = _net->getEncodeExecutor(protocol->requireSequencing()).execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable { doHandleReply(protocol, std::move(reply)); })); assert (!rejected); + } else { + doHandleReply(protocol, std::move(reply)); } } @@ -266,13 +273,13 @@ RPCSend::invoke(FRT_RPCRequest *req) vespalib::string(params->getProtocol()).c_str(), _serverIdent.c_str()))); return; } - if (protocol->requireSequencing() || !_net->allowDispatchForDecode()) { - doRequest(req, protocol, std::move(params)); - } else { - auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable { + if (_net->allowDispatchForDecode()) { + auto rejected = _net->getDecodeExecutor(protocol->requireSequencing()).execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable { doRequest(req, protocol, std::move(params)); })); assert (!rejected); + } else { + doRequest(req, protocol, std::move(params)); } } diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def index 8f5b22aa7fa..33b3ec37c04 100644 --- a/storage/src/vespa/storage/config/stor-communicationmanager.def +++ b/storage/src/vespa/storage/config/stor-communicationmanager.def @@ -29,11 +29,11 @@ mbus.compress.type enum {NONE, LZ4, ZSTD} default=LZ4 ## TTL for rpc target cache mbus.rpctargetcache.ttl double default = 600 -## Number of threads for mbus threadpool +## Number of threads for network. ## Any value below 1 will be 1. -mbus.num_threads int default=4 +mbus.num_threads int default=1 -mbus.optimize_for enum {LATENCY, THROUGHPUT} default = LATENCY +mbus.optimize_for enum {LATENCY, THROUGHPUT} default = THROUGHPUT ## Enable to use above thread pool for encoding replies ## False will use network(fnet) thread @@ -42,4 +42,4 @@ mbus.dispatch_on_encode bool default=true ## Enable to use above thread pool for decoding replies ## False will use network(fnet) thread ## Todo: Change default once verified in large scale deployment. -mbus.dispatch_on_decode bool default=false +mbus.dispatch_on_decode bool default=true diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index fa2b0cda018..e7d1f06bbd7 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -25,6 +25,7 @@ LOG_SETUP(".communication.manager"); using vespalib::make_string; using document::FixedBucketSpaces; +using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig; namespace storage { @@ -281,6 +282,17 @@ struct PlaceHolderBucketResolver : public BucketResolver { } }; +mbus::RPCNetworkParams::OptimizeFor +convert(CommunicationManagerConfig::Mbus::OptimizeFor optimizeFor) { + switch (optimizeFor) { + case CommunicationManagerConfig::Mbus::OptimizeFor::LATENCY: + return mbus::RPCNetworkParams::OptimizeFor::LATENCY; + case CommunicationManagerConfig::Mbus::OptimizeFor::THROUGHPUT: + default: + return mbus::RPCNetworkParams::OptimizeFor::THROUGHPUT; + } +} + } CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, const config::ConfigUri & configUri) @@ -415,7 +427,7 @@ void CommunicationManager::configure(std::unique_ptr params.setNumThreads(std::max(1, config->mbus.numThreads)); params.setDispatchOnDecode(config->mbus.dispatchOnDecode); params.setDispatchOnEncode(config->mbus.dispatchOnEncode); - params.setTcpNoDelay(config->mbus.optimizeFor == CommunicationManagerConfig::Mbus::OptimizeFor::LATENCY); + params.setOptimizeFor(convert(config->mbus.optimizeFor)); params.setIdentity(mbus::Identity(_component.getIdentity())); if (config->mbusport != -1) { diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index c08ad214768..a0ae4bf3b43 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -116,7 +116,7 @@ private: void process(const std::shared_ptr& msg); - using CommunicationManagerConfig= vespa::config::content::core::StorCommunicationmanagerConfig; + using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig; using BucketspacesConfig = vespa::config::content::core::BucketspacesConfig; void configureMessageBusLimits(const CommunicationManagerConfig& cfg); -- cgit v1.2.3