diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-03-27 00:35:33 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-03-27 00:35:33 +0000 |
commit | d6261332f8014bf55b8f6b76227a1be0146630be (patch) | |
tree | 939fe6012b0a088364602525c3d25f5bb7e7c992 /messagebus | |
parent | 9a448a86c87d6c2878c55443b66255fc0c23feb7 (diff) |
tcpnodelay is dead. Both worse latency and throughput.
Use optimize_for to select executor type instead.
Diffstat (limited to 'messagebus')
5 files changed, 39 insertions, 32 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index a35b60bc1d8..6fd7ab212b4 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -28,6 +28,8 @@ LOG_SETUP(".rpcnetwork"); using vespalib::make_string; using namespace std::chrono_literals; +namespace mbus { + namespace { /** @@ -58,9 +60,18 @@ public: } }; -} // namespace <unnamed> +std::unique_ptr<vespalib::SyncableThreadExecutor> +createExecutor(RPCNetworkParams::OptimizeFor optimizeFor) { + switch (optimizeFor) { + case RPCNetworkParams::OptimizeFor::LATENCY: + return std::make_unique<vespalib::ThreadStackExecutor>(1, 0x10000); + case RPCNetworkParams::OptimizeFor::THROUGHPUT: + default: + return std::make_unique<vespalib::SingleExecutor>(100); + } +} -namespace mbus { +} // namespace <unnamed> RPCNetwork::SendContext::SendContext(RPCNetwork &net, const Message &msg, const std::vector<RoutingNode*> &recipients) @@ -116,11 +127,12 @@ RPCNetwork::TargetPoolTask::PerformTask() Schedule(1.0); } + RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _owner(nullptr), _ident(params.getIdentity()), _threadPool(std::make_unique<FastOS_ThreadPool>(128000, 0)), - _transport(std::make_unique<FNET_Transport>()), + _transport(std::make_unique<FNET_Transport>(params.getNumThreads())), _orb(std::make_unique<FRT_Supervisor>(_transport.get())), _scheduler(*_transport->GetScheduler()), _targetPool(std::make_unique<RPCTargetPool>(params.getConnectionExpireSecs())), @@ -130,9 +142,8 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _mirror(std::make_unique<slobrok::api::MirrorAPI>(*_orb, *_slobrokCfgFactory)), _regAPI(std::make_unique<slobrok::api::RegisterAPI>(*_orb, *_slobrokCfgFactory)), _requestedPort(params.getListenPort()), - _executor(std::make_unique<vespalib::ThreadStackExecutor>(params.getNumThreads(), 65536)), - _singleEncodeExecutor(std::make_unique<vespalib::SingleExecutor>(100)), - _singleDecodeExecutor(std::make_unique<vespalib::SingleExecutor>(100)), + _singleEncodeExecutor(createExecutor(params.getOptimizeFor())), + _singleDecodeExecutor(createExecutor(params.getOptimizeFor())), _sendV1(std::make_unique<RPCSendV1>()), _sendV2(std::make_unique<RPCSendV2>()), _sendAdapters(), @@ -142,7 +153,6 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : { _transport->SetMaxInputBufferSize(params.getMaxInputBufferSize()); _transport->SetMaxOutputBufferSize(params.getMaxOutputBufferSize()); - _transport->SetTCPNoDelay(params.getTcpNoDelay()); } RPCNetwork::~RPCNetwork() @@ -412,7 +422,6 @@ void RPCNetwork::sync() { SyncTask task(_scheduler); - _executor->sync(); _singleEncodeExecutor->sync(); _singleDecodeExecutor->sync(); task.await(); @@ -423,10 +432,8 @@ RPCNetwork::shutdown() { _transport->ShutDown(true); _threadPool->Close(); - _executor->shutdown(); _singleEncodeExecutor->shutdown(); _singleDecodeExecutor->shutdown(); - _executor->sync(); _singleEncodeExecutor->sync(); _singleDecodeExecutor->sync(); } diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h index 8643beb20fa..04754829f15 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h @@ -72,7 +72,6 @@ private: std::unique_ptr<slobrok::api::IMirrorAPI> _mirror; std::unique_ptr<slobrok::api::RegisterAPI> _regAPI; int _requestedPort; - std::unique_ptr<vespalib::SyncableThreadExecutor> _executor; std::unique_ptr<vespalib::SyncableThreadExecutor> _singleEncodeExecutor; std::unique_ptr<vespalib::SyncableThreadExecutor> _singleDecodeExecutor; std::unique_ptr<RPCSendAdapter> _sendV1; @@ -233,8 +232,8 @@ public: const slobrok::api::IMirrorAPI &getMirror() const override; CompressionConfig getCompressionConfig() { return _compressionConfig; } void invoke(FRT_RPCRequest *req); - vespalib::Executor & getEncodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleEncodeExecutor : *_executor; } - vespalib::Executor & getDecodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleDecodeExecutor : *_executor; } + vespalib::Executor & getEncodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleEncodeExecutor : *_singleEncodeExecutor; } + vespalib::Executor & getDecodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleDecodeExecutor : *_singleDecodeExecutor; } bool allowDispatchForEncode() const { return _allowDispatchForEncode; } bool allowDispatchForDecode() const { return _allowDispatchForDecode; } diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp index 5bf277a8ee6..482a46b2564 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp @@ -14,8 +14,8 @@ RPCNetworkParams::RPCNetworkParams(config::ConfigUri configUri) : _listenPort(0), _maxInputBufferSize(256*1024), _maxOutputBufferSize(256*1024), - _numThreads(4), - _tcpNoDelay(true), + _numThreads(1), + _optimizeFor(OptimizeFor::LATENCY), _dispatchOnEncode(true), _dispatchOnDecode(false), _connectionExpireSecs(600), diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h index 140f81c611c..a4b752f46d4 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h @@ -12,21 +12,10 @@ namespace mbus { * held by this class. This class has reasonable default values for each parameter. */ class RPCNetworkParams { -private: +public: + enum class OptimizeFor { LATENCY, THROUGHPUT}; using CompressionConfig = vespalib::compression::CompressionConfig; - Identity _identity; - config::ConfigUri _slobrokConfig; - int _listenPort; - uint32_t _maxInputBufferSize; - uint32_t _maxOutputBufferSize; - uint32_t _numThreads; - bool _tcpNoDelay; - bool _dispatchOnEncode; - bool _dispatchOnDecode; - double _connectionExpireSecs; - CompressionConfig _compressionConfig; -public: RPCNetworkParams(); RPCNetworkParams(config::ConfigUri configUri); ~RPCNetworkParams(); @@ -107,12 +96,12 @@ public: uint32_t getNumThreads() const { return _numThreads; } - RPCNetworkParams &setTcpNoDelay(bool tcpNoDelay) { - _tcpNoDelay = tcpNoDelay; + RPCNetworkParams &setOptimizeFor(OptimizeFor tcpNoDelay) { + _optimizeFor = tcpNoDelay; return *this; } - bool getTcpNoDelay() const { return _tcpNoDelay; } + OptimizeFor getOptimizeFor() const { return _optimizeFor; } /** * Returns the number of seconds before an idle network connection expires. @@ -198,6 +187,18 @@ public: } uint32_t getDispatchOnEncode() const { return _dispatchOnEncode; } +private: + Identity _identity; + config::ConfigUri _slobrokConfig; + int _listenPort; + uint32_t _maxInputBufferSize; + uint32_t _maxOutputBufferSize; + uint32_t _numThreads; + OptimizeFor _optimizeFor; + bool _dispatchOnEncode; + bool _dispatchOnDecode; + double _connectionExpireSecs; + CompressionConfig _compressionConfig; }; } diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp index a40360ead71..d217c7964d6 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp @@ -228,7 +228,7 @@ void RPCSend::handleReply(Reply::UP reply) { const IProtocol * protocol = _net->getOwner().getProtocol(reply->getProtocol()); - if (_net->allowDispatchForEncode()) { + if (protocol && _net->allowDispatchForEncode()) { auto rejected = _net->getEncodeExecutor(protocol->requireSequencing()).execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable { doHandleReply(protocol, std::move(reply)); })); |