summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/vespa/messagebus/destinationsession.cpp19
-rw-r--r--messagebus/src/vespa/messagebus/destinationsession.h13
-rw-r--r--messagebus/src/vespa/messagebus/destinationsessionparams.cpp1
-rw-r--r--messagebus/src/vespa/messagebus/destinationsessionparams.h8
-rw-r--r--messagebus/src/vespa/messagebus/imessagehandler.h2
-rw-r--r--messagebus/src/vespa/messagebus/messagebus.cpp19
-rw-r--r--messagebus/src/vespa/messagebus/messagebus.h6
7 files changed, 59 insertions, 9 deletions
diff --git a/messagebus/src/vespa/messagebus/destinationsession.cpp b/messagebus/src/vespa/messagebus/destinationsession.cpp
index 661873e5678..e9431407e99 100644
--- a/messagebus/src/vespa/messagebus/destinationsession.cpp
+++ b/messagebus/src/vespa/messagebus/destinationsession.cpp
@@ -2,13 +2,16 @@
#include "destinationsession.h"
#include "messagebus.h"
#include "emptyreply.h"
+#include <cassert>
namespace mbus {
DestinationSession::DestinationSession(MessageBus &mbus, const DestinationSessionParams &params) :
_mbus(mbus),
_name(params.getName()),
- _msgHandler(params.getMessageHandler())
+ _msgHandler(params.getMessageHandler()),
+ _session_registered(!params.defer_registration()),
+ _broadcast_name(params.getBroadcastName())
{ }
DestinationSession::~DestinationSession() {
@@ -16,9 +19,19 @@ DestinationSession::~DestinationSession() {
}
void
+DestinationSession::register_session_deferred() {
+ assert(!_session_registered);
+ _mbus.register_session(*this, _name, _broadcast_name);
+ _session_registered = true;
+}
+
+void
DestinationSession::close() {
- _mbus.unregisterSession(_name);
- _mbus.sync();
+ if (_session_registered) {
+ _mbus.unregisterSession(_name);
+ _mbus.sync();
+ _session_registered = false;
+ }
}
void
diff --git a/messagebus/src/vespa/messagebus/destinationsession.h b/messagebus/src/vespa/messagebus/destinationsession.h
index 138e00407e8..bb7fd612e41 100644
--- a/messagebus/src/vespa/messagebus/destinationsession.h
+++ b/messagebus/src/vespa/messagebus/destinationsession.h
@@ -22,6 +22,8 @@ private:
MessageBus &_mbus;
string _name;
IMessageHandler &_msgHandler;
+ bool _session_registered;
+ bool _broadcast_name;
/**
* This constructor is package private since only MessageBus is supposed to
@@ -36,13 +38,20 @@ public:
/**
* Convenience typedef for an auto pointer to a DestinationSession object.
*/
- typedef std::unique_ptr<DestinationSession> UP;
+ using UP = std::unique_ptr<DestinationSession>;
/**
* The destructor untangles from messagebus. After this method returns,
* messagebus will not invoke any handlers associated with this session.
*/
- virtual ~DestinationSession();
+ ~DestinationSession() override;
+
+ /**
+ * If a session was created with defer_registered(true) as part of its parameters,
+ * it can be subsequently registered at most once. The session will not be visible
+ * for incoming messages until that point in time.
+ */
+ void register_session_deferred();
/**
* This method unregisters this session from message bus, effectively
diff --git a/messagebus/src/vespa/messagebus/destinationsessionparams.cpp b/messagebus/src/vespa/messagebus/destinationsessionparams.cpp
index a830e04e243..8d386998f61 100644
--- a/messagebus/src/vespa/messagebus/destinationsessionparams.cpp
+++ b/messagebus/src/vespa/messagebus/destinationsessionparams.cpp
@@ -6,6 +6,7 @@ namespace mbus {
DestinationSessionParams::DestinationSessionParams() :
_name("destination"),
_broadcastName(true),
+ _defer_registration(false),
_handler(nullptr)
{ }
diff --git a/messagebus/src/vespa/messagebus/destinationsessionparams.h b/messagebus/src/vespa/messagebus/destinationsessionparams.h
index fbd392a011c..066743dea55 100644
--- a/messagebus/src/vespa/messagebus/destinationsessionparams.h
+++ b/messagebus/src/vespa/messagebus/destinationsessionparams.h
@@ -18,6 +18,7 @@ class DestinationSessionParams {
private:
string _name;
bool _broadcastName;
+ bool _defer_registration;
IMessageHandler *_handler;
public:
@@ -48,6 +49,8 @@ public:
*/
bool getBroadcastName() const { return _broadcastName; }
+ [[nodiscard]] bool defer_registration() const noexcept { return _defer_registration; }
+
/**
* Sets whether or not to broadcast the name of this session on the network.
*
@@ -56,6 +59,11 @@ public:
*/
DestinationSessionParams &setBroadcastName(bool broadcastName) { _broadcastName = broadcastName; return *this; }
+ DestinationSessionParams& defer_registration(bool defer) noexcept {
+ _defer_registration = defer;
+ return *this;
+ }
+
/**
* Returns the handler to receive incoming messages. If you call this method without first assigning a
* message handler to this object, you wil de-ref null.
diff --git a/messagebus/src/vespa/messagebus/imessagehandler.h b/messagebus/src/vespa/messagebus/imessagehandler.h
index ee15f96b22b..516a00cd07d 100644
--- a/messagebus/src/vespa/messagebus/imessagehandler.h
+++ b/messagebus/src/vespa/messagebus/imessagehandler.h
@@ -20,7 +20,7 @@ protected:
public:
IMessageHandler(const IMessageHandler &) = delete;
IMessageHandler & operator = (const IMessageHandler &) = delete;
- virtual ~IMessageHandler() {}
+ virtual ~IMessageHandler() = default;
/**
* This method is invoked by messagebus to deliver a Message.
diff --git a/messagebus/src/vespa/messagebus/messagebus.cpp b/messagebus/src/vespa/messagebus/messagebus.cpp
index 97be5955e9d..49ea955754c 100644
--- a/messagebus/src/vespa/messagebus/messagebus.cpp
+++ b/messagebus/src/vespa/messagebus/messagebus.cpp
@@ -225,14 +225,27 @@ MessageBus::createDestinationSession(const DestinationSessionParams &params)
{
std::lock_guard guard(_lock);
DestinationSession::UP ret(new DestinationSession(*this, params));
- _sessions[params.getName()] = ret.get();
- if (params.getBroadcastName()) {
- _network.registerSession(params.getName());
+ if (!params.defer_registration()) {
+ _sessions[params.getName()] = ret.get();
+ if (params.getBroadcastName()) {
+ _network.registerSession(params.getName());
+ }
}
return ret;
}
void
+MessageBus::register_session(IMessageHandler& session, const string& session_name, bool broadcast_name)
+{
+ std::lock_guard guard(_lock);
+ assert(!_sessions.contains(session_name));
+ _sessions[session_name] = &session;
+ if (broadcast_name) {
+ _network.registerSession(session_name);
+ }
+}
+
+void
MessageBus::unregisterSession(const string &sessionName)
{
std::lock_guard guard(_lock);
diff --git a/messagebus/src/vespa/messagebus/messagebus.h b/messagebus/src/vespa/messagebus/messagebus.h
index d270a0f3491..2c27d281494 100644
--- a/messagebus/src/vespa/messagebus/messagebus.h
+++ b/messagebus/src/vespa/messagebus/messagebus.h
@@ -171,6 +171,12 @@ public:
DestinationSession::UP createDestinationSession(const DestinationSessionParams &params);
/**
+ * Register a session; used by session instances that are created with deferred registration.
+ * Don't use this directly.
+ */
+ void register_session(IMessageHandler& session, const string& session_name, bool broadcast_name);
+
+ /**
* Unregister a session. This method is invoked by session destructors to ensure that no more Message objects are
* delivered and that the session name is removed from the network naming service. The sync method can be invoked
* after invoking this one to ensure that no callbacks are active.