diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2023-08-10 14:27:15 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-08-10 14:27:15 +0200 |
commit | 6095e4e9e88c25da795889ea3e02d1fbbf02e463 (patch) | |
tree | c220939c311ee9547b7c4a7ab424ae8c83fc289a | |
parent | dfc0796e96ed6b0291cd37b5a2f81b7c3d7685f3 (diff) | |
parent | f870f76b5b430d776d5d35cace1e5f0ae4b4aec7 (diff) |
Merge pull request #28013 from vespa-engine/havardpe/extend-rpc-target-lifetime
keep rpc target alive until pending rpc invocations complete
-rw-r--r-- | messagebus/src/vespa/messagebus/network/rpctarget.cpp | 9 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/network/rpctarget.h | 8 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/network/rpctargetpool.cpp | 2 |
3 files changed, 12 insertions, 7 deletions
diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.cpp b/messagebus/src/vespa/messagebus/network/rpctarget.cpp index 9c6ca9dff69..d7f3e77c6fd 100644 --- a/messagebus/src/vespa/messagebus/network/rpctarget.cpp +++ b/messagebus/src/vespa/messagebus/network/rpctarget.cpp @@ -5,8 +5,8 @@ namespace mbus { -RPCTarget::RPCTarget(const string &spec, FRT_Supervisor &orb) : - _lock(), +RPCTarget::RPCTarget(const string &spec, FRT_Supervisor &orb, ctor_tag) + : _lock(), _orb(orb), _name(spec), _target(*_orb.GetTarget(spec.c_str())), @@ -48,6 +48,7 @@ RPCTarget::resolveVersion(duration timeout, RPCTarget::IVersionHandler &handler) handler.handleVersion(_version.get()); } else if (shouldInvoke) { FRT_RPCRequest *req = _orb.AllocRPCRequest(); + req->getStash().create<SP>(shared_from_this()); req->SetMethodName("mbus.getVersion"); _target.InvokeAsync(req, vespalib::to_s(timeout), this); } @@ -67,8 +68,9 @@ RPCTarget::isValid() const } void -RPCTarget::RequestDone(FRT_RPCRequest *req) +RPCTarget::RequestDone(FRT_RPCRequest *raw_req) { + auto req = vespalib::ref_counted<FRT_RPCRequest>::internal_attach(raw_req); HandlerList handlers; { std::lock_guard guard(_lock); @@ -94,7 +96,6 @@ RPCTarget::RequestDone(FRT_RPCRequest *req) _state = (_version.get() ? VERSION_RESOLVED : VERSION_NOT_RESOLVED); } _cond.notify_all(); - req->internal_subref(); } } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.h b/messagebus/src/vespa/messagebus/network/rpctarget.h index fffffae64f7..77fcef5f48f 100644 --- a/messagebus/src/vespa/messagebus/network/rpctarget.h +++ b/messagebus/src/vespa/messagebus/network/rpctarget.h @@ -13,7 +13,7 @@ namespace mbus { * target. Instances of this class are returned by {@link RPCService}, and * cached by {@link RPCTargetPool}. */ -class RPCTarget : public FRT_IRequestWait { +class RPCTarget : public FRT_IRequestWait, public std::enable_shared_from_this<RPCTarget> { public: /** * Declares a version handler used when resolving the version of a target. @@ -58,6 +58,7 @@ private: Version_UP _version; HandlerList _versionHandlers; + struct ctor_tag {}; public: /** * Convenience typedefs. @@ -72,7 +73,10 @@ public: * @param spec The connection spec of this target. * @param orb The FRT supervisor to use when connecting to target. */ - RPCTarget(const string &name, FRT_Supervisor &orb); + RPCTarget(const string &name, FRT_Supervisor &orb, ctor_tag); + static SP create(const string &name, FRT_Supervisor &orb) { + return std::make_shared<RPCTarget>(name, orb, ctor_tag{}); + } /** * Destructor. Subrefs the contained FRT target. diff --git a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp index b403c65f863..db09b127114 100644 --- a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp +++ b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp @@ -97,7 +97,7 @@ RPCTargetPool::getTarget(FRT_Supervisor &orb, const RPCServiceAddress &address) std::vector<RPCTarget::SP> targets; targets.reserve(_numTargetsPerSpec); for (size_t i(0); i < _numTargetsPerSpec; i++) { - targets.push_back(std::make_shared<RPCTarget>(spec, orb)); + targets.push_back(RPCTarget::create(spec, orb)); } _targets.insert(TargetMap::value_type(spec, Entry(std::move(targets), currentTime))); return _targets.find(spec)->second.getTarget(guard, currentTime); |