summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-10-08 08:24:55 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-10-08 08:24:55 +0000
commit3b3ea0cf9c580e658abab59d1d020d9db9ec9cb9 (patch)
tree6d6b9b74ef716f09b17aaf24ca63760578837843 /messagebus
parent4291a9ec03e9b5b7c23473725678b3c0785cbf91 (diff)
Use vespalib::Lock -> std::mutex
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/vespa/messagebus/messagebus.cpp13
-rw-r--r--messagebus/src/vespa/messagebus/messagebus.h3
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.cpp2
-rw-r--r--messagebus/src/vespa/messagebus/network/rpcnetwork.h4
-rw-r--r--messagebus/src/vespa/messagebus/protocolrepository.cpp4
-rw-r--r--messagebus/src/vespa/messagebus/protocolrepository.h4
-rw-r--r--messagebus/src/vespa/messagebus/sequencer.cpp6
-rw-r--r--messagebus/src/vespa/messagebus/sequencer.h6
8 files changed, 20 insertions, 22 deletions
diff --git a/messagebus/src/vespa/messagebus/messagebus.cpp b/messagebus/src/vespa/messagebus/messagebus.cpp
index c3d6b28b318..ce60f1a3969 100644
--- a/messagebus/src/vespa/messagebus/messagebus.cpp
+++ b/messagebus/src/vespa/messagebus/messagebus.cpp
@@ -13,7 +13,6 @@
#include <vespa/log/log.h>
LOG_SETUP(".messagebus");
-using vespalib::LockGuard;
using vespalib::make_string;
using namespace std::chrono_literals;
@@ -201,7 +200,7 @@ MessageBus::createIntermediateSession(const string &name,
IntermediateSession::UP
MessageBus::createIntermediateSession(const IntermediateSessionParams &params)
{
- LockGuard guard(_lock);
+ std::lock_guard guard(_lock);
IntermediateSession::UP ret(new IntermediateSession(*this, params));
_sessions[params.getName()] = ret.get();
if (params.getBroadcastName()) {
@@ -224,7 +223,7 @@ MessageBus::createDestinationSession(const string &name,
DestinationSession::UP
MessageBus::createDestinationSession(const DestinationSessionParams &params)
{
- LockGuard guard(_lock);
+ std::lock_guard guard(_lock);
DestinationSession::UP ret(new DestinationSession(*this, params));
_sessions[params.getName()] = ret.get();
if (params.getBroadcastName()) {
@@ -236,7 +235,7 @@ MessageBus::createDestinationSession(const DestinationSessionParams &params)
void
MessageBus::unregisterSession(const string &sessionName)
{
- LockGuard guard(_lock);
+ std::lock_guard guard(_lock);
_network.unregisterSession(sessionName);
_sessions.erase(sessionName);
}
@@ -245,7 +244,7 @@ RoutingTable::SP
MessageBus::getRoutingTable(const string &protocol)
{
typedef std::map<string, RoutingTable::SP>::iterator ITR;
- LockGuard guard(_lock);
+ std::lock_guard guard(_lock);
ITR itr = _routingTables.find(protocol);
if (itr == _routingTables.end()) {
return RoutingTable::SP(); // not found
@@ -293,7 +292,7 @@ MessageBus::setupRouting(const RoutingSpec &spec)
rtm[cfg.getProtocol()] = std::make_shared<RoutingTable>(cfg);
}
{
- LockGuard guard(_lock);
+ std::lock_guard guard(_lock);
std::swap(_routingTables, rtm);
}
_protocolRepository->clearPolicyCache();
@@ -360,7 +359,7 @@ MessageBus::deliverMessage(Message::UP msg, const string &session)
{
IMessageHandler *msgHandler = nullptr;
{
- LockGuard guard(_lock);
+ std::lock_guard guard(_lock);
std::map<string, IMessageHandler*>::iterator it = _sessions.find(session);
if (it != _sessions.end()) {
msgHandler = it->second;
diff --git a/messagebus/src/vespa/messagebus/messagebus.h b/messagebus/src/vespa/messagebus/messagebus.h
index c0682967db9..b12054f7006 100644
--- a/messagebus/src/vespa/messagebus/messagebus.h
+++ b/messagebus/src/vespa/messagebus/messagebus.h
@@ -10,7 +10,6 @@
#include "sourcesession.h"
#include <vespa/messagebus/network/inetworkowner.h>
#include <vespa/messagebus/routing/routingspec.h>
-#include <vespa/vespalib/util/sync.h>
#include <map>
#include <string>
#include <atomic>
@@ -38,7 +37,7 @@ class MessageBus : public IMessageHandler,
private:
using RoutingTableSP = std::shared_ptr<RoutingTable>;
INetwork &_network;
- vespalib::Lock _lock;
+ std::mutex _lock;
std::map<string, RoutingTableSP> _routingTables;
std::map<string, IMessageHandler*> _sessions;
std::unique_ptr<ProtocolRepository> _protocolRepository;
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
index de3be2ffa01..eb94ab5ff5c 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp
@@ -96,7 +96,7 @@ RPCNetwork::SendContext::handleVersion(const vespalib::Version *version)
{
bool shouldSend = false;
{
- vespalib::LockGuard guard(_lock);
+ std::lock_guard guard(_lock);
if (version == nullptr) {
_hasError = true;
} else if (*version < _version) {
diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
index a8eb514387c..2780c3e8770 100644
--- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h
+++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h
@@ -37,7 +37,7 @@ class RPCNetwork : public INetwork,
private:
using CompressionConfig = vespalib::compression::CompressionConfig;
struct SendContext : public RPCTarget::IVersionHandler {
- vespalib::Lock _lock;
+ std::mutex _lock;
RPCNetwork &_net;
const Message &_msg;
uint32_t _traceLevel;
@@ -128,7 +128,7 @@ public:
*
* @param params A complete set of parameters.
*/
- RPCNetwork(const RPCNetworkParams &params);
+ explicit RPCNetwork(const RPCNetworkParams &params);
/**
* Destruct
diff --git a/messagebus/src/vespa/messagebus/protocolrepository.cpp b/messagebus/src/vespa/messagebus/protocolrepository.cpp
index 10ea4ecb25d..f23ef0e2ff7 100644
--- a/messagebus/src/vespa/messagebus/protocolrepository.cpp
+++ b/messagebus/src/vespa/messagebus/protocolrepository.cpp
@@ -13,7 +13,7 @@ ProtocolRepository::~ProtocolRepository() = default;
void
ProtocolRepository::clearPolicyCache()
{
- vespalib::LockGuard guard(_lock);
+ std::lock_guard guard(_lock);
_routingPolicyCache.clear();
}
@@ -67,7 +67,7 @@ ProtocolRepository::getRoutingPolicy(const string &protocolName,
{
string cacheKey = protocolName;
cacheKey.append('.').append(policyName).append(".").append(policyParam);
- vespalib::LockGuard guard(_lock);
+ std::lock_guard guard(_lock);
RoutingPolicyCache::iterator cit = _routingPolicyCache.find(cacheKey);
if (cit != _routingPolicyCache.end()) {
return cit->second;
diff --git a/messagebus/src/vespa/messagebus/protocolrepository.h b/messagebus/src/vespa/messagebus/protocolrepository.h
index b310ba3586a..28163149e2e 100644
--- a/messagebus/src/vespa/messagebus/protocolrepository.h
+++ b/messagebus/src/vespa/messagebus/protocolrepository.h
@@ -2,9 +2,9 @@
#pragma once
#include "iprotocol.h"
-#include <vespa/vespalib/util/sync.h>
#include <map>
#include <atomic>
+#include <mutex>
namespace mbus {
@@ -19,7 +19,7 @@ private:
using ProtocolMap = std::map<string, IProtocol::SP>;
using RoutingPolicyCache = std::map<string, IRoutingPolicy::SP>;
- vespalib::Lock _lock; // Only guards the cache,
+ std::mutex _lock; // Only guards the cache,
// not the protocols as they are set up during messagebus construction.
static constexpr size_t MAX_PROTOCOLS = 16;
std::pair<string, std::atomic<IProtocol *>> _protocols[MAX_PROTOCOLS];
diff --git a/messagebus/src/vespa/messagebus/sequencer.cpp b/messagebus/src/vespa/messagebus/sequencer.cpp
index e19dfa0ee61..6270284b9e2 100644
--- a/messagebus/src/vespa/messagebus/sequencer.cpp
+++ b/messagebus/src/vespa/messagebus/sequencer.cpp
@@ -38,7 +38,7 @@ Sequencer::filter(Message::UP msg)
uint64_t seqId = msg->getSequenceId();
msg->setContext(Context(seqId));
{
- vespalib::LockGuard guard(_lock);
+ std::lock_guard guard(_lock);
QueueMap::iterator it = _seqMap.find(seqId);
if (it != _seqMap.end()) {
if (it->second == nullptr) {
@@ -86,7 +86,7 @@ Sequencer::handleReply(Reply::UP reply)
make_string("Sequencer received reply with sequence id '%" PRIu64 "'.", seq));
Message::UP msg;
{
- vespalib::LockGuard guard(_lock);
+ std::lock_guard guard(_lock);
QueueMap::iterator it = _seqMap.find(seq);
MessageQueue *que = it->second;
assert(it != _seqMap.end());
@@ -100,7 +100,7 @@ Sequencer::handleReply(Reply::UP reply)
que->pop();
}
}
- if (msg.get() != nullptr) {
+ if (msg) {
sequencedSend(std::move(msg));
}
IReplyHandler &handler = reply->getCallStack().pop(*reply);
diff --git a/messagebus/src/vespa/messagebus/sequencer.h b/messagebus/src/vespa/messagebus/sequencer.h
index 7aae32fc744..54ac4ce5007 100644
--- a/messagebus/src/vespa/messagebus/sequencer.h
+++ b/messagebus/src/vespa/messagebus/sequencer.h
@@ -2,13 +2,13 @@
#pragma once
-#include <map>
-#include <vespa/vespalib/util/sync.h>
#include "imessagehandler.h"
#include "ireplyhandler.h"
#include "message.h"
#include "reply.h"
#include "queue.h"
+#include <mutex>
+#include <map>
namespace mbus {
@@ -21,7 +21,7 @@ class Sequencer : public IMessageHandler,
public IReplyHandler
{
private:
- vespalib::Lock _lock;
+ std::mutex _lock;
IMessageHandler &_sender;
typedef Queue<Message*> MessageQueue;