From f870f76b5b430d776d5d35cace1e5f0ae4b4aec7 Mon Sep 17 00:00:00 2001 From: HÃ¥vard Pettersen Date: Thu, 10 Aug 2023 09:46:52 +0000 Subject: keep rpc target alive until pending rpc invocations complete --- messagebus/src/vespa/messagebus/network/rpctarget.cpp | 9 +++++---- messagebus/src/vespa/messagebus/network/rpctarget.h | 8 ++++++-- messagebus/src/vespa/messagebus/network/rpctargetpool.cpp | 2 +- 3 files changed, 12 insertions(+), 7 deletions(-) (limited to 'messagebus') diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.cpp b/messagebus/src/vespa/messagebus/network/rpctarget.cpp index 9c6ca9dff69..d7f3e77c6fd 100644 --- a/messagebus/src/vespa/messagebus/network/rpctarget.cpp +++ b/messagebus/src/vespa/messagebus/network/rpctarget.cpp @@ -5,8 +5,8 @@ namespace mbus { -RPCTarget::RPCTarget(const string &spec, FRT_Supervisor &orb) : - _lock(), +RPCTarget::RPCTarget(const string &spec, FRT_Supervisor &orb, ctor_tag) + : _lock(), _orb(orb), _name(spec), _target(*_orb.GetTarget(spec.c_str())), @@ -48,6 +48,7 @@ RPCTarget::resolveVersion(duration timeout, RPCTarget::IVersionHandler &handler) handler.handleVersion(_version.get()); } else if (shouldInvoke) { FRT_RPCRequest *req = _orb.AllocRPCRequest(); + req->getStash().create(shared_from_this()); req->SetMethodName("mbus.getVersion"); _target.InvokeAsync(req, vespalib::to_s(timeout), this); } @@ -67,8 +68,9 @@ RPCTarget::isValid() const } void -RPCTarget::RequestDone(FRT_RPCRequest *req) +RPCTarget::RequestDone(FRT_RPCRequest *raw_req) { + auto req = vespalib::ref_counted::internal_attach(raw_req); HandlerList handlers; { std::lock_guard guard(_lock); @@ -94,7 +96,6 @@ RPCTarget::RequestDone(FRT_RPCRequest *req) _state = (_version.get() ? VERSION_RESOLVED : VERSION_NOT_RESOLVED); } _cond.notify_all(); - req->internal_subref(); } } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.h b/messagebus/src/vespa/messagebus/network/rpctarget.h index fffffae64f7..77fcef5f48f 100644 --- a/messagebus/src/vespa/messagebus/network/rpctarget.h +++ b/messagebus/src/vespa/messagebus/network/rpctarget.h @@ -13,7 +13,7 @@ namespace mbus { * target. Instances of this class are returned by {@link RPCService}, and * cached by {@link RPCTargetPool}. */ -class RPCTarget : public FRT_IRequestWait { +class RPCTarget : public FRT_IRequestWait, public std::enable_shared_from_this { public: /** * Declares a version handler used when resolving the version of a target. @@ -58,6 +58,7 @@ private: Version_UP _version; HandlerList _versionHandlers; + struct ctor_tag {}; public: /** * Convenience typedefs. @@ -72,7 +73,10 @@ public: * @param spec The connection spec of this target. * @param orb The FRT supervisor to use when connecting to target. */ - RPCTarget(const string &name, FRT_Supervisor &orb); + RPCTarget(const string &name, FRT_Supervisor &orb, ctor_tag); + static SP create(const string &name, FRT_Supervisor &orb) { + return std::make_shared(name, orb, ctor_tag{}); + } /** * Destructor. Subrefs the contained FRT target. diff --git a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp index b403c65f863..db09b127114 100644 --- a/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp +++ b/messagebus/src/vespa/messagebus/network/rpctargetpool.cpp @@ -97,7 +97,7 @@ RPCTargetPool::getTarget(FRT_Supervisor &orb, const RPCServiceAddress &address) std::vector targets; targets.reserve(_numTargetsPerSpec); for (size_t i(0); i < _numTargetsPerSpec; i++) { - targets.push_back(std::make_shared(spec, orb)); + targets.push_back(RPCTarget::create(spec, orb)); } _targets.insert(TargetMap::value_type(spec, Entry(std::move(targets), currentTime))); return _targets.find(spec)->second.getTarget(guard, currentTime); -- cgit v1.2.3