summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@oath.com>2018-06-14 15:48:11 +0200
committerHenning Baldersheim <balder@oath.com>2018-06-15 14:51:38 +0200
commitf82b045d1ad8d032c5074d95b33aa0b0f3eb43b6 (patch)
treebc93945d71a5fce0ce8ce221b849ae035ecad4e6 /messagebus
parentf94ea4ac173c3477bb3c1069df97b666c36065db (diff)
Control threadpool and dispatch of encode/decode by config.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp11
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.h10
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp3
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.h31
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.cpp9
5 files changed, 50 insertions, 14 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index f3e3359fabe..1372db14259 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -120,11 +120,13 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_mirror(std::make_unique<slobrok::api::MirrorAPI>(*_orb, *_slobrokCfgFactory)),
_regAPI(std::make_unique<slobrok::api::RegisterAPI>(*_orb, *_slobrokCfgFactory)),
_requestedPort(params.getListenPort()),
- _executor(std::make_unique<vespalib::ThreadStackExecutor>(4,65536)),
+ _executor(std::make_unique<vespalib::ThreadStackExecutor>(std::min(1u, params.getNumThreads()), 65536)),
_sendV1(std::make_unique<RPCSendV1>()),
_sendV2(std::make_unique<RPCSendV2>()),
_sendAdapters(),
- _compressionConfig(params.getCompressionConfig())
+ _compressionConfig(params.getCompressionConfig()),
+ _allowDispatchForEncode(params.getDispatchOnEncode()),
+ _allowDispatchForDecode(params.getDispatchOnDecode())
{
_transport->SetDirectWrite(false);
_transport->SetMaxInputBufferSize(params.getMaxInputBufferSize());
@@ -224,11 +226,6 @@ RPCNetwork::start()
return true;
}
-vespalib::Executor &
-RPCNetwork::getExecutor() {
- return *_executor;
-}
-
bool
RPCNetwork::waitUntilReady(double seconds) const
{
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
index e29d01c8b04..9c6516eced7 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
@@ -77,6 +77,9 @@ private:
std::unique_ptr<RPCSendAdapter> _sendV2;
SendAdapterMap _sendAdapters;
CompressionConfig _compressionConfig;
+ bool _allowDispatchForEncode;
+ bool _allowDispatchForDecode;
+
/**
* Resolves and assigns a service address for the given recipient using the
@@ -135,7 +138,7 @@ public:
/**
* Destruct
**/
- virtual ~RPCNetwork();
+ ~RPCNetwork() override;
/**
* Obtain the owner of this network. This method may only be invoked after
@@ -226,7 +229,10 @@ public:
const slobrok::api::IMirrorAPI &getMirror() const override;
CompressionConfig getCompressionConfig() { return _compressionConfig; }
void invoke(FRT_RPCRequest *req);
- vespalib::Executor & getExecutor();
+ vespalib::Executor & getExecutor() const { return *_executor; }
+ bool allowDispatchForEncode() const { return _allowDispatchForEncode; }
+ bool allowDispatchForDecode() const { return _allowDispatchForDecode; }
+
};
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
index 5e54de1bce6..b6f0231e619 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
@@ -10,6 +10,9 @@ RPCNetworkParams::RPCNetworkParams() :
_listenPort(0),
_maxInputBufferSize(256*1024),
_maxOutputBufferSize(256*1024),
+ _numThreads(4),
+ _dispatchOnEncode(true),
+ _dispatchOnDecode(false),
_connectionExpireSecs(600),
_compressionConfig(CompressionConfig::LZ4, 6, 90, 1024)
{ }
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
index 0a4ed806c27..1dcc8178e68 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
@@ -19,6 +19,9 @@ private:
int _listenPort;
uint32_t _maxInputBufferSize;
uint32_t _maxOutputBufferSize;
+ uint32_t _numThreads;
+ bool _dispatchOnEncode;
+ bool _dispatchOnDecode;
double _connectionExpireSecs;
CompressionConfig _compressionConfig;
@@ -97,6 +100,19 @@ public:
}
/**
+ * Sets number of threads for the thread pool.
+ *
+ * @param numThreads number of threads for thread pool
+ * @return This, to allow chaining.
+ */
+ RPCNetworkParams &setNumThreads(uint32_t numThreads) {
+ _numThreads = numThreads;
+ return *this;
+ }
+
+ uint32_t getNumThreads() const { return _numThreads; }
+
+ /**
* Returns the number of seconds before an idle network connection expires.
*
* @return The number of seconds.
@@ -165,6 +181,21 @@ public:
return *this;
}
CompressionConfig getCompressionConfig() const { return _compressionConfig; }
+
+
+ RPCNetworkParams &setDispatchOnDecode(bool dispatchOnDecode) {
+ _dispatchOnDecode = dispatchOnDecode;
+ return *this;
+ }
+
+ uint32_t getDispatchOnDecode() const { return _dispatchOnDecode; }
+
+ RPCNetworkParams &setDispatchOnEncode(bool dispatchOnEncode) {
+ _dispatchOnEncode = dispatchOnEncode;
+ return *this;
+ }
+
+ uint32_t getDispatchOnEncode() const { return _dispatchOnEncode; }
};
}
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
index ba6523da340..04cccd59903 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
@@ -221,7 +221,7 @@ void
RPCSend::handleReply(Reply::UP reply)
{
const IProtocol * protocol = _net->getOwner().getProtocol(reply->getProtocol());
- if (!protocol || protocol->requireSequencing()) {
+ if (!protocol || protocol->requireSequencing() || !_net->allowDispatchForEncode()) {
doHandleReply(protocol, std::move(reply));
} else {
auto rejected = _net->getExecutor().execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable {
@@ -262,11 +262,11 @@ RPCSend::invoke(FRT_RPCRequest *req)
IProtocol * protocol = _net->getOwner().getProtocol(params->getProtocol());
if (protocol == nullptr) {
replyError(req, params->getVersion(), params->getTraceLevel(),
- Error(ErrorCode::UNKNOWN_PROTOCOL,
- make_string("Protocol '%s' is not known by %s.", params->getProtocol().c_str(), _serverIdent.c_str())));
+ Error(ErrorCode::UNKNOWN_PROTOCOL, make_string("Protocol '%s' is not known by %s.",
+ params->getProtocol().c_str(), _serverIdent.c_str())));
return;
}
- if (protocol->requireSequencing()) {
+ if (protocol->requireSequencing() || !_net->allowDispatchForDecode()) {
doRequest(req, protocol, std::move(params));
} else {
auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable {
@@ -279,7 +279,6 @@ RPCSend::invoke(FRT_RPCRequest *req)
void
RPCSend::doRequest(FRT_RPCRequest *req, const IProtocol * protocol, std::unique_ptr<Params> params)
{
-
Routable::UP routable = protocol->decode(params->getVersion(), params->getPayload());
req->DiscardBlobs();
if ( ! routable ) {