diff options
8 files changed, 68 insertions, 9 deletions
diff --git a/messagebus/src/vespa/messagebus/destinationsession.cpp b/messagebus/src/vespa/messagebus/destinationsession.cpp index 661873e5678..e9431407e99 100644 --- a/messagebus/src/vespa/messagebus/destinationsession.cpp +++ b/messagebus/src/vespa/messagebus/destinationsession.cpp @@ -2,13 +2,16 @@ #include "destinationsession.h" #include "messagebus.h" #include "emptyreply.h" +#include <cassert> namespace mbus { DestinationSession::DestinationSession(MessageBus &mbus, const DestinationSessionParams ¶ms) : _mbus(mbus), _name(params.getName()), - _msgHandler(params.getMessageHandler()) + _msgHandler(params.getMessageHandler()), + _session_registered(!params.defer_registration()), + _broadcast_name(params.getBroadcastName()) { } DestinationSession::~DestinationSession() { @@ -16,9 +19,19 @@ DestinationSession::~DestinationSession() { } void +DestinationSession::register_session_deferred() { + assert(!_session_registered); + _mbus.register_session(*this, _name, _broadcast_name); + _session_registered = true; +} + +void DestinationSession::close() { - _mbus.unregisterSession(_name); - _mbus.sync(); + if (_session_registered) { + _mbus.unregisterSession(_name); + _mbus.sync(); + _session_registered = false; + } } void diff --git a/messagebus/src/vespa/messagebus/destinationsession.h b/messagebus/src/vespa/messagebus/destinationsession.h index 138e00407e8..bb7fd612e41 100644 --- a/messagebus/src/vespa/messagebus/destinationsession.h +++ b/messagebus/src/vespa/messagebus/destinationsession.h @@ -22,6 +22,8 @@ private: MessageBus &_mbus; string _name; IMessageHandler &_msgHandler; + bool _session_registered; + bool _broadcast_name; /** * This constructor is package private since only MessageBus is supposed to @@ -36,13 +38,20 @@ public: /** * Convenience typedef for an auto pointer to a DestinationSession object. */ - typedef std::unique_ptr<DestinationSession> UP; + using UP = std::unique_ptr<DestinationSession>; /** * The destructor untangles from messagebus. After this method returns, * messagebus will not invoke any handlers associated with this session. */ - virtual ~DestinationSession(); + ~DestinationSession() override; + + /** + * If a session was created with defer_registered(true) as part of its parameters, + * it can be subsequently registered at most once. The session will not be visible + * for incoming messages until that point in time. + */ + void register_session_deferred(); /** * This method unregisters this session from message bus, effectively diff --git a/messagebus/src/vespa/messagebus/destinationsessionparams.cpp b/messagebus/src/vespa/messagebus/destinationsessionparams.cpp index a830e04e243..8d386998f61 100644 --- a/messagebus/src/vespa/messagebus/destinationsessionparams.cpp +++ b/messagebus/src/vespa/messagebus/destinationsessionparams.cpp @@ -6,6 +6,7 @@ namespace mbus { DestinationSessionParams::DestinationSessionParams() : _name("destination"), _broadcastName(true), + _defer_registration(false), _handler(nullptr) { } diff --git a/messagebus/src/vespa/messagebus/destinationsessionparams.h b/messagebus/src/vespa/messagebus/destinationsessionparams.h index fbd392a011c..066743dea55 100644 --- a/messagebus/src/vespa/messagebus/destinationsessionparams.h +++ b/messagebus/src/vespa/messagebus/destinationsessionparams.h @@ -18,6 +18,7 @@ class DestinationSessionParams { private: string _name; bool _broadcastName; + bool _defer_registration; IMessageHandler *_handler; public: @@ -48,6 +49,8 @@ public: */ bool getBroadcastName() const { return _broadcastName; } + [[nodiscard]] bool defer_registration() const noexcept { return _defer_registration; } + /** * Sets whether or not to broadcast the name of this session on the network. * @@ -56,6 +59,11 @@ public: */ DestinationSessionParams &setBroadcastName(bool broadcastName) { _broadcastName = broadcastName; return *this; } + DestinationSessionParams& defer_registration(bool defer) noexcept { + _defer_registration = defer; + return *this; + } + /** * Returns the handler to receive incoming messages. If you call this method without first assigning a * message handler to this object, you wil de-ref null. diff --git a/messagebus/src/vespa/messagebus/imessagehandler.h b/messagebus/src/vespa/messagebus/imessagehandler.h index ee15f96b22b..516a00cd07d 100644 --- a/messagebus/src/vespa/messagebus/imessagehandler.h +++ b/messagebus/src/vespa/messagebus/imessagehandler.h @@ -20,7 +20,7 @@ protected: public: IMessageHandler(const IMessageHandler &) = delete; IMessageHandler & operator = (const IMessageHandler &) = delete; - virtual ~IMessageHandler() {} + virtual ~IMessageHandler() = default; /** * This method is invoked by messagebus to deliver a Message. diff --git a/messagebus/src/vespa/messagebus/messagebus.cpp b/messagebus/src/vespa/messagebus/messagebus.cpp index 97be5955e9d..49ea955754c 100644 --- a/messagebus/src/vespa/messagebus/messagebus.cpp +++ b/messagebus/src/vespa/messagebus/messagebus.cpp @@ -225,14 +225,27 @@ MessageBus::createDestinationSession(const DestinationSessionParams ¶ms) { std::lock_guard guard(_lock); DestinationSession::UP ret(new DestinationSession(*this, params)); - _sessions[params.getName()] = ret.get(); - if (params.getBroadcastName()) { - _network.registerSession(params.getName()); + if (!params.defer_registration()) { + _sessions[params.getName()] = ret.get(); + if (params.getBroadcastName()) { + _network.registerSession(params.getName()); + } } return ret; } void +MessageBus::register_session(IMessageHandler& session, const string& session_name, bool broadcast_name) +{ + std::lock_guard guard(_lock); + assert(!_sessions.contains(session_name)); + _sessions[session_name] = &session; + if (broadcast_name) { + _network.registerSession(session_name); + } +} + +void MessageBus::unregisterSession(const string &sessionName) { std::lock_guard guard(_lock); diff --git a/messagebus/src/vespa/messagebus/messagebus.h b/messagebus/src/vespa/messagebus/messagebus.h index d270a0f3491..2c27d281494 100644 --- a/messagebus/src/vespa/messagebus/messagebus.h +++ b/messagebus/src/vespa/messagebus/messagebus.h @@ -171,6 +171,12 @@ public: DestinationSession::UP createDestinationSession(const DestinationSessionParams ¶ms); /** + * Register a session; used by session instances that are created with deferred registration. + * Don't use this directly. + */ + void register_session(IMessageHandler& session, const string& session_name, bool broadcast_name); + + /** * Unregister a session. This method is invoked by session destructors to ensure that no more Message objects are * delivered and that the session name is removed from the network naming service. The sync method can be invoked * after invoking this one to ensure that no callbacks are active. diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 621f8a2ecc0..4458bf8a92d 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -396,6 +396,7 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> mbus::DestinationSessionParams dstParams; dstParams.setName("default"); dstParams.setBroadcastName(true); + dstParams.defer_registration(true); // Deferred session registration; see rationale below dstParams.setMessageHandler(*this); _messageBusSession = _mbus->getMessageBus().createDestinationSession(dstParams); @@ -403,6 +404,14 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> srcParams.setThrottlePolicy(mbus::IThrottlePolicy::SP()); srcParams.setReplyHandler(*this); _sourceSession = _mbus->getMessageBus().createSourceSession(srcParams); + + // Creating a DestinationSession that is immediately registered as available for business + // means we may theoretically start receiving messages over the session even before the call returns + // to the caller. Either way there would be no memory barrier that ensures that _messageBusSession + // would be fully visible to the MessageBus threads (since it's written after return). + // To avoid this sneaky scenario, defer registration (and thus introduce a barrier) until + // _after_ we've initialized our internal member variables. + _messageBusSession->register_session_deferred(); } } |