summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-04-04 22:37:16 +0200
committerGitHub <noreply@github.com>2020-04-04 22:37:16 +0200
commit130d4607a359ae2740bdeeb0179a731751f979a0 (patch)
tree672c9e0421be09f5f3ecf5e2d9eed37f92288fd7
parented34407f0470d5ff5446df903c57f7b649dc835f (diff)
parent798f1affa2f5be0bfb0b4c00519b6e935b674c4d (diff)
Merge pull request #12829 from vespa-engine/balder/adhere-2-throughput-slash-latency-for-all-protocols
Adhere to latency versus throughput settings.
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp17
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.h8
2 files changed, 18 insertions, 7 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index e049e436d03..bbc9fc94d14 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -81,7 +81,7 @@ struct TargetPoolTask : public FNET_Task {
};
std::unique_ptr<vespalib::SyncableThreadExecutor>
-createExecutor(RPCNetworkParams::OptimizeFor optimizeFor) {
+createSingleExecutor(RPCNetworkParams::OptimizeFor optimizeFor) {
switch (optimizeFor) {
case RPCNetworkParams::OptimizeFor::LATENCY:
return std::make_unique<vespalib::ThreadStackExecutor>(1, 0x10000);
@@ -91,6 +91,14 @@ createExecutor(RPCNetworkParams::OptimizeFor optimizeFor) {
}
}
+std::unique_ptr<vespalib::SyncableThreadExecutor>
+createExecutor(RPCNetworkParams::OptimizeFor optimizeFor, uint32_t numThreads) {
+ if ((optimizeFor == RPCNetworkParams::OptimizeFor::LATENCY) || (numThreads >= 2)) {
+ return std::make_unique<vespalib::ThreadStackExecutor>(numThreads, 0x10000);
+ }
+ return std::make_unique<vespalib::SingleExecutor>(1000, 10, 1ms);
+}
+
}
RPCNetwork::SendContext::SendContext(RPCNetwork &net, const Message &msg,
@@ -147,9 +155,10 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_targetPool(std::make_unique<RPCTargetPool>(params.getConnectionExpireSecs())),
_targetPoolTask(std::make_unique<TargetPoolTask>(_scheduler, *_targetPool)),
_servicePool(std::make_unique<RPCServicePool>(*_mirror, 4096)),
- _singleEncodeExecutor(createExecutor(params.getOptimizeFor())),
- _singleDecodeExecutor(createExecutor(params.getOptimizeFor())),
- _executor(std::make_unique<vespalib::ThreadStackExecutor>(params.getNumThreads(), 128000)),
+ _singleEncodeExecutor(createSingleExecutor(params.getOptimizeFor())),
+ _singleDecodeExecutor(createSingleExecutor(params.getOptimizeFor())),
+ _encodeExecutor(createExecutor(params.getOptimizeFor(), std::max(1u, params.getNumThreads()/4))),
+ _decodeExecutor(createExecutor(params.getOptimizeFor(), std::max(1u, params.getNumThreads()/4))),
_sendV1(std::make_unique<RPCSendV1>()),
_sendV2(std::make_unique<RPCSendV2>()),
_sendAdapters(),
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
index a37cf1d9176..625efd7ae27 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
@@ -65,9 +65,11 @@ private:
std::unique_ptr<RPCTargetPool> _targetPool;
std::unique_ptr<FNET_Task> _targetPoolTask;
std::unique_ptr<RPCServicePool> _servicePool;
+ // TODO Instead of having 4 different executors, we should move the SequencedTaskExecutor into vespalib and use it there.
std::unique_ptr<vespalib::SyncableThreadExecutor> _singleEncodeExecutor;
std::unique_ptr<vespalib::SyncableThreadExecutor> _singleDecodeExecutor;
- std::unique_ptr<vespalib::SyncableThreadExecutor> _executor;
+ std::unique_ptr<vespalib::SyncableThreadExecutor> _encodeExecutor;
+ std::unique_ptr<vespalib::SyncableThreadExecutor> _decodeExecutor;
std::unique_ptr<RPCSendAdapter> _sendV1;
std::unique_ptr<RPCSendAdapter> _sendV2;
SendAdapterMap _sendAdapters;
@@ -225,8 +227,8 @@ public:
const slobrok::api::IMirrorAPI &getMirror() const override;
CompressionConfig getCompressionConfig() { return _compressionConfig; }
void invoke(FRT_RPCRequest *req);
- vespalib::Executor & getEncodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleEncodeExecutor : *_executor; }
- vespalib::Executor & getDecodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleDecodeExecutor : *_executor; }
+ vespalib::Executor & getEncodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleEncodeExecutor : *_encodeExecutor; }
+ vespalib::Executor & getDecodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleDecodeExecutor : *_decodeExecutor; }
bool allowDispatchForEncode() const { return _allowDispatchForEncode; }
bool allowDispatchForDecode() const { return _allowDispatchForDecode; }