diff options
Diffstat (limited to 'messagebus')
9 files changed, 23 insertions, 22 deletions
diff --git a/messagebus/src/vespa/messagebus/messagebus.cpp b/messagebus/src/vespa/messagebus/messagebus.cpp index c3d6b28b318..ce60f1a3969 100644 --- a/messagebus/src/vespa/messagebus/messagebus.cpp +++ b/messagebus/src/vespa/messagebus/messagebus.cpp @@ -13,7 +13,6 @@ #include <vespa/log/log.h> LOG_SETUP(".messagebus"); -using vespalib::LockGuard; using vespalib::make_string; using namespace std::chrono_literals; @@ -201,7 +200,7 @@ MessageBus::createIntermediateSession(const string &name, IntermediateSession::UP MessageBus::createIntermediateSession(const IntermediateSessionParams ¶ms) { - LockGuard guard(_lock); + std::lock_guard guard(_lock); IntermediateSession::UP ret(new IntermediateSession(*this, params)); _sessions[params.getName()] = ret.get(); if (params.getBroadcastName()) { @@ -224,7 +223,7 @@ MessageBus::createDestinationSession(const string &name, DestinationSession::UP MessageBus::createDestinationSession(const DestinationSessionParams ¶ms) { - LockGuard guard(_lock); + std::lock_guard guard(_lock); DestinationSession::UP ret(new DestinationSession(*this, params)); _sessions[params.getName()] = ret.get(); if (params.getBroadcastName()) { @@ -236,7 +235,7 @@ MessageBus::createDestinationSession(const DestinationSessionParams ¶ms) void MessageBus::unregisterSession(const string &sessionName) { - LockGuard guard(_lock); + std::lock_guard guard(_lock); _network.unregisterSession(sessionName); _sessions.erase(sessionName); } @@ -245,7 +244,7 @@ RoutingTable::SP MessageBus::getRoutingTable(const string &protocol) { typedef std::map<string, RoutingTable::SP>::iterator ITR; - LockGuard guard(_lock); + std::lock_guard guard(_lock); ITR itr = _routingTables.find(protocol); if (itr == _routingTables.end()) { return RoutingTable::SP(); // not found @@ -293,7 +292,7 @@ MessageBus::setupRouting(const RoutingSpec &spec) rtm[cfg.getProtocol()] = std::make_shared<RoutingTable>(cfg); } { - LockGuard guard(_lock); + std::lock_guard guard(_lock); std::swap(_routingTables, rtm); } _protocolRepository->clearPolicyCache(); @@ -360,7 +359,7 @@ MessageBus::deliverMessage(Message::UP msg, const string &session) { IMessageHandler *msgHandler = nullptr; { - LockGuard guard(_lock); + std::lock_guard guard(_lock); std::map<string, IMessageHandler*>::iterator it = _sessions.find(session); if (it != _sessions.end()) { msgHandler = it->second; diff --git a/messagebus/src/vespa/messagebus/messagebus.h b/messagebus/src/vespa/messagebus/messagebus.h index c0682967db9..b12054f7006 100644 --- a/messagebus/src/vespa/messagebus/messagebus.h +++ b/messagebus/src/vespa/messagebus/messagebus.h @@ -10,7 +10,6 @@ #include "sourcesession.h" #include <vespa/messagebus/network/inetworkowner.h> #include <vespa/messagebus/routing/routingspec.h> -#include <vespa/vespalib/util/sync.h> #include <map> #include <string> #include <atomic> @@ -38,7 +37,7 @@ class MessageBus : public IMessageHandler, private: using RoutingTableSP = std::shared_ptr<RoutingTable>; INetwork &_network; - vespalib::Lock _lock; + std::mutex _lock; std::map<string, RoutingTableSP> _routingTables; std::map<string, IMessageHandler*> _sessions; std::unique_ptr<ProtocolRepository> _protocolRepository; diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index de3be2ffa01..eb94ab5ff5c 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -96,7 +96,7 @@ RPCNetwork::SendContext::handleVersion(const vespalib::Version *version) { bool shouldSend = false; { - vespalib::LockGuard guard(_lock); + std::lock_guard guard(_lock); if (version == nullptr) { _hasError = true; } else if (*version < _version) { diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h index a8eb514387c..2780c3e8770 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h @@ -37,7 +37,7 @@ class RPCNetwork : public INetwork, private: using CompressionConfig = vespalib::compression::CompressionConfig; struct SendContext : public RPCTarget::IVersionHandler { - vespalib::Lock _lock; + std::mutex _lock; RPCNetwork &_net; const Message &_msg; uint32_t _traceLevel; @@ -128,7 +128,7 @@ public: * * @param params A complete set of parameters. */ - RPCNetwork(const RPCNetworkParams ¶ms); + explicit RPCNetwork(const RPCNetworkParams ¶ms); /** * Destruct diff --git a/messagebus/src/vespa/messagebus/protocolrepository.cpp b/messagebus/src/vespa/messagebus/protocolrepository.cpp index a9891069c44..f23ef0e2ff7 100644 --- a/messagebus/src/vespa/messagebus/protocolrepository.cpp +++ b/messagebus/src/vespa/messagebus/protocolrepository.cpp @@ -1,5 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "protocolrepository.h" +#include <cassert> #include <vespa/log/log.h> LOG_SETUP(".protocolrepository"); @@ -12,7 +13,7 @@ ProtocolRepository::~ProtocolRepository() = default; void ProtocolRepository::clearPolicyCache() { - vespalib::LockGuard guard(_lock); + std::lock_guard guard(_lock); _routingPolicyCache.clear(); } @@ -66,7 +67,7 @@ ProtocolRepository::getRoutingPolicy(const string &protocolName, { string cacheKey = protocolName; cacheKey.append('.').append(policyName).append(".").append(policyParam); - vespalib::LockGuard guard(_lock); + std::lock_guard guard(_lock); RoutingPolicyCache::iterator cit = _routingPolicyCache.find(cacheKey); if (cit != _routingPolicyCache.end()) { return cit->second; diff --git a/messagebus/src/vespa/messagebus/protocolrepository.h b/messagebus/src/vespa/messagebus/protocolrepository.h index b310ba3586a..28163149e2e 100644 --- a/messagebus/src/vespa/messagebus/protocolrepository.h +++ b/messagebus/src/vespa/messagebus/protocolrepository.h @@ -2,9 +2,9 @@ #pragma once #include "iprotocol.h" -#include <vespa/vespalib/util/sync.h> #include <map> #include <atomic> +#include <mutex> namespace mbus { @@ -19,7 +19,7 @@ private: using ProtocolMap = std::map<string, IProtocol::SP>; using RoutingPolicyCache = std::map<string, IRoutingPolicy::SP>; - vespalib::Lock _lock; // Only guards the cache, + std::mutex _lock; // Only guards the cache, // not the protocols as they are set up during messagebus construction. static constexpr size_t MAX_PROTOCOLS = 16; std::pair<string, std::atomic<IProtocol *>> _protocols[MAX_PROTOCOLS]; diff --git a/messagebus/src/vespa/messagebus/routing/routingnode.cpp b/messagebus/src/vespa/messagebus/routing/routingnode.cpp index a47abe185cc..f24afbc07ca 100644 --- a/messagebus/src/vespa/messagebus/routing/routingnode.cpp +++ b/messagebus/src/vespa/messagebus/routing/routingnode.cpp @@ -10,6 +10,7 @@ #include <vespa/vespalib/util/stringfmt.h> #include <vespa/messagebus/network/inetwork.h> #include <stack> +#include <cassert> using vespalib::make_string; diff --git a/messagebus/src/vespa/messagebus/sequencer.cpp b/messagebus/src/vespa/messagebus/sequencer.cpp index 2d7a36a28ef..6270284b9e2 100644 --- a/messagebus/src/vespa/messagebus/sequencer.cpp +++ b/messagebus/src/vespa/messagebus/sequencer.cpp @@ -2,6 +2,7 @@ #include "sequencer.h" #include "tracelevel.h" #include <vespa/vespalib/util/stringfmt.h> +#include <cassert> using vespalib::make_string; @@ -37,7 +38,7 @@ Sequencer::filter(Message::UP msg) uint64_t seqId = msg->getSequenceId(); msg->setContext(Context(seqId)); { - vespalib::LockGuard guard(_lock); + std::lock_guard guard(_lock); QueueMap::iterator it = _seqMap.find(seqId); if (it != _seqMap.end()) { if (it->second == nullptr) { @@ -85,7 +86,7 @@ Sequencer::handleReply(Reply::UP reply) make_string("Sequencer received reply with sequence id '%" PRIu64 "'.", seq)); Message::UP msg; { - vespalib::LockGuard guard(_lock); + std::lock_guard guard(_lock); QueueMap::iterator it = _seqMap.find(seq); MessageQueue *que = it->second; assert(it != _seqMap.end()); @@ -99,7 +100,7 @@ Sequencer::handleReply(Reply::UP reply) que->pop(); } } - if (msg.get() != nullptr) { + if (msg) { sequencedSend(std::move(msg)); } IReplyHandler &handler = reply->getCallStack().pop(*reply); diff --git a/messagebus/src/vespa/messagebus/sequencer.h b/messagebus/src/vespa/messagebus/sequencer.h index 7aae32fc744..54ac4ce5007 100644 --- a/messagebus/src/vespa/messagebus/sequencer.h +++ b/messagebus/src/vespa/messagebus/sequencer.h @@ -2,13 +2,13 @@ #pragma once -#include <map> -#include <vespa/vespalib/util/sync.h> #include "imessagehandler.h" #include "ireplyhandler.h" #include "message.h" #include "reply.h" #include "queue.h" +#include <mutex> +#include <map> namespace mbus { @@ -21,7 +21,7 @@ class Sequencer : public IMessageHandler, public IReplyHandler { private: - vespalib::Lock _lock; + std::mutex _lock; IMessageHandler &_sender; typedef Queue<Message*> MessageQueue; |