From 1e9a21a8701848c4ad7306968bf21a88f24a5bfb Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Wed, 6 Jul 2022 13:45:30 +0000 Subject: Never dispatch to network helper threads. --- .../src/vespa/messagebus/network/rpcnetwork.cpp | 8 +------- messagebus/src/vespa/messagebus/network/rpcnetwork.h | 7 ------- .../src/vespa/messagebus/network/rpcnetworkparams.cpp | 2 -- .../src/vespa/messagebus/network/rpcnetworkparams.h | 17 ----------------- messagebus/src/vespa/messagebus/network/rpcsend.cpp | 19 ++----------------- 5 files changed, 3 insertions(+), 50 deletions(-) (limited to 'messagebus') diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index 8f9037f70dd..d76994dd39d 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -17,7 +17,6 @@ #include #include #include -#include #include #include @@ -136,12 +135,9 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _targetPool(std::make_unique(params.getConnectionExpireSecs(), params.getNumRpcTargets())), _targetPoolTask(std::make_unique(_scheduler, *_targetPool)), _servicePool(std::make_unique(*_mirror, 4_Ki)), - _executor(std::make_unique(params.getNumThreads(), 64_Ki)), _sendV2(std::make_unique()), _sendAdapters(), - _compressionConfig(params.getCompressionConfig()), - _allowDispatchForEncode(params.getDispatchOnEncode()), - _allowDispatchForDecode(params.getDispatchOnDecode()) + _compressionConfig(params.getCompressionConfig()) { } @@ -413,7 +409,6 @@ void RPCNetwork::sync() { SyncTask task(_scheduler); - _executor->sync(); task.await(); } @@ -424,7 +419,6 @@ RPCNetwork::shutdown() _scheduler.Kill(_targetPoolTask.get()); _transport->ShutDown(true); _threadPool->Close(); - _executor->shutdown().sync(); } void diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h index e706431f90d..b95c0c77b3c 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h @@ -65,12 +65,9 @@ private: std::unique_ptr _targetPool; std::unique_ptr _targetPoolTask; std::unique_ptr _servicePool; - std::unique_ptr _executor; std::unique_ptr _sendV2; SendAdapterMap _sendAdapters; CompressionConfig _compressionConfig; - bool _allowDispatchForEncode; - bool _allowDispatchForDecode; /** * Resolves and assigns a service address for the given recipient using the @@ -222,10 +219,6 @@ public: const slobrok::api::IMirrorAPI &getMirror() const override; CompressionConfig getCompressionConfig() { return _compressionConfig; } void invoke(FRT_RPCRequest *req); - vespalib::Executor & getExecutor() const { return *_executor; } - bool allowDispatchForEncode() const { return _allowDispatchForEncode; } - bool allowDispatchForDecode() const { return _allowDispatchForDecode; } - }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp index 8fcadaa64c6..76dcb81919f 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp @@ -20,8 +20,6 @@ RPCNetworkParams::RPCNetworkParams(config::ConfigUri configUri) : _numRpcTargets(1), _events_before_wakeup(1), _tcpNoDelay(true), - _dispatchOnEncode(true), - _dispatchOnDecode(false), _connectionExpireSecs(600), _compressionConfig(CompressionConfig::LZ4, 6, 90, 1024) { } diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h index a8d611df653..4837d58b42c 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h @@ -24,8 +24,6 @@ private: uint32_t _numRpcTargets; uint32_t _events_before_wakeup; bool _tcpNoDelay; - bool _dispatchOnEncode; - bool _dispatchOnDecode; double _connectionExpireSecs; CompressionConfig _compressionConfig; @@ -177,21 +175,6 @@ public: } CompressionConfig getCompressionConfig() const { return _compressionConfig; } - - RPCNetworkParams &setDispatchOnDecode(bool dispatchOnDecode) { - _dispatchOnDecode = dispatchOnDecode; - return *this; - } - - bool getDispatchOnDecode() const { return _dispatchOnDecode; } - - RPCNetworkParams &setDispatchOnEncode(bool dispatchOnEncode) { - _dispatchOnEncode = dispatchOnEncode; - return *this; - } - - bool getDispatchOnEncode() const { return _dispatchOnEncode; } - RPCNetworkParams &events_before_wakeup(uint32_t value) { _events_before_wakeup = value; return *this; diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp index 7627aa876b3..ff77a1bb639 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp @@ -220,14 +220,7 @@ RPCSend::decode(vespalib::stringref protocolName, const vespalib::Version & vers void RPCSend::handleReply(Reply::UP reply) { - if (!_net->allowDispatchForEncode()) { - doHandleReply(std::move(reply)); - } else { - auto rejected = _net->getExecutor().execute(makeLambdaTask([this, reply = std::move(reply)]() mutable { - doHandleReply(std::move(reply)); - })); - assert (!rejected); - } + doHandleReply(std::move(reply)); } void @@ -256,15 +249,7 @@ void RPCSend::invoke(FRT_RPCRequest *req) { req->Detach(); - - if (!_net->allowDispatchForDecode()) { - doRequest(req); - } else { - auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req]() { - doRequest(req); - })); - assert (!rejected); - } + doRequest(req); } void -- cgit v1.2.3