diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-07-06 13:45:30 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2022-07-06 13:45:43 +0000 |
commit | 1e9a21a8701848c4ad7306968bf21a88f24a5bfb (patch) | |
tree | 76b433793ef6fdf377b7828a5bcd429ad78a8bb2 /messagebus | |
parent | bb2c8dfa48a37927d3be5120178ded292406da45 (diff) |
Never dispatch to network helper threads.
Diffstat (limited to 'messagebus')
5 files changed, 3 insertions, 50 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index 8f9037f70dd..d76994dd39d 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -17,7 +17,6 @@ #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/util/stringfmt.h> -#include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/fastos/thread.h> #include <thread> @@ -136,12 +135,9 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _targetPool(std::make_unique<RPCTargetPool>(params.getConnectionExpireSecs(), params.getNumRpcTargets())), _targetPoolTask(std::make_unique<TargetPoolTask>(_scheduler, *_targetPool)), _servicePool(std::make_unique<RPCServicePool>(*_mirror, 4_Ki)), - _executor(std::make_unique<vespalib::ThreadStackExecutor>(params.getNumThreads(), 64_Ki)), _sendV2(std::make_unique<RPCSendV2>()), _sendAdapters(), - _compressionConfig(params.getCompressionConfig()), - _allowDispatchForEncode(params.getDispatchOnEncode()), - _allowDispatchForDecode(params.getDispatchOnDecode()) + _compressionConfig(params.getCompressionConfig()) { } @@ -413,7 +409,6 @@ void RPCNetwork::sync() { SyncTask task(_scheduler); - _executor->sync(); task.await(); } @@ -424,7 +419,6 @@ RPCNetwork::shutdown() _scheduler.Kill(_targetPoolTask.get()); _transport->ShutDown(true); _threadPool->Close(); - _executor->shutdown().sync(); } void diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h index e706431f90d..b95c0c77b3c 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h @@ -65,12 +65,9 @@ private: std::unique_ptr<RPCTargetPool> _targetPool; std::unique_ptr<FNET_Task> _targetPoolTask; std::unique_ptr<RPCServicePool> _servicePool; - std::unique_ptr<vespalib::SyncableThreadExecutor> _executor; std::unique_ptr<RPCSendAdapter> _sendV2; SendAdapterMap _sendAdapters; CompressionConfig _compressionConfig; - bool _allowDispatchForEncode; - bool _allowDispatchForDecode; /** * Resolves and assigns a service address for the given recipient using the @@ -222,10 +219,6 @@ public: const slobrok::api::IMirrorAPI &getMirror() const override; CompressionConfig getCompressionConfig() { return _compressionConfig; } void invoke(FRT_RPCRequest *req); - vespalib::Executor & getExecutor() const { return *_executor; } - bool allowDispatchForEncode() const { return _allowDispatchForEncode; } - bool allowDispatchForDecode() const { return _allowDispatchForDecode; } - }; } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp index 8fcadaa64c6..76dcb81919f 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp @@ -20,8 +20,6 @@ RPCNetworkParams::RPCNetworkParams(config::ConfigUri configUri) : _numRpcTargets(1), _events_before_wakeup(1), _tcpNoDelay(true), - _dispatchOnEncode(true), - _dispatchOnDecode(false), _connectionExpireSecs(600), _compressionConfig(CompressionConfig::LZ4, 6, 90, 1024) { } diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h index a8d611df653..4837d58b42c 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h @@ -24,8 +24,6 @@ private: uint32_t _numRpcTargets; uint32_t _events_before_wakeup; bool _tcpNoDelay; - bool _dispatchOnEncode; - bool _dispatchOnDecode; double _connectionExpireSecs; CompressionConfig _compressionConfig; @@ -177,21 +175,6 @@ public: } CompressionConfig getCompressionConfig() const { return _compressionConfig; } - - RPCNetworkParams &setDispatchOnDecode(bool dispatchOnDecode) { - _dispatchOnDecode = dispatchOnDecode; - return *this; - } - - bool getDispatchOnDecode() const { return _dispatchOnDecode; } - - RPCNetworkParams &setDispatchOnEncode(bool dispatchOnEncode) { - _dispatchOnEncode = dispatchOnEncode; - return *this; - } - - bool getDispatchOnEncode() const { return _dispatchOnEncode; } - RPCNetworkParams &events_before_wakeup(uint32_t value) { _events_before_wakeup = value; return *this; diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp index 7627aa876b3..ff77a1bb639 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp @@ -220,14 +220,7 @@ RPCSend::decode(vespalib::stringref protocolName, const vespalib::Version & vers void RPCSend::handleReply(Reply::UP reply) { - if (!_net->allowDispatchForEncode()) { - doHandleReply(std::move(reply)); - } else { - auto rejected = _net->getExecutor().execute(makeLambdaTask([this, reply = std::move(reply)]() mutable { - doHandleReply(std::move(reply)); - })); - assert (!rejected); - } + doHandleReply(std::move(reply)); } void @@ -256,15 +249,7 @@ void RPCSend::invoke(FRT_RPCRequest *req) { req->Detach(); - - if (!_net->allowDispatchForDecode()) { - doRequest(req); - } else { - auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req]() { - doRequest(req); - })); - assert (!rejected); - } + doRequest(req); } void |