summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@oath.com>2019-01-08 14:57:39 +0000
committerTor Brede Vekterli <vekterli@oath.com>2019-01-08 15:00:23 +0000
commit90007d5216499cd44a7a02ba0fcc746749b3aa3f (patch)
tree29a221a8dddc13039e1bebfedd871e0d0a9ec234 /messagebus
parentbafc19bf1b7cf39b2c1293949de648b53ebd460c (diff)
Use atomics for lockfree protocol lookups in messagebus
Use release-acquire pairs to ensure visibility of protocol object contents when their pointer is observable by a reader.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/vespa/messagebus/protocolrepository.cpp24
-rw-r--r--messagebus/src/vespa/messagebus/protocolrepository.h7
2 files changed, 20 insertions, 11 deletions
diff --git a/messagebus/src/vespa/messagebus/protocolrepository.cpp b/messagebus/src/vespa/messagebus/protocolrepository.cpp
index 4e2efcfb3b9..a9891069c44 100644
--- a/messagebus/src/vespa/messagebus/protocolrepository.cpp
+++ b/messagebus/src/vespa/messagebus/protocolrepository.cpp
@@ -7,7 +7,7 @@ LOG_SETUP(".protocolrepository");
namespace mbus {
ProtocolRepository::ProtocolRepository() : _numProtocols(0) {}
-ProtocolRepository::~ProtocolRepository() {}
+ProtocolRepository::~ProtocolRepository() = default;
void
ProtocolRepository::clearPolicyCache()
@@ -20,22 +20,27 @@ IProtocol::SP
ProtocolRepository::putProtocol(const IProtocol::SP & protocol)
{
const string &name = protocol->getName();
- size_t protocolIndex = _numProtocols;
- for (size_t i(0); i < _numProtocols; i++) {
+ const auto numProtocols = _numProtocols.load();
+ size_t protocolIndex = numProtocols;
+ for (size_t i(0); i < numProtocols; i++) {
if (_protocols[i].first == name) {
protocolIndex = i;
break;
}
}
- if (protocolIndex == _numProtocols) {
- assert(_numProtocols < MAX_PROTOCOLS);
+ if (protocolIndex == numProtocols) {
+ assert(numProtocols < MAX_PROTOCOLS);
_protocols[protocolIndex].first = name;
_protocols[protocolIndex].second = nullptr;
- _numProtocols++;
+ // nullptr may be observed after increment but before protocol pointer
+ // update; this is fine as it has the same behavior as if the protocol
+ // has not yet been added.
+ const auto beforeAdd = _numProtocols.fetch_add(1, std::memory_order_release);
+ assert(beforeAdd == numProtocols); // Sanity check for racing inserters
} else {
clearPolicyCache();
}
- _protocols[protocolIndex].second = protocol.get();
+ _protocols[protocolIndex].second.store(protocol.get(), std::memory_order_release);
IProtocol::SP prev = _activeProtocols[name];
_activeProtocols[name] = protocol;
return prev;
@@ -44,9 +49,10 @@ ProtocolRepository::putProtocol(const IProtocol::SP & protocol)
IProtocol *
ProtocolRepository::getProtocol(const string &name)
{
- for (size_t i(0); i < _numProtocols; i++) {
+ const auto numProtocols = _numProtocols.load(std::memory_order_acquire);
+ for (size_t i(0); i < numProtocols; i++) {
if (_protocols[i].first == name) {
- return _protocols[i].second;
+ return _protocols[i].second.load(std::memory_order_acquire);
}
}
diff --git a/messagebus/src/vespa/messagebus/protocolrepository.h b/messagebus/src/vespa/messagebus/protocolrepository.h
index d04fd361f3a..b310ba3586a 100644
--- a/messagebus/src/vespa/messagebus/protocolrepository.h
+++ b/messagebus/src/vespa/messagebus/protocolrepository.h
@@ -4,6 +4,7 @@
#include "iprotocol.h"
#include <vespa/vespalib/util/sync.h>
#include <map>
+#include <atomic>
namespace mbus {
@@ -21,8 +22,8 @@ private:
vespalib::Lock _lock; // Only guards the cache,
// not the protocols as they are set up during messagebus construction.
static constexpr size_t MAX_PROTOCOLS = 16;
- std::pair<string, IProtocol *> _protocols[MAX_PROTOCOLS];
- size_t _numProtocols;
+ std::pair<string, std::atomic<IProtocol *>> _protocols[MAX_PROTOCOLS];
+ std::atomic<size_t> _numProtocols;
ProtocolMap _activeProtocols;
RoutingPolicyCache _routingPolicyCache;
@@ -37,6 +38,8 @@ public:
* routing policy cache. You must keep the old protocol returned until there can be no usages of the references
* acquired from getProtocol.
*
+ * Must not be called concurrently by multiple threads.
+ *
* @param protocol The protocol to register.
* @return The previous protocol registered under this name.
*/