aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'messagebus/src/vespa/messagebus/network/rpcnetwork.cpp')
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp40
1 files changed, 32 insertions, 8 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index de3be2ffa01..faa67b9bece 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -17,6 +17,8 @@
#include <vespa/fnet/scheduler.h>
#include <vespa/fnet/transport.h>
#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/vespalib/util/singleexecutor.h>
+#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/fastos/thread.h>
#include <thread>
@@ -78,6 +80,17 @@ struct TargetPoolTask : public FNET_Task {
}
};
+std::unique_ptr<vespalib::SyncableThreadExecutor>
+createExecutor(RPCNetworkParams::OptimizeFor optimizeFor) {
+ switch (optimizeFor) {
+ case RPCNetworkParams::OptimizeFor::LATENCY:
+ return std::make_unique<vespalib::ThreadStackExecutor>(1, 0x10000);
+ case RPCNetworkParams::OptimizeFor::THROUGHPUT:
+ default:
+ return std::make_unique<vespalib::SingleExecutor>(100);
+ }
+}
+
}
RPCNetwork::SendContext::SendContext(RPCNetwork &net, const Message &msg,
@@ -107,8 +120,16 @@ RPCNetwork::SendContext::handleVersion(const vespalib::Version *version)
}
}
if (shouldSend) {
- _net.send(*this);
- delete this;
+ if (_net.allowDispatchForEncode()) {
+ auto rejected = _net.getEncodeExecutor(true).execute(vespalib::makeLambdaTask([this]() {
+ _net.send(*this);
+ delete this;
+ }));
+ assert (!rejected);
+ } else {
+ _net.send(*this);
+ delete this;
+ }
}
}
@@ -116,7 +137,7 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_owner(nullptr),
_ident(params.getIdentity()),
_threadPool(std::make_unique<FastOS_ThreadPool>(128000, 0)),
- _transport(std::make_unique<FNET_Transport>()),
+ _transport(std::make_unique<FNET_Transport>(params.getNumThreads())),
_orb(std::make_unique<FRT_Supervisor>(_transport.get())),
_scheduler(*_transport->GetScheduler()),
_slobrokCfgFactory(std::make_unique<slobrok::ConfiguratorFactory>(params.getSlobrokConfig())),
@@ -126,7 +147,8 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_targetPool(std::make_unique<RPCTargetPool>(params.getConnectionExpireSecs())),
_targetPoolTask(std::make_unique<TargetPoolTask>(_scheduler, *_targetPool)),
_servicePool(std::make_unique<RPCServicePool>(*_mirror, 4096)),
- _executor(std::make_unique<vespalib::ThreadStackExecutor>(params.getNumThreads(), 65536)),
+ _singleEncodeExecutor(createExecutor(params.getOptimizeFor())),
+ _singleDecodeExecutor(createExecutor(params.getOptimizeFor())),
_sendV1(std::make_unique<RPCSendV1>()),
_sendV2(std::make_unique<RPCSendV2>()),
_sendAdapters(),
@@ -136,7 +158,6 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
{
_transport->SetMaxInputBufferSize(params.getMaxInputBufferSize());
_transport->SetMaxOutputBufferSize(params.getMaxOutputBufferSize());
- _transport->SetTCPNoDelay(params.getTcpNoDelay());
}
RPCNetwork::~RPCNetwork()
@@ -406,7 +427,8 @@ void
RPCNetwork::sync()
{
SyncTask task(_scheduler);
- _executor->sync();
+ _singleEncodeExecutor->sync();
+ _singleDecodeExecutor->sync();
task.await();
}
@@ -415,8 +437,10 @@ RPCNetwork::shutdown()
{
_transport->ShutDown(true);
_threadPool->Close();
- _executor->shutdown();
- _executor->sync();
+ _singleEncodeExecutor->shutdown();
+ _singleDecodeExecutor->shutdown();
+ _singleEncodeExecutor->sync();
+ _singleDecodeExecutor->sync();
}
void