aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2022-11-29 15:41:04 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2022-11-29 16:16:44 +0000
commit766e14cc7a3fea7dfd5bbc9b549c2bfd9a838efe (patch)
treea7f2002d6b41b0f8528858289acf0efb9c315ad7
parent7fbfecba694f1665d5c00bd504086a49898f8bb2 (diff)
Defer MBus DestinationSession registration during content node init
Creating a `DestinationSession` that is immediately registered as available for business means we may theoretically start receiving messages over the session even before the call returns to the caller. Either way there would be no memory barrier that ensures that `_messageBusSession` would be fully visible to the MessageBus threads (since it's written after return). To avoid this sneaky scenario, defer registration (and thus introduce a barrier) until _after_ we've initialized our internal member variables. This addresses a TSan warning.
-rw-r--r--messagebus/src/vespa/messagebus/destinationsession.cpp19
-rw-r--r--messagebus/src/vespa/messagebus/destinationsession.h13
-rw-r--r--messagebus/src/vespa/messagebus/destinationsessionparams.cpp1
-rw-r--r--messagebus/src/vespa/messagebus/destinationsessionparams.h8
-rw-r--r--messagebus/src/vespa/messagebus/imessagehandler.h2
-rw-r--r--messagebus/src/vespa/messagebus/messagebus.cpp19
-rw-r--r--messagebus/src/vespa/messagebus/messagebus.h6
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp9
8 files changed, 68 insertions, 9 deletions
diff --git a/messagebus/src/vespa/messagebus/destinationsession.cpp b/messagebus/src/vespa/messagebus/destinationsession.cpp
index 661873e5678..e9431407e99 100644
--- a/messagebus/src/vespa/messagebus/destinationsession.cpp
+++ b/messagebus/src/vespa/messagebus/destinationsession.cpp
@@ -2,13 +2,16 @@
#include "destinationsession.h"
#include "messagebus.h"
#include "emptyreply.h"
+#include <cassert>
namespace mbus {
DestinationSession::DestinationSession(MessageBus &mbus, const DestinationSessionParams &params) :
_mbus(mbus),
_name(params.getName()),
- _msgHandler(params.getMessageHandler())
+ _msgHandler(params.getMessageHandler()),
+ _session_registered(!params.defer_registration()),
+ _broadcast_name(params.getBroadcastName())
{ }
DestinationSession::~DestinationSession() {
@@ -16,9 +19,19 @@ DestinationSession::~DestinationSession() {
}
void
+DestinationSession::register_session_deferred() {
+ assert(!_session_registered);
+ _mbus.register_session(*this, _name, _broadcast_name);
+ _session_registered = true;
+}
+
+void
DestinationSession::close() {
- _mbus.unregisterSession(_name);
- _mbus.sync();
+ if (_session_registered) {
+ _mbus.unregisterSession(_name);
+ _mbus.sync();
+ _session_registered = false;
+ }
}
void
diff --git a/messagebus/src/vespa/messagebus/destinationsession.h b/messagebus/src/vespa/messagebus/destinationsession.h
index 138e00407e8..bb7fd612e41 100644
--- a/messagebus/src/vespa/messagebus/destinationsession.h
+++ b/messagebus/src/vespa/messagebus/destinationsession.h
@@ -22,6 +22,8 @@ private:
MessageBus &_mbus;
string _name;
IMessageHandler &_msgHandler;
+ bool _session_registered;
+ bool _broadcast_name;
/**
* This constructor is package private since only MessageBus is supposed to
@@ -36,13 +38,20 @@ public:
/**
* Convenience typedef for an auto pointer to a DestinationSession object.
*/
- typedef std::unique_ptr<DestinationSession> UP;
+ using UP = std::unique_ptr<DestinationSession>;
/**
* The destructor untangles from messagebus. After this method returns,
* messagebus will not invoke any handlers associated with this session.
*/
- virtual ~DestinationSession();
+ ~DestinationSession() override;
+
+ /**
+ * If a session was created with defer_registered(true) as part of its parameters,
+ * it can be subsequently registered at most once. The session will not be visible
+ * for incoming messages until that point in time.
+ */
+ void register_session_deferred();
/**
* This method unregisters this session from message bus, effectively
diff --git a/messagebus/src/vespa/messagebus/destinationsessionparams.cpp b/messagebus/src/vespa/messagebus/destinationsessionparams.cpp
index a830e04e243..8d386998f61 100644
--- a/messagebus/src/vespa/messagebus/destinationsessionparams.cpp
+++ b/messagebus/src/vespa/messagebus/destinationsessionparams.cpp
@@ -6,6 +6,7 @@ namespace mbus {
DestinationSessionParams::DestinationSessionParams() :
_name("destination"),
_broadcastName(true),
+ _defer_registration(false),
_handler(nullptr)
{ }
diff --git a/messagebus/src/vespa/messagebus/destinationsessionparams.h b/messagebus/src/vespa/messagebus/destinationsessionparams.h
index fbd392a011c..066743dea55 100644
--- a/messagebus/src/vespa/messagebus/destinationsessionparams.h
+++ b/messagebus/src/vespa/messagebus/destinationsessionparams.h
@@ -18,6 +18,7 @@ class DestinationSessionParams {
private:
string _name;
bool _broadcastName;
+ bool _defer_registration;
IMessageHandler *_handler;
public:
@@ -48,6 +49,8 @@ public:
*/
bool getBroadcastName() const { return _broadcastName; }
+ [[nodiscard]] bool defer_registration() const noexcept { return _defer_registration; }
+
/**
* Sets whether or not to broadcast the name of this session on the network.
*
@@ -56,6 +59,11 @@ public:
*/
DestinationSessionParams &setBroadcastName(bool broadcastName) { _broadcastName = broadcastName; return *this; }
+ DestinationSessionParams& defer_registration(bool defer) noexcept {
+ _defer_registration = defer;
+ return *this;
+ }
+
/**
* Returns the handler to receive incoming messages. If you call this method without first assigning a
* message handler to this object, you wil de-ref null.
diff --git a/messagebus/src/vespa/messagebus/imessagehandler.h b/messagebus/src/vespa/messagebus/imessagehandler.h
index ee15f96b22b..516a00cd07d 100644
--- a/messagebus/src/vespa/messagebus/imessagehandler.h
+++ b/messagebus/src/vespa/messagebus/imessagehandler.h
@@ -20,7 +20,7 @@ protected:
public:
IMessageHandler(const IMessageHandler &) = delete;
IMessageHandler & operator = (const IMessageHandler &) = delete;
- virtual ~IMessageHandler() {}
+ virtual ~IMessageHandler() = default;
/**
* This method is invoked by messagebus to deliver a Message.
diff --git a/messagebus/src/vespa/messagebus/messagebus.cpp b/messagebus/src/vespa/messagebus/messagebus.cpp
index 97be5955e9d..49ea955754c 100644
--- a/messagebus/src/vespa/messagebus/messagebus.cpp
+++ b/messagebus/src/vespa/messagebus/messagebus.cpp
@@ -225,14 +225,27 @@ MessageBus::createDestinationSession(const DestinationSessionParams &params)
{
std::lock_guard guard(_lock);
DestinationSession::UP ret(new DestinationSession(*this, params));
- _sessions[params.getName()] = ret.get();
- if (params.getBroadcastName()) {
- _network.registerSession(params.getName());
+ if (!params.defer_registration()) {
+ _sessions[params.getName()] = ret.get();
+ if (params.getBroadcastName()) {
+ _network.registerSession(params.getName());
+ }
}
return ret;
}
void
+MessageBus::register_session(IMessageHandler& session, const string& session_name, bool broadcast_name)
+{
+ std::lock_guard guard(_lock);
+ assert(!_sessions.contains(session_name));
+ _sessions[session_name] = &session;
+ if (broadcast_name) {
+ _network.registerSession(session_name);
+ }
+}
+
+void
MessageBus::unregisterSession(const string &sessionName)
{
std::lock_guard guard(_lock);
diff --git a/messagebus/src/vespa/messagebus/messagebus.h b/messagebus/src/vespa/messagebus/messagebus.h
index d270a0f3491..2c27d281494 100644
--- a/messagebus/src/vespa/messagebus/messagebus.h
+++ b/messagebus/src/vespa/messagebus/messagebus.h
@@ -171,6 +171,12 @@ public:
DestinationSession::UP createDestinationSession(const DestinationSessionParams &params);
/**
+ * Register a session; used by session instances that are created with deferred registration.
+ * Don't use this directly.
+ */
+ void register_session(IMessageHandler& session, const string& session_name, bool broadcast_name);
+
+ /**
* Unregister a session. This method is invoked by session destructors to ensure that no more Message objects are
* delivered and that the session name is removed from the network naming service. The sync method can be invoked
* after invoking this one to ensure that no callbacks are active.
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 621f8a2ecc0..4458bf8a92d 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -396,6 +396,7 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig>
mbus::DestinationSessionParams dstParams;
dstParams.setName("default");
dstParams.setBroadcastName(true);
+ dstParams.defer_registration(true); // Deferred session registration; see rationale below
dstParams.setMessageHandler(*this);
_messageBusSession = _mbus->getMessageBus().createDestinationSession(dstParams);
@@ -403,6 +404,14 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig>
srcParams.setThrottlePolicy(mbus::IThrottlePolicy::SP());
srcParams.setReplyHandler(*this);
_sourceSession = _mbus->getMessageBus().createSourceSession(srcParams);
+
+ // Creating a DestinationSession that is immediately registered as available for business
+ // means we may theoretically start receiving messages over the session even before the call returns
+ // to the caller. Either way there would be no memory barrier that ensures that _messageBusSession
+ // would be fully visible to the MessageBus threads (since it's written after return).
+ // To avoid this sneaky scenario, defer registration (and thus introduce a barrier) until
+ // _after_ we've initialized our internal member variables.
+ _messageBusSession->register_session_deferred();
}
}