aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-03-26 21:48:09 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-03-26 21:48:09 +0000
commitb8aedd3efb3a1bffcfbd46c12e245d19c7d6a8c5 (patch)
tree9f1bf54cf1acb44e338b71bdd2d528bd727af768 /messagebus
parent6781a374c14072b03a3cd0dd4e6037108cfa04ef (diff)
Check allow for dispatch
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp18
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.h43
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcsend.cpp25
3 files changed, 54 insertions, 32 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index 4b498c4c014..2cd6b4c408d 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -17,6 +17,8 @@
#include <vespa/fnet/scheduler.h>
#include <vespa/fnet/transport.h>
#include <vespa/fnet/frt/supervisor.h>
+#include <vespa/vespalib/util/singleexecutor.h>
+#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/fastos/thread.h>
#include <thread>
@@ -87,8 +89,16 @@ RPCNetwork::SendContext::handleVersion(const vespalib::Version *version)
}
}
if (shouldSend) {
- _net.send(*this);
- delete this;
+ if (_net.allowDispatchForEncode()) {
+ auto rejected = _net.getExecutor(true).execute(vespalib::makeLambdaTask([this]() {
+ _net.send(*this);
+ delete this;
+ }));
+ assert (!rejected);
+ } else {
+ _net.send(*this);
+ delete this;
+ }
}
}
@@ -121,6 +131,7 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams &params) :
_regAPI(std::make_unique<slobrok::api::RegisterAPI>(*_orb, *_slobrokCfgFactory)),
_requestedPort(params.getListenPort()),
_executor(std::make_unique<vespalib::ThreadStackExecutor>(params.getNumThreads(), 65536)),
+ _singleExecutor(std::make_unique<vespalib::SingleExecutor>(1000)),
_sendV1(std::make_unique<RPCSendV1>()),
_sendV2(std::make_unique<RPCSendV2>()),
_sendAdapters(),
@@ -401,6 +412,7 @@ RPCNetwork::sync()
{
SyncTask task(_scheduler);
_executor->sync();
+ _singleExecutor->sync();
task.await();
}
@@ -410,7 +422,9 @@ RPCNetwork::shutdown()
_transport->ShutDown(true);
_threadPool->Close();
_executor->shutdown();
+ _singleExecutor->shutdown();
_executor->sync();
+ _singleExecutor->sync();
}
void
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
index a6c2724929d..d32473c66cf 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
@@ -59,26 +59,27 @@ private:
using SendAdapterMap = std::map<vespalib::Version, RPCSendAdapter*>;
- INetworkOwner *_owner;
- Identity _ident;
- std::unique_ptr<FastOS_ThreadPool> _threadPool;
- std::unique_ptr<FNET_Transport> _transport;
- std::unique_ptr<FRT_Supervisor> _orb;
- FNET_Scheduler &_scheduler;
- std::unique_ptr<RPCTargetPool> _targetPool;
- TargetPoolTask _targetPoolTask;
- std::unique_ptr<RPCServicePool> _servicePool;
- std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory;
- std::unique_ptr<slobrok::api::IMirrorAPI> _mirror;
- std::unique_ptr<slobrok::api::RegisterAPI> _regAPI;
- int _requestedPort;
- std::unique_ptr<vespalib::ThreadStackExecutor> _executor;
- std::unique_ptr<RPCSendAdapter> _sendV1;
- std::unique_ptr<RPCSendAdapter> _sendV2;
- SendAdapterMap _sendAdapters;
- CompressionConfig _compressionConfig;
- bool _allowDispatchForEncode;
- bool _allowDispatchForDecode;
+ INetworkOwner *_owner;
+ Identity _ident;
+ std::unique_ptr<FastOS_ThreadPool> _threadPool;
+ std::unique_ptr<FNET_Transport> _transport;
+ std::unique_ptr<FRT_Supervisor> _orb;
+ FNET_Scheduler &_scheduler;
+ std::unique_ptr<RPCTargetPool> _targetPool;
+ TargetPoolTask _targetPoolTask;
+ std::unique_ptr<RPCServicePool> _servicePool;
+ std::unique_ptr<slobrok::ConfiguratorFactory> _slobrokCfgFactory;
+ std::unique_ptr<slobrok::api::IMirrorAPI> _mirror;
+ std::unique_ptr<slobrok::api::RegisterAPI> _regAPI;
+ int _requestedPort;
+ std::unique_ptr<vespalib::SyncableThreadExecutor> _executor;
+ std::unique_ptr<vespalib::SyncableThreadExecutor> _singleExecutor;
+ std::unique_ptr<RPCSendAdapter> _sendV1;
+ std::unique_ptr<RPCSendAdapter> _sendV2;
+ SendAdapterMap _sendAdapters;
+ CompressionConfig _compressionConfig;
+ bool _allowDispatchForEncode;
+ bool _allowDispatchForDecode;
/**
@@ -231,7 +232,7 @@ public:
const slobrok::api::IMirrorAPI &getMirror() const override;
CompressionConfig getCompressionConfig() { return _compressionConfig; }
void invoke(FRT_RPCRequest *req);
- vespalib::Executor & getExecutor() const { return *_executor; }
+ vespalib::Executor & getExecutor(bool requireSequencing) const { return requireSequencing ? *_singleExecutor : *_executor; }
bool allowDispatchForEncode() const { return _allowDispatchForEncode; }
bool allowDispatchForDecode() const { return _allowDispatchForDecode; }
diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
index 2422638dc05..820f058300f 100644
--- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp
@@ -148,7 +148,14 @@ RPCSend::send(RoutingNode &recipient, const vespalib::Version &version,
void
RPCSend::RequestDone(FRT_RPCRequest *req)
{
- doRequestDone(req);
+ if ( _net->allowDispatchForDecode()) {
+ auto rejected = _net->getExecutor(true).execute(makeLambdaTask([this, req]() {
+ doRequestDone(req);
+ }));
+ assert (!rejected);
+ } else {
+ doRequestDone(req);
+ }
}
void
@@ -221,13 +228,13 @@ void
RPCSend::handleReply(Reply::UP reply)
{
const IProtocol * protocol = _net->getOwner().getProtocol(reply->getProtocol());
- if (!protocol || protocol->requireSequencing() || !_net->allowDispatchForEncode()) {
- doHandleReply(protocol, std::move(reply));
- } else {
- auto rejected = _net->getExecutor().execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable {
+ if (_net->allowDispatchForEncode()) {
+ auto rejected = _net->getExecutor(protocol->requireSequencing()).execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable {
doHandleReply(protocol, std::move(reply));
}));
assert (!rejected);
+ } else {
+ doHandleReply(protocol, std::move(reply));
}
}
@@ -266,13 +273,13 @@ RPCSend::invoke(FRT_RPCRequest *req)
vespalib::string(params->getProtocol()).c_str(), _serverIdent.c_str())));
return;
}
- if (protocol->requireSequencing() || !_net->allowDispatchForDecode()) {
- doRequest(req, protocol, std::move(params));
- } else {
- auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable {
+ if (_net->allowDispatchForDecode()) {
+ auto rejected = _net->getExecutor(protocol->requireSequencing()).execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable {
doRequest(req, protocol, std::move(params));
}));
assert (!rejected);
+ } else {
+ doRequest(req, protocol, std::move(params));
}
}