aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@yahooinc.com>2023-08-10 09:46:52 +0000
committerHåvard Pettersen <havardpe@yahooinc.com>2023-08-10 09:46:52 +0000
commitf870f76b5b430d776d5d35cace1e5f0ae4b4aec7 (patch)
tree7503ac60b47e976dafd716343523735bd413dfa0 /messagebus
parenteed1ed31709908ec5317369af9955a9f2862fbfe (diff)
keep rpc target alive until pending rpc invocations complete
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/vespa/messagebus/network/rpctarget.cpp9
-rw-r--r--messagebus/src/vespa/messagebus/network/rpctarget.h8
-rw-r--r--messagebus/src/vespa/messagebus/network/rpctargetpool.cpp2
3 files changed, 12 insertions, 7 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.cpp b/messagebus/src/vespa/messagebus/network/rpctarget.cpp
index 9c6ca9dff69..d7f3e77c6fd 100644
--- a/messagebus/src/vespa/messagebus/network/rpctarget.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpctarget.cpp
@@ -5,8 +5,8 @@
namespace mbus {
-RPCTarget::RPCTarget(const string &spec, FRT_Supervisor &orb) :
- _lock(),
+RPCTarget::RPCTarget(const string &spec, FRT_Supervisor &orb, ctor_tag)
+ : _lock(),
_orb(orb),
_name(spec),
_target(*_orb.GetTarget(spec.c_str())),
@@ -48,6 +48,7 @@ RPCTarget::resolveVersion(duration timeout, RPCTarget::IVersionHandler &handler)
handler.handleVersion(_version.get());
} else if (shouldInvoke) {
FRT_RPCRequest *req = _orb.AllocRPCRequest();
+ req->getStash().create<SP>(shared_from_this());
req->SetMethodName("mbus.getVersion");
_target.InvokeAsync(req, vespalib::to_s(timeout), this);
}
@@ -67,8 +68,9 @@ RPCTarget::isValid() const
}
void
-RPCTarget::RequestDone(FRT_RPCRequest *req)
+RPCTarget::RequestDone(FRT_RPCRequest *raw_req)
{
+ auto req = vespalib::ref_counted<FRT_RPCRequest>::internal_attach(raw_req);
HandlerList handlers;
{
std::lock_guard guard(_lock);
@@ -94,7 +96,6 @@ RPCTarget::RequestDone(FRT_RPCRequest *req)
_state = (_version.get() ? VERSION_RESOLVED : VERSION_NOT_RESOLVED);
}
_cond.notify_all();
- req->internal_subref();
}
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.h b/messagebus/src/vespa/messagebus/network/rpctarget.h
index fffffae64f7..77fcef5f48f 100644
--- a/messagebus/src/vespa/messagebus/network/rpctarget.h
+++ b/messagebus/src/vespa/messagebus/network/rpctarget.h
@@ -13,7 +13,7 @@ namespace mbus {
* target. Instances of this class are returned by {@link RPCService}, and
* cached by {@link RPCTargetPool}.
*/
-class RPCTarget : public FRT_IRequestWait {
+class RPCTarget : public FRT_IRequestWait, public std::enable_shared_from_this<RPCTarget> {
public:
/**
* Declares a version handler used when resolving the version of a target.
@@ -58,6 +58,7 @@ private:
Version_UP _version;
HandlerList _versionHandlers;
+ struct ctor_tag {};
public:
/**
* Convenience typedefs.
@@ -72,7 +73,10 @@ public:
* @param spec The connection spec of this target.
* @param orb The FRT supervisor to use when connecting to target.
*/
- RPCTarget(const string &name, FRT_Supervisor &orb);
+ RPCTarget(const string &name, FRT_Supervisor &orb, ctor_tag);
+ static SP create(const string &name, FRT_Supervisor &orb) {
+ return std::make_shared<RPCTarget>(name, orb, ctor_tag{});
+ }
/**
* Destructor. Subrefs the contained FRT target.
diff --git a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp
index b403c65f863..db09b127114 100644
--- a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp
@@ -97,7 +97,7 @@ RPCTargetPool::getTarget(FRT_Supervisor &orb, const RPCServiceAddress &address)
std::vector<RPCTarget::SP> targets;
targets.reserve(_numTargetsPerSpec);
for (size_t i(0); i < _numTargetsPerSpec; i++) {
- targets.push_back(std::make_shared<RPCTarget>(spec, orb));
+ targets.push_back(RPCTarget::create(spec, orb));
}
_targets.insert(TargetMap::value_type(spec, Entry(std::move(targets), currentTime)));
return _targets.find(spec)->second.getTarget(guard, currentTime);