diff options
author | Harald Musum <musum@verizonmedia.com> | 2020-03-27 07:15:48 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-03-27 07:15:48 +0100 |
commit | 60b7ad36f7c102b3a39e814502741e40c4f5035d (patch) | |
tree | 4ba24d0905114852bcfaafc1fd1a8e5dcf784a6a /messagebus | |
parent | c35d2974449a3a4120ae6be5e1a236a8917efd0c (diff) |
Revert "Balder/rearrange threads"
Diffstat (limited to 'messagebus')
6 files changed, 60 insertions, 95 deletions
diff --git a/messagebus/src/vespa/messagebus/messenger.cpp b/messagebus/src/vespa/messagebus/messenger.cpp index 36211d8ec38..5313c4adcbb 100644 --- a/messagebus/src/vespa/messagebus/messenger.cpp +++ b/messagebus/src/vespa/messagebus/messenger.cpp @@ -246,13 +246,13 @@ Messenger::start() void Messenger::deliverMessage(Message::UP msg, IMessageHandler &handler) { - handler.handleMessage(std::move(msg)); + enqueue(std::make_unique<MessageTask>(std::move(msg), handler)); } void Messenger::deliverReply(Reply::UP reply, IReplyHandler &handler) { - handler.handleReply(std::move(reply)); + enqueue(std::make_unique<ReplyTask>(std::move(reply), handler)); } void diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index 6fd7ab212b4..4b498c4c014 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -17,8 +17,6 @@ #include <vespa/fnet/scheduler.h> #include <vespa/fnet/transport.h> #include <vespa/fnet/frt/supervisor.h> -#include <vespa/vespalib/util/singleexecutor.h> -#include <vespa/vespalib/util/lambdatask.h> #include <vespa/fastos/thread.h> #include <thread> @@ -28,8 +26,6 @@ LOG_SETUP(".rpcnetwork"); using vespalib::make_string; using namespace std::chrono_literals; -namespace mbus { - namespace { /** @@ -60,19 +56,10 @@ public: } }; -std::unique_ptr<vespalib::SyncableThreadExecutor> -createExecutor(RPCNetworkParams::OptimizeFor optimizeFor) { - switch (optimizeFor) { - case RPCNetworkParams::OptimizeFor::LATENCY: - return std::make_unique<vespalib::ThreadStackExecutor>(1, 0x10000); - case RPCNetworkParams::OptimizeFor::THROUGHPUT: - default: - return std::make_unique<vespalib::SingleExecutor>(100); - } -} - } // namespace <unnamed> +namespace mbus { + RPCNetwork::SendContext::SendContext(RPCNetwork &net, const Message &msg, const std::vector<RoutingNode*> &recipients) : _net(net), @@ -100,16 +87,8 @@ RPCNetwork::SendContext::handleVersion(const vespalib::Version *version) } } if (shouldSend) { - if (_net.allowDispatchForEncode()) { - auto rejected = _net.getEncodeExecutor(true).execute(vespalib::makeLambdaTask([this]() { - _net.send(*this); - delete this; - })); - assert (!rejected); - } else { - _net.send(*this); - delete this; - } + _net.send(*this); + delete this; } } @@ -127,12 +106,11 @@ RPCNetwork::TargetPoolTask::PerformTask() Schedule(1.0); } - RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _owner(nullptr), _ident(params.getIdentity()), _threadPool(std::make_unique<FastOS_ThreadPool>(128000, 0)), - _transport(std::make_unique<FNET_Transport>(params.getNumThreads())), + _transport(std::make_unique<FNET_Transport>()), _orb(std::make_unique<FRT_Supervisor>(_transport.get())), _scheduler(*_transport->GetScheduler()), _targetPool(std::make_unique<RPCTargetPool>(params.getConnectionExpireSecs())), @@ -142,8 +120,7 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _mirror(std::make_unique<slobrok::api::MirrorAPI>(*_orb, *_slobrokCfgFactory)), _regAPI(std::make_unique<slobrok::api::RegisterAPI>(*_orb, *_slobrokCfgFactory)), _requestedPort(params.getListenPort()), - _singleEncodeExecutor(createExecutor(params.getOptimizeFor())), - _singleDecodeExecutor(createExecutor(params.getOptimizeFor())), + _executor(std::make_unique<vespalib::ThreadStackExecutor>(params.getNumThreads(), 65536)), _sendV1(std::make_unique<RPCSendV1>()), _sendV2(std::make_unique<RPCSendV2>()), _sendAdapters(), @@ -153,6 +130,7 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : { _transport->SetMaxInputBufferSize(params.getMaxInputBufferSize()); _transport->SetMaxOutputBufferSize(params.getMaxOutputBufferSize()); + _transport->SetTCPNoDelay(params.getTcpNoDelay()); } RPCNetwork::~RPCNetwork() @@ -422,8 +400,7 @@ void RPCNetwork::sync() { SyncTask task(_scheduler); - _singleEncodeExecutor->sync(); - _singleDecodeExecutor->sync(); + _executor->sync(); task.await(); } @@ -432,10 +409,8 @@ RPCNetwork::shutdown() { _transport->ShutDown(true); _threadPool->Close(); - _singleEncodeExecutor->shutdown(); - _singleDecodeExecutor->shutdown(); - _singleEncodeExecutor->sync(); - _singleDecodeExecutor->sync(); + _executor->shutdown(); + _executor->sync(); } void diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h index 04754829f15..a6c2724929d 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h @@ -59,27 +59,26 @@ private: using SendAdapterMap = std::map<vespalib::Version, RPCSendAdapter*>; - INetworkOwner *_owner; - Identity _ident; - std::unique_ptr<FastOS_ThreadPool> _threadPool; - std::unique_ptr<FNET_Transport> _transport; - std::unique_ptr<FRT_Supervisor> _orb; - FNET_Scheduler &_scheduler; - std::unique_ptr<RPCTargetPool> _targetPool; - TargetPoolTask _targetPoolTask; - std::unique_ptr<RPCServicePool> _servicePool; - std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory; - std::unique_ptr<slobrok::api::IMirrorAPI> _mirror; - std::unique_ptr<slobrok::api::RegisterAPI> _regAPI; - int _requestedPort; - std::unique_ptr<vespalib::SyncableThreadExecutor> _singleEncodeExecutor; - std::unique_ptr<vespalib::SyncableThreadExecutor> _singleDecodeExecutor; - std::unique_ptr<RPCSendAdapter> _sendV1; - std::unique_ptr<RPCSendAdapter> _sendV2; - SendAdapterMap _sendAdapters; - CompressionConfig _compressionConfig; - bool _allowDispatchForEncode; - bool _allowDispatchForDecode; + INetworkOwner *_owner; + Identity _ident; + std::unique_ptr<FastOS_ThreadPool> _threadPool; + std::unique_ptr<FNET_Transport> _transport; + std::unique_ptr<FRT_Supervisor> _orb; + FNET_Scheduler &_scheduler; + std::unique_ptr<RPCTargetPool> _targetPool; + TargetPoolTask _targetPoolTask; + std::unique_ptr<RPCServicePool> _servicePool; + std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory; + std::unique_ptr<slobrok::api::IMirrorAPI> _mirror; + std::unique_ptr<slobrok::api::RegisterAPI> _regAPI; + int _requestedPort; + std::unique_ptr<vespalib::ThreadStackExecutor> _executor; + std::unique_ptr<RPCSendAdapter> _sendV1; + std::unique_ptr<RPCSendAdapter> _sendV2; + SendAdapterMap _sendAdapters; + CompressionConfig _compressionConfig; + bool _allowDispatchForEncode; + bool _allowDispatchForDecode; /** @@ -232,8 +231,7 @@ public: const slobrok::api::IMirrorAPI &getMirror() const override; CompressionConfig getCompressionConfig() { return _compressionConfig; } void invoke(FRT_RPCRequest *req); - vespalib::Executor & getEncodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleEncodeExecutor : *_singleEncodeExecutor; } - vespalib::Executor & getDecodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleDecodeExecutor : *_singleDecodeExecutor; } + vespalib::Executor & getExecutor() const { return *_executor; } bool allowDispatchForEncode() const { return _allowDispatchForEncode; } bool allowDispatchForDecode() const { return _allowDispatchForDecode; } diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp index 482a46b2564..5bf277a8ee6 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp @@ -14,8 +14,8 @@ RPCNetworkParams::RPCNetworkParams(config::ConfigUri configUri) : _listenPort(0), _maxInputBufferSize(256*1024), _maxOutputBufferSize(256*1024), - _numThreads(1), - _optimizeFor(OptimizeFor::LATENCY), + _numThreads(4), + _tcpNoDelay(true), _dispatchOnEncode(true), _dispatchOnDecode(false), _connectionExpireSecs(600), diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h index a4b752f46d4..140f81c611c 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h @@ -12,10 +12,21 @@ namespace mbus { * held by this class. This class has reasonable default values for each parameter. */ class RPCNetworkParams { -public: - enum class OptimizeFor { LATENCY, THROUGHPUT}; +private: using CompressionConfig = vespalib::compression::CompressionConfig; + Identity _identity; + config::ConfigUri _slobrokConfig; + int _listenPort; + uint32_t _maxInputBufferSize; + uint32_t _maxOutputBufferSize; + uint32_t _numThreads; + bool _tcpNoDelay; + bool _dispatchOnEncode; + bool _dispatchOnDecode; + double _connectionExpireSecs; + CompressionConfig _compressionConfig; +public: RPCNetworkParams(); RPCNetworkParams(config::ConfigUri configUri); ~RPCNetworkParams(); @@ -96,12 +107,12 @@ public: uint32_t getNumThreads() const { return _numThreads; } - RPCNetworkParams &setOptimizeFor(OptimizeFor tcpNoDelay) { - _optimizeFor = tcpNoDelay; + RPCNetworkParams &setTcpNoDelay(bool tcpNoDelay) { + _tcpNoDelay = tcpNoDelay; return *this; } - OptimizeFor getOptimizeFor() const { return _optimizeFor; } + bool getTcpNoDelay() const { return _tcpNoDelay; } /** * Returns the number of seconds before an idle network connection expires. @@ -187,18 +198,6 @@ public: } uint32_t getDispatchOnEncode() const { return _dispatchOnEncode; } -private: - Identity _identity; - config::ConfigUri _slobrokConfig; - int _listenPort; - uint32_t _maxInputBufferSize; - uint32_t _maxOutputBufferSize; - uint32_t _numThreads; - OptimizeFor _optimizeFor; - bool _dispatchOnEncode; - bool _dispatchOnDecode; - double _connectionExpireSecs; - CompressionConfig _compressionConfig; }; } diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp index d217c7964d6..2422638dc05 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp @@ -148,14 +148,7 @@ RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, void RPCSend::RequestDone(FRT_RPCRequest *req) { - if ( _net->allowDispatchForDecode()) { - auto rejected = _net->getDecodeExecutor(true).execute(makeLambdaTask([this, req]() { - doRequestDone(req); - })); - assert (!rejected); - } else { - doRequestDone(req); - } + doRequestDone(req); } void @@ -228,13 +221,13 @@ void RPCSend::handleReply(Reply::UP reply) { const IProtocol * protocol = _net->getOwner().getProtocol(reply->getProtocol()); - if (protocol && _net->allowDispatchForEncode()) { - auto rejected = _net->getEncodeExecutor(protocol->requireSequencing()).execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable { + if (!protocol || protocol->requireSequencing() || !_net->allowDispatchForEncode()) { + doHandleReply(protocol, std::move(reply)); + } else { + auto rejected = _net->getExecutor().execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable { doHandleReply(protocol, std::move(reply)); })); assert (!rejected); - } else { - doHandleReply(protocol, std::move(reply)); } } @@ -273,13 +266,13 @@ RPCSend::invoke(FRT_RPCRequest *req) vespalib::string(params->getProtocol()).c_str(), _serverIdent.c_str()))); return; } - if (_net->allowDispatchForDecode()) { - auto rejected = _net->getDecodeExecutor(protocol->requireSequencing()).execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable { + if (protocol->requireSequencing() || !_net->allowDispatchForDecode()) { + doRequest(req, protocol, std::move(params)); + } else { + auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable { doRequest(req, protocol, std::move(params)); })); assert (!rejected); - } else { - doRequest(req, protocol, std::move(params)); } } |