summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2018-06-15 17:52:20 +0200
committerGitHub <noreply@github.com>2018-06-15 17:52:20 +0200
commit45ffc87ad5313f975205d9f05ebaca8b813c2a37 (patch)
tree98728aa2fe6098c979bea36766a81de0d533669a
parent1b0d6f4961f2d6cc8a8fa2a2a76b48f9b32f7685 (diff)
parent6a45f38abdfe39a8d08fb679ec2be378699294b9 (diff)
Merge pull request #6217 from vespa-engine/balder/multiple-threads-in-mbus
Balder/multiple threads in mbus
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp11
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.h10
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp3
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetworkparams.h31
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.cpp27
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.h2
-rw-r--r--storage/src/vespa/storage/config/stor-communicationmanager.def13
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp11
8 files changed, 84 insertions, 24 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index f3e3359fabe..108b94070bf 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -120,11 +120,13 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_mirror(std::make_unique<slobrok::api::MirrorAPI>(*_orb, *_slobrokCfgFactory)),
_regAPI(std::make_unique<slobrok::api::RegisterAPI>(*_orb, *_slobrokCfgFactory)),
_requestedPort(params.getListenPort()),
- _executor(std::make_unique<vespalib::ThreadStackExecutor>(4,65536)),
+ _executor(std::make_unique<vespalib::ThreadStackExecutor>(params.getNumThreads(), 65536)),
_sendV1(std::make_unique<RPCSendV1>()),
_sendV2(std::make_unique<RPCSendV2>()),
_sendAdapters(),
- _compressionConfig(params.getCompressionConfig())
+ _compressionConfig(params.getCompressionConfig()),
+ _allowDispatchForEncode(params.getDispatchOnEncode()),
+ _allowDispatchForDecode(params.getDispatchOnDecode())
{
_transport->SetDirectWrite(false);
_transport->SetMaxInputBufferSize(params.getMaxInputBufferSize());
@@ -224,11 +226,6 @@ RPCNetwork::start()
return true;
}
-vespalib::Executor &
-RPCNetwork::getExecutor() {
- return *_executor;
-}
-
bool
RPCNetwork::waitUntilReady(double seconds) const
{
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
index e29d01c8b04..9c6516eced7 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
@@ -77,6 +77,9 @@ private:
std::unique_ptr<RPCSendAdapter> _sendV2;
SendAdapterMap _sendAdapters;
CompressionConfig _compressionConfig;
+ bool _allowDispatchForEncode;
+ bool _allowDispatchForDecode;
+
/**
* Resolves and assigns a service address for the given recipient using the
@@ -135,7 +138,7 @@ public:
/**
* Destruct
**/
- virtual ~RPCNetwork();
+ ~RPCNetwork() override;
/**
* Obtain the owner of this network. This method may only be invoked after
@@ -226,7 +229,10 @@ public:
const slobrok::api::IMirrorAPI &getMirror() const override;
CompressionConfig getCompressionConfig() { return _compressionConfig; }
void invoke(FRT_RPCRequest *req);
- vespalib::Executor & getExecutor();
+ vespalib::Executor & getExecutor() const { return *_executor; }
+ bool allowDispatchForEncode() const { return _allowDispatchForEncode; }
+ bool allowDispatchForDecode() const { return _allowDispatchForDecode; }
+
};
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
index 5e54de1bce6..b6f0231e619 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp
@@ -10,6 +10,9 @@ RPCNetworkParams::RPCNetworkParams() :
_listenPort(0),
_maxInputBufferSize(256*1024),
_maxOutputBufferSize(256*1024),
+ _numThreads(4),
+ _dispatchOnEncode(true),
+ _dispatchOnDecode(false),
_connectionExpireSecs(600),
_compressionConfig(CompressionConfig::LZ4, 6, 90, 1024)
{ }
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
index 0a4ed806c27..1dcc8178e68 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h
@@ -19,6 +19,9 @@ private:
int _listenPort;
uint32_t _maxInputBufferSize;
uint32_t _maxOutputBufferSize;
+ uint32_t _numThreads;
+ bool _dispatchOnEncode;
+ bool _dispatchOnDecode;
double _connectionExpireSecs;
CompressionConfig _compressionConfig;
@@ -97,6 +100,19 @@ public:
}
/**
+ * Sets number of threads for the thread pool.
+ *
+ * @param numThreads number of threads for thread pool
+ * @return This, to allow chaining.
+ */
+ RPCNetworkParams &setNumThreads(uint32_t numThreads) {
+ _numThreads = numThreads;
+ return *this;
+ }
+
+ uint32_t getNumThreads() const { return _numThreads; }
+
+ /**
* Returns the number of seconds before an idle network connection expires.
*
* @return The number of seconds.
@@ -165,6 +181,21 @@ public:
return *this;
}
CompressionConfig getCompressionConfig() const { return _compressionConfig; }
+
+
+ RPCNetworkParams &setDispatchOnDecode(bool dispatchOnDecode) {
+ _dispatchOnDecode = dispatchOnDecode;
+ return *this;
+ }
+
+ uint32_t getDispatchOnDecode() const { return _dispatchOnDecode; }
+
+ RPCNetworkParams &setDispatchOnEncode(bool dispatchOnEncode) {
+ _dispatchOnEncode = dispatchOnEncode;
+ return *this;
+ }
+
+ uint32_t getDispatchOnEncode() const { return _dispatchOnEncode; }
};
}
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
index e23f4dc29d9..04cccd59903 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
@@ -61,7 +61,7 @@ RPCSend::RPCSend() :
_serverIdent("server")
{ }
-RPCSend::~RPCSend() {}
+RPCSend::~RPCSend() = default;
void
RPCSend::attach(RPCNetwork &net)
@@ -221,7 +221,7 @@ void
RPCSend::handleReply(Reply::UP reply)
{
const IProtocol * protocol = _net->getOwner().getProtocol(reply->getProtocol());
- if (!protocol || protocol->requireSequencing()) {
+ if (!protocol || protocol->requireSequencing() || !_net->allowDispatchForEncode()) {
doHandleReply(protocol, std::move(reply));
} else {
auto rejected = _net->getExecutor().execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable {
@@ -256,22 +256,29 @@ void
RPCSend::invoke(FRT_RPCRequest *req)
{
req->Detach();
- doRequest(req);
-}
-
-void
-RPCSend::doRequest(FRT_RPCRequest *req)
-{
FRT_Values &args = *req->GetParams();
std::unique_ptr<Params> params = toParams(args);
IProtocol * protocol = _net->getOwner().getProtocol(params->getProtocol());
if (protocol == nullptr) {
replyError(req, params->getVersion(), params->getTraceLevel(),
- Error(ErrorCode::UNKNOWN_PROTOCOL,
- make_string("Protocol '%s' is not known by %s.", params->getProtocol().c_str(), _serverIdent.c_str())));
+ Error(ErrorCode::UNKNOWN_PROTOCOL, make_string("Protocol '%s' is not known by %s.",
+ params->getProtocol().c_str(), _serverIdent.c_str())));
return;
}
+ if (protocol->requireSequencing() || !_net->allowDispatchForDecode()) {
+ doRequest(req, protocol, std::move(params));
+ } else {
+ auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable {
+ doRequest(req, protocol, std::move(params));
+ }));
+ assert (!rejected);
+ }
+}
+
+void
+RPCSend::doRequest(FRT_RPCRequest *req, const IProtocol * protocol, std::unique_ptr<Params> params)
+{
Routable::UP routable = protocol->decode(params->getVersion(), params->getPayload());
req->DiscardBlobs();
if ( ! routable ) {
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.h b/messagebus/src/vespa/messagebus/network/rpcsend.h
index 11a042b91c0..cfc1d72418a 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.h
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.h
@@ -83,7 +83,7 @@ public:
void invoke(FRT_RPCRequest *req);
private:
- void doRequest(FRT_RPCRequest *req);
+ void doRequest(FRT_RPCRequest *req, const IProtocol * protocol, std::unique_ptr<Params> params);
void doRequestDone(FRT_RPCRequest *req);
void doHandleReply(const IProtocol * protocol, std::unique_ptr<Reply> reply);
void attach(RPCNetwork &net) final override;
diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def
index e29540de064..2a2a840dd4e 100644
--- a/storage/src/vespa/storage/config/stor-communicationmanager.def
+++ b/storage/src/vespa/storage/config/stor-communicationmanager.def
@@ -28,3 +28,16 @@ mbus.compress.type enum {NONE, LZ4, ZSTD} default=LZ4
## TTL for rpc target cache
mbus.rpctargetcache.ttl double default = 600
+
+## Number of threads for mbus threadpool
+## Any value below 1 will be 1.
+mbus.num_threads int default=4
+
+## Enable to use above thread pool for encoding replies
+## False will use network(fnet) thread
+mbus.dispatch_on_encode bool default=true
+
+## Enable to use above thread pool for decoding replies
+## False will use network(fnet) thread
+## Todo: Change default once verified in large scale deployment.
+mbus.dispatch_on_decode bool default=false
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 94a151bcdc1..65523b62c59 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -79,7 +79,7 @@ StorageTransportContext::StorageTransportContext(std::unique_ptr<RPCRequestWrapp
: _request(std::move(request))
{ }
-StorageTransportContext::~StorageTransportContext() { }
+StorageTransportContext::~StorageTransportContext() = default;
void
CommunicationManager::receiveStorageReply(const std::shared_ptr<api::StorageReply>& reply)
@@ -278,13 +278,13 @@ void CommunicationManager::fail_with_unresolvable_bucket_space(
namespace {
struct PlaceHolderBucketResolver : public BucketResolver {
- virtual document::Bucket bucketFromId(const document::DocumentId &) const override {
+ document::Bucket bucketFromId(const document::DocumentId &) const override {
return document::Bucket(FixedBucketSpaces::default_space(), document::BucketId(0));
}
- virtual document::BucketSpace bucketSpaceFromName(const vespalib::string &) const override {
+ document::BucketSpace bucketSpaceFromName(const vespalib::string &) const override {
return FixedBucketSpaces::default_space();
}
- virtual vespalib::string nameFromBucketSpace(const document::BucketSpace &bucketSpace) const override {
+ vespalib::string nameFromBucketSpace(const document::BucketSpace &bucketSpace) const override {
assert(bucketSpace == FixedBucketSpaces::default_space());
return FixedBucketSpaces::to_string(bucketSpace);
}
@@ -423,6 +423,9 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig>
params.setSlobrokConfig(_configUri);
params.setConnectionExpireSecs(config->mbus.rpctargetcache.ttl);
+ params.setNumThreads(std::max(1, config->mbus.numThreads));
+ params.setDispatchOnDecode(config->mbus.dispatchOnDecode);
+ params.setDispatchOnEncode(config->mbus.dispatchOnEncode);
params.setIdentity(mbus::Identity(_component.getIdentity()));
if (config->mbusport != -1) {