summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-07-25 09:06:46 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2017-07-25 09:17:47 +0200
commit488d20ae17b0c99f5ed855d8070f8fb8d143999c (patch)
tree4b7921098684be18a4b192b7f83397ba59434b5a /messagebus
parent6535f64c263804a888716c655f0c98e248c870b0 (diff)
Ensure that the structures we are updating are safe for multithreaded access.
While the Protocols themselves must be lifetime managed on the outside. This is done in the CommunicationManager.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/vespa/messagebus/protocolrepository.cpp39
-rw-r--r--messagebus/src/vespa/messagebus/protocolrepository.h15
2 files changed, 34 insertions, 20 deletions
diff --git a/messagebus/src/vespa/messagebus/protocolrepository.cpp b/messagebus/src/vespa/messagebus/protocolrepository.cpp
index abad6534331..61b08384f79 100644
--- a/messagebus/src/vespa/messagebus/protocolrepository.cpp
+++ b/messagebus/src/vespa/messagebus/protocolrepository.cpp
@@ -6,7 +6,7 @@ LOG_SETUP(".protocolrepository");
namespace mbus {
-ProtocolRepository::ProtocolRepository() {}
+ProtocolRepository::ProtocolRepository() : _numProtocols(0) {}
ProtocolRepository::~ProtocolRepository() {}
void
@@ -20,27 +20,36 @@ IProtocol::SP
ProtocolRepository::putProtocol(const IProtocol::SP & protocol)
{
const string &name = protocol->getName();
- if (_protocols.find(name) != _protocols.end()) {
+ size_t protocolIndex = _numProtocols;
+ for (size_t i(0); i < _numProtocols; i++) {
+ if (_protocols[i].first == name) {
+ protocolIndex = i;
+ break;
+ }
+ }
+ if (protocolIndex == _numProtocols) {
+ assert(_numProtocols < MAX_PROTOCOLS);
+ _protocols[protocolIndex].first = name;
+ _protocols[protocolIndex].first = nullptr;
+ _numProtocols++;
+ } else {
clearPolicyCache();
}
- IProtocol::SP prev = _protocols[name];
- _protocols[name] = protocol;
+ _protocols[protocolIndex].second = protocol.get();
+ IProtocol::SP prev = _activeProtocols[name];
+ _activeProtocols[name] = protocol;
return prev;
}
-bool
-ProtocolRepository::hasProtocol(const string &name) const
-{
- return _protocols.find(name) != _protocols.end();
-}
-
IProtocol *
ProtocolRepository::getProtocol(const string &name)
{
- ProtocolMap::iterator it = _protocols.find(name);
- if (it != _protocols.end()) {
- return it->second.get();
+ for (size_t i(0); i < _numProtocols; i++) {
+ if (_protocols[i].first == name) {
+ return _protocols[i].second;
+ }
}
+
return nullptr;
}
@@ -56,8 +65,8 @@ ProtocolRepository::getRoutingPolicy(const string &protocolName,
if (cit != _routingPolicyCache.end()) {
return cit->second;
}
- ProtocolMap::iterator pit = _protocols.find(protocolName);
- if (pit == _protocols.end()) {
+ ProtocolMap::iterator pit = _activeProtocols.find(protocolName);
+ if (pit == _activeProtocols.end()) {
LOG(error, "Protocol '%s' not supported.", protocolName.c_str());
return IRoutingPolicy::SP();
}
diff --git a/messagebus/src/vespa/messagebus/protocolrepository.h b/messagebus/src/vespa/messagebus/protocolrepository.h
index 341c7f8bc2d..52cd31364c1 100644
--- a/messagebus/src/vespa/messagebus/protocolrepository.h
+++ b/messagebus/src/vespa/messagebus/protocolrepository.h
@@ -10,15 +10,20 @@ namespace mbus {
/**
* Implements a thread-safe repository for protocols and their routing policies. This manages an internal cache of
* routing policies so that similarly referenced policy directives share the same instance of a policy.
+ * However for speed the protocols themselves must be kept alive on the outside when returned from
+ * putProtocol. There is only room for a limited number of protocols.
*/
class ProtocolRepository {
private:
- typedef std::map<string, IProtocol::SP> ProtocolMap;
- typedef std::map<string, IRoutingPolicy::SP> RoutingPolicyCache;
+ using ProtocolMap = std::map<string, IProtocol::SP>;
+ using RoutingPolicyCache = std::map<string, IRoutingPolicy::SP>;
vespalib::Lock _lock; // Only guards the cache,
// not the protocols as they are set up during messagebus construction.
- ProtocolMap _protocols;
+ static constexpr size_t MAX_PROTOCOLS = 16;
+ std::pair<string, IProtocol *> _protocols[MAX_PROTOCOLS];
+ size_t _numProtocols;
+ ProtocolMap _activeProtocols;
RoutingPolicyCache _routingPolicyCache;
public:
@@ -29,7 +34,8 @@ public:
/**
* Registers a protocol with this repository. This will overwrite any protocol that was registered earlier
* that has the same name. If this method detects a protocol replacement, it will clear its internal
- * routing policy cache.
+ * routing policy cache. You must keep the old protocol returned until there can be no usages of the references
+ * acquired from getProtocol.
*
* @param protocol The protocol to register.
* @return The previous protocol registered under this name.
@@ -77,4 +83,3 @@ public:
};
}
-