summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-03-26 22:30:49 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-03-26 22:30:49 +0000
commitbc53d8bfa546fbffc0c0ef7ae6ab1050ac75147a (patch)
tree9956c80b60304b3d1278bf496cb007d94d6fe991 /messagebus
parentb8aedd3efb3a1bffcfbd46c12e245d19c7d6a8c5 (diff)
Split in separate encode/decode executors.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp12
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.h5
2 files changed, 11 insertions, 6 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index 2cd6b4c408d..1ddadf424ec 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -131,7 +131,8 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_regAPI(std::make_unique<slobrok::api::RegisterAPI>(*_orb, *_slobrokCfgFactory)),
_requestedPort(params.getListenPort()),
_executor(std::make_unique<vespalib::ThreadStackExecutor>(params.getNumThreads(), 65536)),
- _singleExecutor(std::make_unique<vespalib::SingleExecutor>(1000)),
+ _singleEncodeExecutor(std::make_unique<vespalib::SingleExecutor>(100)),
+ _singleDecodeExecutor(std::make_unique<vespalib::SingleExecutor>(100)),
_sendV1(std::make_unique<RPCSendV1>()),
_sendV2(std::make_unique<RPCSendV2>()),
_sendAdapters(),
@@ -412,7 +413,8 @@ RPCNetwork::sync()
{
SyncTask task(_scheduler);
_executor->sync();
- _singleExecutor->sync();
+ _singleEncodeExecutor->sync();
+ _singleDecodeExecutor->sync();
task.await();
}
@@ -422,9 +424,11 @@ RPCNetwork::shutdown()
_transport->ShutDown(true);
_threadPool->Close();
_executor->shutdown();
- _singleExecutor->shutdown();
+ _singleEncodeExecutor->shutdown();
+ _singleDecodeExecutor->shutdown();
_executor->sync();
- _singleExecutor->sync();
+ _singleEncodeExecutor->sync();
+ _singleDecodeExecutor->sync();
}
void
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
index d32473c66cf..6f76af4a68f 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
@@ -73,7 +73,8 @@ private:
std::unique_ptr<slobrok::api::RegisterAPI> _regAPI;
int _requestedPort;
std::unique_ptr<vespalib::SyncableThreadExecutor> _executor;
- std::unique_ptr<vespalib::SyncableThreadExecutor> _singleExecutor;
+ std::unique_ptr<vespalib::SyncableThreadExecutor> _singleEncodeExecutor;
+ std::unique_ptr<vespalib::SyncableThreadExecutor> _singleDecodeExecutor;
std::unique_ptr<RPCSendAdapter> _sendV1;
std::unique_ptr<RPCSendAdapter> _sendV2;
SendAdapterMap _sendAdapters;
@@ -232,7 +233,7 @@ public:
const slobrok::api::IMirrorAPI &getMirror() const override;
CompressionConfig getCompressionConfig() { return _compressionConfig; }
void invoke(FRT_RPCRequest *req);
- vespalib::Executor & getExecutor(bool requireSequencing) const { return requireSequencing ? *_singleExecutor : *_executor; }
+ vespalib::Executor & getExecutor(bool requireSequencing, bool encode) const { return requireSequencing ? (encode ? *_singleEncodeExecutor : singleDecodeExecutor) : *_executor; }
bool allowDispatchForEncode() const { return _allowDispatchForEncode; }
bool allowDispatchForDecode() const { return _allowDispatchForDecode; }