aboutsummaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@yahooinc.com>2023-03-08 08:45:54 +0000
committerHåvard Pettersen <havardpe@yahooinc.com>2023-03-08 08:45:54 +0000
commit66ae46c80878aab4b0fc19d4c693e1a74cd8977a (patch)
treef2211ceae2fe5137d130363c8db8edc2dd6e0d15 /messagebus
parent0d234088e4cace78188d71e599c2c1e48d30321a (diff)
use ref_counted for ReplyGate
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/tests/replygate/replygate.cpp4
-rw-r--r--messagebus/src/vespa/messagebus/intermediatesession.cpp5
-rw-r--r--messagebus/src/vespa/messagebus/intermediatesession.h13
-rw-r--r--messagebus/src/vespa/messagebus/replygate.cpp7
-rw-r--r--messagebus/src/vespa/messagebus/replygate.h4
-rw-r--r--messagebus/src/vespa/messagebus/sourcesession.cpp7
-rw-r--r--messagebus/src/vespa/messagebus/sourcesession.h5
7 files changed, 23 insertions, 22 deletions
diff --git a/messagebus/src/tests/replygate/replygate.cpp b/messagebus/src/tests/replygate/replygate.cpp
index 3de48fce130..c71993c9368 100644
--- a/messagebus/src/tests/replygate/replygate.cpp
+++ b/messagebus/src/tests/replygate/replygate.cpp
@@ -59,7 +59,7 @@ TEST("replygate_test") {
{
RoutableQueue q;
MySender sender;
- MyGate *gate = new MyGate(sender);
+ auto gate = vespalib::make_ref_counted<MyGate>(sender);
{
auto msg = std::make_unique<SimpleMessage>("test");
msg->pushHandler(q);
@@ -79,7 +79,7 @@ TEST("replygate_test") {
EXPECT_TRUE(MyReply::dtorCnt == 1);
EXPECT_TRUE(MyGate::ctorCnt == 1);
EXPECT_TRUE(MyGate::dtorCnt == 0);
- gate->subRef();
+ gate = vespalib::ref_counted<MyGate>();
EXPECT_TRUE(MyGate::ctorCnt == 1);
EXPECT_TRUE(MyGate::dtorCnt == 1);
}
diff --git a/messagebus/src/vespa/messagebus/intermediatesession.cpp b/messagebus/src/vespa/messagebus/intermediatesession.cpp
index 2b8830f07e8..61cd77c0165 100644
--- a/messagebus/src/vespa/messagebus/intermediatesession.cpp
+++ b/messagebus/src/vespa/messagebus/intermediatesession.cpp
@@ -4,6 +4,8 @@
#include "messagebus.h"
#include "replygate.h"
+using vespalib::make_ref_counted;
+
namespace mbus {
IntermediateSession::IntermediateSession(MessageBus &mbus, const IntermediateSessionParams &params) :
@@ -11,14 +13,13 @@ IntermediateSession::IntermediateSession(MessageBus &mbus, const IntermediateSes
_name(params.getName()),
_msgHandler(params.getMessageHandler()),
_replyHandler(params.getReplyHandler()),
- _gate(new ReplyGate(_mbus))
+ _gate(make_ref_counted<ReplyGate>(_mbus))
{ }
IntermediateSession::~IntermediateSession()
{
_gate->close();
close();
- _gate->subRef();
}
void
diff --git a/messagebus/src/vespa/messagebus/intermediatesession.h b/messagebus/src/vespa/messagebus/intermediatesession.h
index 0a17aa1e42a..8d938b6cb9e 100644
--- a/messagebus/src/vespa/messagebus/intermediatesession.h
+++ b/messagebus/src/vespa/messagebus/intermediatesession.h
@@ -4,11 +4,11 @@
#include "reply.h"
#include "imessagehandler.h"
#include "intermediatesessionparams.h"
+#include "replygate.h"
namespace mbus {
class MessageBus;
-class ReplyGate;
class Message;
/**
@@ -21,12 +21,13 @@ class IntermediateSession : public IMessageHandler,
private:
friend class MessageBus;
using MessageUP = std::unique_ptr<Message>;
+ template <typename T> using ref_counted = vespalib::ref_counted<T>;
- MessageBus &_mbus;
- string _name;
- IMessageHandler &_msgHandler;
- IReplyHandler &_replyHandler;
- ReplyGate *_gate;
+ MessageBus &_mbus;
+ string _name;
+ IMessageHandler &_msgHandler;
+ IReplyHandler &_replyHandler;
+ ref_counted<ReplyGate> _gate;
/**
* This constructor is declared package private since only MessageBus is supposed to instantiate it.
diff --git a/messagebus/src/vespa/messagebus/replygate.cpp b/messagebus/src/vespa/messagebus/replygate.cpp
index 8094119f14c..1028d46aff4 100644
--- a/messagebus/src/vespa/messagebus/replygate.cpp
+++ b/messagebus/src/vespa/messagebus/replygate.cpp
@@ -6,7 +6,6 @@
namespace mbus {
ReplyGate::ReplyGate(IMessageHandler &sender) :
- vespalib::ReferenceCounter(),
_sender(sender),
_open(true)
{ }
@@ -14,7 +13,7 @@ ReplyGate::ReplyGate(IMessageHandler &sender) :
void
ReplyGate::handleMessage(Message::UP msg)
{
- addRef();
+ internal_addref();
msg->pushHandler(*this, *this);
_sender.handleMessage(std::move(msg));
}
@@ -34,13 +33,13 @@ ReplyGate::handleReply(Reply::UP reply)
} else {
reply->discard();
}
- subRef();
+ internal_subref();
}
void
ReplyGate::handleDiscard(Context)
{
- subRef();
+ internal_subref();
}
} // namespace mbus
diff --git a/messagebus/src/vespa/messagebus/replygate.h b/messagebus/src/vespa/messagebus/replygate.h
index 0c487de3ecf..e09dfe63365 100644
--- a/messagebus/src/vespa/messagebus/replygate.h
+++ b/messagebus/src/vespa/messagebus/replygate.h
@@ -5,7 +5,7 @@
#include "idiscardhandler.h"
#include "imessagehandler.h"
#include "ireplyhandler.h"
-#include <vespa/vespalib/util/referencecounter.h>
+#include <vespa/vespalib/util/ref_counted.h>
#include <atomic>
namespace mbus {
@@ -20,7 +20,7 @@ namespace mbus {
* is handled outside this class. Note that this class is only intended for
* internal use.
*/
-class ReplyGate : public vespalib::ReferenceCounter,
+class ReplyGate : public vespalib::enable_ref_counted,
public IDiscardHandler,
public IMessageHandler,
public IReplyHandler
diff --git a/messagebus/src/vespa/messagebus/sourcesession.cpp b/messagebus/src/vespa/messagebus/sourcesession.cpp
index 0691e0c07f9..feff3beed58 100644
--- a/messagebus/src/vespa/messagebus/sourcesession.cpp
+++ b/messagebus/src/vespa/messagebus/sourcesession.cpp
@@ -6,15 +6,17 @@
#include "tracelevel.h"
#include <vespa/messagebus/routing/routingtable.h>
#include <vespa/vespalib/util/stringfmt.h>
+#include <cassert>
using vespalib::make_string;
+using vespalib::make_ref_counted;
namespace mbus {
SourceSession::SourceSession(MessageBus &mbus, const SourceSessionParams &params)
: _lock(),
_mbus(mbus),
- _gate(new ReplyGate(_mbus)),
+ _gate(make_ref_counted<ReplyGate>(_mbus)),
_sequencer(*_gate),
_replyHandler(params.getReplyHandler()),
_throttlePolicy(params.getThrottlePolicy()),
@@ -31,9 +33,6 @@ SourceSession::~SourceSession()
// Ensure that no more replies propagate from mbus.
_gate->close();
_mbus.sync();
-
- // Tell gate that we will no longer use it.
- _gate->subRef();
}
Result
diff --git a/messagebus/src/vespa/messagebus/sourcesession.h b/messagebus/src/vespa/messagebus/sourcesession.h
index d918724ad4f..03628f06c56 100644
--- a/messagebus/src/vespa/messagebus/sourcesession.h
+++ b/messagebus/src/vespa/messagebus/sourcesession.h
@@ -5,13 +5,13 @@
#include "result.h"
#include "sequencer.h"
#include "sourcesessionparams.h"
+#include "replygate.h"
#include <atomic>
#include <condition_variable>
namespace mbus {
class MessageBus;
-class ReplyGate;
/**
* A SourceSession is used to send Message objects along a named or explicitly defined route and get Reply
@@ -21,11 +21,12 @@ class ReplyGate;
class SourceSession : public IReplyHandler {
private:
friend class MessageBus;
+ template <typename T> using ref_counted = vespalib::ref_counted<T>;
std::mutex _lock;
std::condition_variable _cond;
MessageBus &_mbus;
- ReplyGate *_gate;
+ ref_counted<ReplyGate> _gate;
Sequencer _sequencer;
IReplyHandler &_replyHandler;
IThrottlePolicy::SP _throttlePolicy;