From 66ae46c80878aab4b0fc19d4c693e1a74cd8977a Mon Sep 17 00:00:00 2001 From: HÃ¥vard Pettersen Date: Wed, 8 Mar 2023 08:45:54 +0000 Subject: use ref_counted for ReplyGate --- messagebus/src/tests/replygate/replygate.cpp | 4 ++-- messagebus/src/vespa/messagebus/intermediatesession.cpp | 5 +++-- messagebus/src/vespa/messagebus/intermediatesession.h | 13 +++++++------ messagebus/src/vespa/messagebus/replygate.cpp | 7 +++---- messagebus/src/vespa/messagebus/replygate.h | 4 ++-- messagebus/src/vespa/messagebus/sourcesession.cpp | 7 +++---- messagebus/src/vespa/messagebus/sourcesession.h | 5 +++-- 7 files changed, 23 insertions(+), 22 deletions(-) (limited to 'messagebus/src') diff --git a/messagebus/src/tests/replygate/replygate.cpp b/messagebus/src/tests/replygate/replygate.cpp index 3de48fce130..c71993c9368 100644 --- a/messagebus/src/tests/replygate/replygate.cpp +++ b/messagebus/src/tests/replygate/replygate.cpp @@ -59,7 +59,7 @@ TEST("replygate_test") { { RoutableQueue q; MySender sender; - MyGate *gate = new MyGate(sender); + auto gate = vespalib::make_ref_counted(sender); { auto msg = std::make_unique("test"); msg->pushHandler(q); @@ -79,7 +79,7 @@ TEST("replygate_test") { EXPECT_TRUE(MyReply::dtorCnt == 1); EXPECT_TRUE(MyGate::ctorCnt == 1); EXPECT_TRUE(MyGate::dtorCnt == 0); - gate->subRef(); + gate = vespalib::ref_counted(); EXPECT_TRUE(MyGate::ctorCnt == 1); EXPECT_TRUE(MyGate::dtorCnt == 1); } diff --git a/messagebus/src/vespa/messagebus/intermediatesession.cpp b/messagebus/src/vespa/messagebus/intermediatesession.cpp index 2b8830f07e8..61cd77c0165 100644 --- a/messagebus/src/vespa/messagebus/intermediatesession.cpp +++ b/messagebus/src/vespa/messagebus/intermediatesession.cpp @@ -4,6 +4,8 @@ #include "messagebus.h" #include "replygate.h" +using vespalib::make_ref_counted; + namespace mbus { IntermediateSession::IntermediateSession(MessageBus &mbus, const IntermediateSessionParams ¶ms) : @@ -11,14 +13,13 @@ IntermediateSession::IntermediateSession(MessageBus &mbus, const IntermediateSes _name(params.getName()), _msgHandler(params.getMessageHandler()), _replyHandler(params.getReplyHandler()), - _gate(new ReplyGate(_mbus)) + _gate(make_ref_counted(_mbus)) { } IntermediateSession::~IntermediateSession() { _gate->close(); close(); - _gate->subRef(); } void diff --git a/messagebus/src/vespa/messagebus/intermediatesession.h b/messagebus/src/vespa/messagebus/intermediatesession.h index 0a17aa1e42a..8d938b6cb9e 100644 --- a/messagebus/src/vespa/messagebus/intermediatesession.h +++ b/messagebus/src/vespa/messagebus/intermediatesession.h @@ -4,11 +4,11 @@ #include "reply.h" #include "imessagehandler.h" #include "intermediatesessionparams.h" +#include "replygate.h" namespace mbus { class MessageBus; -class ReplyGate; class Message; /** @@ -21,12 +21,13 @@ class IntermediateSession : public IMessageHandler, private: friend class MessageBus; using MessageUP = std::unique_ptr; + template using ref_counted = vespalib::ref_counted; - MessageBus &_mbus; - string _name; - IMessageHandler &_msgHandler; - IReplyHandler &_replyHandler; - ReplyGate *_gate; + MessageBus &_mbus; + string _name; + IMessageHandler &_msgHandler; + IReplyHandler &_replyHandler; + ref_counted _gate; /** * This constructor is declared package private since only MessageBus is supposed to instantiate it. diff --git a/messagebus/src/vespa/messagebus/replygate.cpp b/messagebus/src/vespa/messagebus/replygate.cpp index 8094119f14c..1028d46aff4 100644 --- a/messagebus/src/vespa/messagebus/replygate.cpp +++ b/messagebus/src/vespa/messagebus/replygate.cpp @@ -6,7 +6,6 @@ namespace mbus { ReplyGate::ReplyGate(IMessageHandler &sender) : - vespalib::ReferenceCounter(), _sender(sender), _open(true) { } @@ -14,7 +13,7 @@ ReplyGate::ReplyGate(IMessageHandler &sender) : void ReplyGate::handleMessage(Message::UP msg) { - addRef(); + internal_addref(); msg->pushHandler(*this, *this); _sender.handleMessage(std::move(msg)); } @@ -34,13 +33,13 @@ ReplyGate::handleReply(Reply::UP reply) } else { reply->discard(); } - subRef(); + internal_subref(); } void ReplyGate::handleDiscard(Context) { - subRef(); + internal_subref(); } } // namespace mbus diff --git a/messagebus/src/vespa/messagebus/replygate.h b/messagebus/src/vespa/messagebus/replygate.h index 0c487de3ecf..e09dfe63365 100644 --- a/messagebus/src/vespa/messagebus/replygate.h +++ b/messagebus/src/vespa/messagebus/replygate.h @@ -5,7 +5,7 @@ #include "idiscardhandler.h" #include "imessagehandler.h" #include "ireplyhandler.h" -#include +#include #include namespace mbus { @@ -20,7 +20,7 @@ namespace mbus { * is handled outside this class. Note that this class is only intended for * internal use. */ -class ReplyGate : public vespalib::ReferenceCounter, +class ReplyGate : public vespalib::enable_ref_counted, public IDiscardHandler, public IMessageHandler, public IReplyHandler diff --git a/messagebus/src/vespa/messagebus/sourcesession.cpp b/messagebus/src/vespa/messagebus/sourcesession.cpp index 0691e0c07f9..feff3beed58 100644 --- a/messagebus/src/vespa/messagebus/sourcesession.cpp +++ b/messagebus/src/vespa/messagebus/sourcesession.cpp @@ -6,15 +6,17 @@ #include "tracelevel.h" #include #include +#include using vespalib::make_string; +using vespalib::make_ref_counted; namespace mbus { SourceSession::SourceSession(MessageBus &mbus, const SourceSessionParams ¶ms) : _lock(), _mbus(mbus), - _gate(new ReplyGate(_mbus)), + _gate(make_ref_counted(_mbus)), _sequencer(*_gate), _replyHandler(params.getReplyHandler()), _throttlePolicy(params.getThrottlePolicy()), @@ -31,9 +33,6 @@ SourceSession::~SourceSession() // Ensure that no more replies propagate from mbus. _gate->close(); _mbus.sync(); - - // Tell gate that we will no longer use it. - _gate->subRef(); } Result diff --git a/messagebus/src/vespa/messagebus/sourcesession.h b/messagebus/src/vespa/messagebus/sourcesession.h index d918724ad4f..03628f06c56 100644 --- a/messagebus/src/vespa/messagebus/sourcesession.h +++ b/messagebus/src/vespa/messagebus/sourcesession.h @@ -5,13 +5,13 @@ #include "result.h" #include "sequencer.h" #include "sourcesessionparams.h" +#include "replygate.h" #include #include namespace mbus { class MessageBus; -class ReplyGate; /** * A SourceSession is used to send Message objects along a named or explicitly defined route and get Reply @@ -21,11 +21,12 @@ class ReplyGate; class SourceSession : public IReplyHandler { private: friend class MessageBus; + template using ref_counted = vespalib::ref_counted; std::mutex _lock; std::condition_variable _cond; MessageBus &_mbus; - ReplyGate *_gate; + ref_counted _gate; Sequencer _sequencer; IReplyHandler &_replyHandler; IThrottlePolicy::SP _throttlePolicy; -- cgit v1.2.3