diff options
Diffstat (limited to 'messagebus/src/vespa/messagebus/network/rpcnetwork.cpp')
-rw-r--r-- | messagebus/src/vespa/messagebus/network/rpcnetwork.cpp | 40 |
1 files changed, 32 insertions, 8 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index de3be2ffa01..faa67b9bece 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -17,6 +17,8 @@ #include <vespa/fnet/scheduler.h> #include <vespa/fnet/transport.h> #include <vespa/fnet/frt/supervisor.h> +#include <vespa/vespalib/util/singleexecutor.h> +#include <vespa/vespalib/util/lambdatask.h> #include <vespa/fastos/thread.h> #include <thread> @@ -78,6 +80,17 @@ struct TargetPoolTask : public FNET_Task { } }; +std::unique_ptr<vespalib::SyncableThreadExecutor> +createExecutor(RPCNetworkParams::OptimizeFor optimizeFor) { + switch (optimizeFor) { + case RPCNetworkParams::OptimizeFor::LATENCY: + return std::make_unique<vespalib::ThreadStackExecutor>(1, 0x10000); + case RPCNetworkParams::OptimizeFor::THROUGHPUT: + default: + return std::make_unique<vespalib::SingleExecutor>(100); + } +} + } RPCNetwork::SendContext::SendContext(RPCNetwork &net, const Message &msg, @@ -107,8 +120,16 @@ RPCNetwork::SendContext::handleVersion(const vespalib::Version *version) } } if (shouldSend) { - _net.send(*this); - delete this; + if (_net.allowDispatchForEncode()) { + auto rejected = _net.getEncodeExecutor(true).execute(vespalib::makeLambdaTask([this]() { + _net.send(*this); + delete this; + })); + assert (!rejected); + } else { + _net.send(*this); + delete this; + } } } @@ -116,7 +137,7 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _owner(nullptr), _ident(params.getIdentity()), _threadPool(std::make_unique<FastOS_ThreadPool>(128000, 0)), - _transport(std::make_unique<FNET_Transport>()), + _transport(std::make_unique<FNET_Transport>(params.getNumThreads())), _orb(std::make_unique<FRT_Supervisor>(_transport.get())), _scheduler(*_transport->GetScheduler()), _slobrokCfgFactory(std::make_unique<slobrok::ConfiguratorFactory>(params.getSlobrokConfig())), @@ -126,7 +147,8 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _targetPool(std::make_unique<RPCTargetPool>(params.getConnectionExpireSecs())), _targetPoolTask(std::make_unique<TargetPoolTask>(_scheduler, *_targetPool)), _servicePool(std::make_unique<RPCServicePool>(*_mirror, 4096)), - _executor(std::make_unique<vespalib::ThreadStackExecutor>(params.getNumThreads(), 65536)), + _singleEncodeExecutor(createExecutor(params.getOptimizeFor())), + _singleDecodeExecutor(createExecutor(params.getOptimizeFor())), _sendV1(std::make_unique<RPCSendV1>()), _sendV2(std::make_unique<RPCSendV2>()), _sendAdapters(), @@ -136,7 +158,6 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : { _transport->SetMaxInputBufferSize(params.getMaxInputBufferSize()); _transport->SetMaxOutputBufferSize(params.getMaxOutputBufferSize()); - _transport->SetTCPNoDelay(params.getTcpNoDelay()); } RPCNetwork::~RPCNetwork() @@ -406,7 +427,8 @@ void RPCNetwork::sync() { SyncTask task(_scheduler); - _executor->sync(); + _singleEncodeExecutor->sync(); + _singleDecodeExecutor->sync(); task.await(); } @@ -415,8 +437,10 @@ RPCNetwork::shutdown() { _transport->ShutDown(true); _threadPool->Close(); - _executor->shutdown(); - _executor->sync(); + _singleEncodeExecutor->shutdown(); + _singleDecodeExecutor->shutdown(); + _singleEncodeExecutor->sync(); + _singleDecodeExecutor->sync(); } void |