aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-07-06 13:45:30 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2022-07-06 13:45:43 +0000
commit1e9a21a8701848c4ad7306968bf21a88f24a5bfb (patch)
tree76b433793ef6fdf377b7828a5bcd429ad78a8bb2 /messagebus
parentbb2c8dfa48a37927d3be5120178ded292406da45 (diff)
Never dispatch to network helper threads.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp8
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.h7
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp2
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.h17
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.cpp19
5 files changed, 3 insertions, 50 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index 8f9037f70dd..d76994dd39d 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -17,7 +17,6 @@
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vespalib/util/size_literals.h>
#include <vespa/vespalib/util/stringfmt.h>
-#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/fastos/thread.h>
#include <thread>
@@ -136,12 +135,9 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_targetPool(std::make_unique<RPCTargetPool>(params.getConnectionExpireSecs(), params.getNumRpcTargets())),
_targetPoolTask(std::make_unique<TargetPoolTask>(_scheduler, *_targetPool)),
_servicePool(std::make_unique<RPCServicePool>(*_mirror, 4_Ki)),
- _executor(std::make_unique<vespalib::ThreadStackExecutor>(params.getNumThreads(), 64_Ki)),
_sendV2(std::make_unique<RPCSendV2>()),
_sendAdapters(),
- _compressionConfig(params.getCompressionConfig()),
- _allowDispatchForEncode(params.getDispatchOnEncode()),
- _allowDispatchForDecode(params.getDispatchOnDecode())
+ _compressionConfig(params.getCompressionConfig())
{
}
@@ -413,7 +409,6 @@ void
RPCNetwork::sync()
{
SyncTask task(_scheduler);
- _executor->sync();
task.await();
}
@@ -424,7 +419,6 @@ RPCNetwork::shutdown()
_scheduler.Kill(_targetPoolTask.get());
_transport->ShutDown(true);
_threadPool->Close();
- _executor->shutdown().sync();
}
void
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
index e706431f90d..b95c0c77b3c 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
@@ -65,12 +65,9 @@ private:
std::unique_ptr<RPCTargetPool> _targetPool;
std::unique_ptr<FNET_Task> _targetPoolTask;
std::unique_ptr<RPCServicePool> _servicePool;
- std::unique_ptr<vespalib::SyncableThreadExecutor> _executor;
std::unique_ptr<RPCSendAdapter> _sendV2;
SendAdapterMap _sendAdapters;
CompressionConfig _compressionConfig;
- bool _allowDispatchForEncode;
- bool _allowDispatchForDecode;
/**
* Resolves and assigns a service address for the given recipient using the
@@ -222,10 +219,6 @@ public:
const slobrok::api::IMirrorAPI &getMirror() const override;
CompressionConfig getCompressionConfig() { return _compressionConfig; }
void invoke(FRT_RPCRequest *req);
- vespalib::Executor & getExecutor() const { return *_executor; }
- bool allowDispatchForEncode() const { return _allowDispatchForEncode; }
- bool allowDispatchForDecode() const { return _allowDispatchForDecode; }
-
};
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
index 8fcadaa64c6..76dcb81919f 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
@@ -20,8 +20,6 @@ RPCNetworkParams::RPCNetworkParams(config::ConfigUri configUri) :
_numRpcTargets(1),
_events_before_wakeup(1),
_tcpNoDelay(true),
- _dispatchOnEncode(true),
- _dispatchOnDecode(false),
_connectionExpireSecs(600),
_compressionConfig(CompressionConfig::LZ4, 6, 90, 1024)
{ }
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
index a8d611df653..4837d58b42c 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
@@ -24,8 +24,6 @@ private:
uint32_t _numRpcTargets;
uint32_t _events_before_wakeup;
bool _tcpNoDelay;
- bool _dispatchOnEncode;
- bool _dispatchOnDecode;
double _connectionExpireSecs;
CompressionConfig _compressionConfig;
@@ -177,21 +175,6 @@ public:
}
CompressionConfig getCompressionConfig() const { return _compressionConfig; }
-
- RPCNetworkParams &setDispatchOnDecode(bool dispatchOnDecode) {
- _dispatchOnDecode = dispatchOnDecode;
- return *this;
- }
-
- bool getDispatchOnDecode() const { return _dispatchOnDecode; }
-
- RPCNetworkParams &setDispatchOnEncode(bool dispatchOnEncode) {
- _dispatchOnEncode = dispatchOnEncode;
- return *this;
- }
-
- bool getDispatchOnEncode() const { return _dispatchOnEncode; }
-
RPCNetworkParams &events_before_wakeup(uint32_t value) {
_events_before_wakeup = value;
return *this;
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
index 7627aa876b3..ff77a1bb639 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
@@ -220,14 +220,7 @@ RPCSend::decode(vespalib::stringref protocolName, const vespalib::Version & vers
void
RPCSend::handleReply(Reply::UP reply)
{
- if (!_net->allowDispatchForEncode()) {
- doHandleReply(std::move(reply));
- } else {
- auto rejected = _net->getExecutor().execute(makeLambdaTask([this, reply = std::move(reply)]() mutable {
- doHandleReply(std::move(reply));
- }));
- assert (!rejected);
- }
+ doHandleReply(std::move(reply));
}
void
@@ -256,15 +249,7 @@ void
RPCSend::invoke(FRT_RPCRequest *req)
{
req->Detach();
-
- if (!_net->allowDispatchForDecode()) {
- doRequest(req);
- } else {
- auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req]() {
- doRequest(req);
- }));
- assert (!rejected);
- }
+ doRequest(req);
}
void