From 90007d5216499cd44a7a02ba0fcc746749b3aa3f Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Tue, 8 Jan 2019 14:57:39 +0000 Subject: Use atomics for lockfree protocol lookups in messagebus Use release-acquire pairs to ensure visibility of protocol object contents when their pointer is observable by a reader. --- .../src/vespa/messagebus/protocolrepository.cpp | 24 ++++++++++++++-------- .../src/vespa/messagebus/protocolrepository.h | 7 +++++-- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/messagebus/src/vespa/messagebus/protocolrepository.cpp b/messagebus/src/vespa/messagebus/protocolrepository.cpp index 4e2efcfb3b9..a9891069c44 100644 --- a/messagebus/src/vespa/messagebus/protocolrepository.cpp +++ b/messagebus/src/vespa/messagebus/protocolrepository.cpp @@ -7,7 +7,7 @@ LOG_SETUP(".protocolrepository"); namespace mbus { ProtocolRepository::ProtocolRepository() : _numProtocols(0) {} -ProtocolRepository::~ProtocolRepository() {} +ProtocolRepository::~ProtocolRepository() = default; void ProtocolRepository::clearPolicyCache() @@ -20,22 +20,27 @@ IProtocol::SP ProtocolRepository::putProtocol(const IProtocol::SP & protocol) { const string &name = protocol->getName(); - size_t protocolIndex = _numProtocols; - for (size_t i(0); i < _numProtocols; i++) { + const auto numProtocols = _numProtocols.load(); + size_t protocolIndex = numProtocols; + for (size_t i(0); i < numProtocols; i++) { if (_protocols[i].first == name) { protocolIndex = i; break; } } - if (protocolIndex == _numProtocols) { - assert(_numProtocols < MAX_PROTOCOLS); + if (protocolIndex == numProtocols) { + assert(numProtocols < MAX_PROTOCOLS); _protocols[protocolIndex].first = name; _protocols[protocolIndex].second = nullptr; - _numProtocols++; + // nullptr may be observed after increment but before protocol pointer + // update; this is fine as it has the same behavior as if the protocol + // has not yet been added. + const auto beforeAdd = _numProtocols.fetch_add(1, std::memory_order_release); + assert(beforeAdd == numProtocols); // Sanity check for racing inserters } else { clearPolicyCache(); } - _protocols[protocolIndex].second = protocol.get(); + _protocols[protocolIndex].second.store(protocol.get(), std::memory_order_release); IProtocol::SP prev = _activeProtocols[name]; _activeProtocols[name] = protocol; return prev; @@ -44,9 +49,10 @@ ProtocolRepository::putProtocol(const IProtocol::SP & protocol) IProtocol * ProtocolRepository::getProtocol(const string &name) { - for (size_t i(0); i < _numProtocols; i++) { + const auto numProtocols = _numProtocols.load(std::memory_order_acquire); + for (size_t i(0); i < numProtocols; i++) { if (_protocols[i].first == name) { - return _protocols[i].second; + return _protocols[i].second.load(std::memory_order_acquire); } } diff --git a/messagebus/src/vespa/messagebus/protocolrepository.h b/messagebus/src/vespa/messagebus/protocolrepository.h index d04fd361f3a..b310ba3586a 100644 --- a/messagebus/src/vespa/messagebus/protocolrepository.h +++ b/messagebus/src/vespa/messagebus/protocolrepository.h @@ -4,6 +4,7 @@ #include "iprotocol.h" #include #include +#include namespace mbus { @@ -21,8 +22,8 @@ private: vespalib::Lock _lock; // Only guards the cache, // not the protocols as they are set up during messagebus construction. static constexpr size_t MAX_PROTOCOLS = 16; - std::pair _protocols[MAX_PROTOCOLS]; - size_t _numProtocols; + std::pair> _protocols[MAX_PROTOCOLS]; + std::atomic _numProtocols; ProtocolMap _activeProtocols; RoutingPolicyCache _routingPolicyCache; @@ -37,6 +38,8 @@ public: * routing policy cache. You must keep the old protocol returned until there can be no usages of the references * acquired from getProtocol. * + * Must not be called concurrently by multiple threads. + * * @param protocol The protocol to register. * @return The previous protocol registered under this name. */ -- cgit v1.2.3