summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-03-27 00:35:33 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-03-27 00:35:33 +0000
commitd6261332f8014bf55b8f6b76227a1be0146630be (patch)
tree939fe6012b0a088364602525c3d25f5bb7e7c992 /messagebus
parent9a448a86c87d6c2878c55443b66255fc0c23feb7 (diff)
tcpnodelay is dead. Both worse latency and throughput.
Use optimize_for to select executor type instead.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp27
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.h5
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp4
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.h33
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.cpp2
5 files changed, 39 insertions, 32 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index a35b60bc1d8..6fd7ab212b4 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -28,6 +28,8 @@ LOG_SETUP(".rpcnetwork");
using vespalib::make_string;
using namespace std::chrono_literals;
+namespace mbus {
+
namespace {
/**
@@ -58,9 +60,18 @@ public:
}
};
-} // namespace <unnamed>
+std::unique_ptr<vespalib::SyncableThreadExecutor>
+createExecutor(RPCNetworkParams::OptimizeFor optimizeFor) {
+ switch (optimizeFor) {
+ case RPCNetworkParams::OptimizeFor::LATENCY:
+ return std::make_unique<vespalib::ThreadStackExecutor>(1, 0x10000);
+ case RPCNetworkParams::OptimizeFor::THROUGHPUT:
+ default:
+ return std::make_unique<vespalib::SingleExecutor>(100);
+ }
+}
-namespace mbus {
+} // namespace <unnamed>
RPCNetwork::SendContext::SendContext(RPCNetwork &net, const Message &msg,
const std::vector<RoutingNode*> &recipients)
@@ -116,11 +127,12 @@ RPCNetwork::TargetPoolTask::PerformTask()
Schedule(1.0);
}
+
RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_owner(nullptr),
_ident(params.getIdentity()),
_threadPool(std::make_unique<FastOS_ThreadPool>(128000, 0)),
- _transport(std::make_unique<FNET_Transport>()),
+ _transport(std::make_unique<FNET_Transport>(params.getNumThreads())),
_orb(std::make_unique<FRT_Supervisor>(_transport.get())),
_scheduler(*_transport->GetScheduler()),
_targetPool(std::make_unique<RPCTargetPool>(params.getConnectionExpireSecs())),
@@ -130,9 +142,8 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_mirror(std::make_unique<slobrok::api::MirrorAPI>(*_orb, *_slobrokCfgFactory)),
_regAPI(std::make_unique<slobrok::api::RegisterAPI>(*_orb, *_slobrokCfgFactory)),
_requestedPort(params.getListenPort()),
- _executor(std::make_unique<vespalib::ThreadStackExecutor>(params.getNumThreads(), 65536)),
- _singleEncodeExecutor(std::make_unique<vespalib::SingleExecutor>(100)),
- _singleDecodeExecutor(std::make_unique<vespalib::SingleExecutor>(100)),
+ _singleEncodeExecutor(createExecutor(params.getOptimizeFor())),
+ _singleDecodeExecutor(createExecutor(params.getOptimizeFor())),
_sendV1(std::make_unique<RPCSendV1>()),
_sendV2(std::make_unique<RPCSendV2>()),
_sendAdapters(),
@@ -142,7 +153,6 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
{
_transport->SetMaxInputBufferSize(params.getMaxInputBufferSize());
_transport->SetMaxOutputBufferSize(params.getMaxOutputBufferSize());
- _transport->SetTCPNoDelay(params.getTcpNoDelay());
}
RPCNetwork::~RPCNetwork()
@@ -412,7 +422,6 @@ void
RPCNetwork::sync()
{
SyncTask task(_scheduler);
- _executor->sync();
_singleEncodeExecutor->sync();
_singleDecodeExecutor->sync();
task.await();
@@ -423,10 +432,8 @@ RPCNetwork::shutdown()
{
_transport->ShutDown(true);
_threadPool->Close();
- _executor->shutdown();
_singleEncodeExecutor->shutdown();
_singleDecodeExecutor->shutdown();
- _executor->sync();
_singleEncodeExecutor->sync();
_singleDecodeExecutor->sync();
}
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
index 8643beb20fa..04754829f15 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
@@ -72,7 +72,6 @@ private:
std::unique_ptr<slobrok::api::IMirrorAPI> _mirror;
std::unique_ptr<slobrok::api::RegisterAPI> _regAPI;
int _requestedPort;
- std::unique_ptr<vespalib::SyncableThreadExecutor> _executor;
std::unique_ptr<vespalib::SyncableThreadExecutor> _singleEncodeExecutor;
std::unique_ptr<vespalib::SyncableThreadExecutor> _singleDecodeExecutor;
std::unique_ptr<RPCSendAdapter> _sendV1;
@@ -233,8 +232,8 @@ public:
const slobrok::api::IMirrorAPI &getMirror() const override;
CompressionConfig getCompressionConfig() { return _compressionConfig; }
void invoke(FRT_RPCRequest *req);
- vespalib::Executor & getEncodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleEncodeExecutor : *_executor; }
- vespalib::Executor & getDecodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleDecodeExecutor : *_executor; }
+ vespalib::Executor & getEncodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleEncodeExecutor : *_singleEncodeExecutor; }
+ vespalib::Executor & getDecodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleDecodeExecutor : *_singleDecodeExecutor; }
bool allowDispatchForEncode() const { return _allowDispatchForEncode; }
bool allowDispatchForDecode() const { return _allowDispatchForDecode; }
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
index 5bf277a8ee6..482a46b2564 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
@@ -14,8 +14,8 @@ RPCNetworkParams::RPCNetworkParams(config::ConfigUri configUri) :
_listenPort(0),
_maxInputBufferSize(256*1024),
_maxOutputBufferSize(256*1024),
- _numThreads(4),
- _tcpNoDelay(true),
+ _numThreads(1),
+ _optimizeFor(OptimizeFor::LATENCY),
_dispatchOnEncode(true),
_dispatchOnDecode(false),
_connectionExpireSecs(600),
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
index 140f81c611c..a4b752f46d4 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
@@ -12,21 +12,10 @@ namespace mbus {
* held by this class. This class has reasonable default values for each parameter.
*/
class RPCNetworkParams {
-private:
+public:
+ enum class OptimizeFor { LATENCY, THROUGHPUT};
using CompressionConfig = vespalib::compression::CompressionConfig;
- Identity _identity;
- config::ConfigUri _slobrokConfig;
- int _listenPort;
- uint32_t _maxInputBufferSize;
- uint32_t _maxOutputBufferSize;
- uint32_t _numThreads;
- bool _tcpNoDelay;
- bool _dispatchOnEncode;
- bool _dispatchOnDecode;
- double _connectionExpireSecs;
- CompressionConfig _compressionConfig;
-public:
RPCNetworkParams();
RPCNetworkParams(config::ConfigUri configUri);
~RPCNetworkParams();
@@ -107,12 +96,12 @@ public:
uint32_t getNumThreads() const { return _numThreads; }
- RPCNetworkParams &setTcpNoDelay(bool tcpNoDelay) {
- _tcpNoDelay = tcpNoDelay;
+ RPCNetworkParams &setOptimizeFor(OptimizeFor tcpNoDelay) {
+ _optimizeFor = tcpNoDelay;
return *this;
}
- bool getTcpNoDelay() const { return _tcpNoDelay; }
+ OptimizeFor getOptimizeFor() const { return _optimizeFor; }
/**
* Returns the number of seconds before an idle network connection expires.
@@ -198,6 +187,18 @@ public:
}
uint32_t getDispatchOnEncode() const { return _dispatchOnEncode; }
+private:
+ Identity _identity;
+ config::ConfigUri _slobrokConfig;
+ int _listenPort;
+ uint32_t _maxInputBufferSize;
+ uint32_t _maxOutputBufferSize;
+ uint32_t _numThreads;
+ OptimizeFor _optimizeFor;
+ bool _dispatchOnEncode;
+ bool _dispatchOnDecode;
+ double _connectionExpireSecs;
+ CompressionConfig _compressionConfig;
};
}
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
index a40360ead71..d217c7964d6 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
@@ -228,7 +228,7 @@ void
RPCSend::handleReply(Reply::UP reply)
{
const IProtocol * protocol = _net->getOwner().getProtocol(reply->getProtocol());
- if (_net->allowDispatchForEncode()) {
+ if (protocol && _net->allowDispatchForEncode()) {
auto rejected = _net->getEncodeExecutor(protocol->requireSequencing()).execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable {
doHandleReply(protocol, std::move(reply));
}));