diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-13 22:13:14 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-13 22:56:14 +0000 |
commit | 001bdf0053ba9cb02e20afcceb9d0f7ed63f1178 (patch) | |
tree | b2b0d66c4459114d878cfa61b12e74c39bbb0b74 /messagebus | |
parent | 71c10939b19be8ea115cda9ecddcad7749b2c20d (diff) |
Use std::mutex and std:.condition_variable and GC some unused code.
Diffstat (limited to 'messagebus')
-rw-r--r-- | messagebus/src/vespa/messagebus/messenger.cpp | 33 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/messenger.h | 6 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/network/rpctarget.cpp | 10 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/network/rpctarget.h | 6 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/routablequeue.cpp | 13 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/routablequeue.h | 8 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/sourcesession.cpp | 30 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/sourcesession.h | 5 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/testlib/receptor.cpp | 18 | ||||
-rw-r--r-- | messagebus/src/vespa/messagebus/testlib/receptor.h | 14 |
10 files changed, 74 insertions, 69 deletions
diff --git a/messagebus/src/vespa/messagebus/messenger.cpp b/messagebus/src/vespa/messagebus/messenger.cpp index 4579f7dec0e..9623fa59acf 100644 --- a/messagebus/src/vespa/messagebus/messenger.cpp +++ b/messagebus/src/vespa/messagebus/messenger.cpp @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "messenger.h" -#include <vespa/vespalib/util/sync.h> #include <vespa/vespalib/util/gate.h> #include <vespa/log/log.h> @@ -157,22 +156,23 @@ public: namespace mbus { Messenger::Messenger(bool skip_request_thread, bool skip_reply_thread) : - _monitor(), - _pool(128000), - _children(), - _queue(), - _closed(false), - _skip_request_thread(skip_request_thread), - _skip_reply_thread(skip_reply_thread) + _lock(), + _pool(128000), + _children(), + _queue(), + _closed(false), + _skip_request_thread(skip_request_thread), + _skip_reply_thread(skip_reply_thread) {} Messenger::~Messenger() { { - vespalib::MonitorGuard guard(_monitor); + std::lock_guard guard(_lock); _closed = true; - guard.broadcast(); } + _cond.notify_all(); + _pool.Close(); std::for_each(_children.begin(), _children.end(), DeleteFunctor<ITask>()); if ( ! _queue.empty()) { @@ -194,12 +194,12 @@ Messenger::Run(FastOS_ThreadInterface *thread, void *arg) while (true) { ITask::UP task; { - vespalib::MonitorGuard guard(_monitor); + std::unique_lock guard(_lock); if (_closed) { break; } if (_queue.empty()) { - guard.wait(100); + _cond.wait_for(guard, 100ms); } if (!_queue.empty()) { task.reset(_queue.front()); @@ -237,7 +237,7 @@ Messenger::discardRecurrentTasks() bool Messenger::start() { - if (_pool.NewThread(this) == 0) { + if (_pool.NewThread(this) == nullptr) { return false; } return true; @@ -266,11 +266,12 @@ Messenger::deliverReply(Reply::UP reply, IReplyHandler &handler) void Messenger::enqueue(ITask::UP task) { - vespalib::MonitorGuard guard(_monitor); + std::unique_lock guard(_lock); if (!_closed) { _queue.push(task.release()); if (_queue.size() == 1) { - guard.signal(); + guard.unlock(); + _cond.notify_one(); } } } @@ -286,7 +287,7 @@ Messenger::sync() bool Messenger::isEmpty() const { - vespalib::MonitorGuard guard(_monitor); + std::lock_guard guard(_lock); return _queue.empty(); } diff --git a/messagebus/src/vespa/messagebus/messenger.h b/messagebus/src/vespa/messagebus/messenger.h index 3103e9afae1..7ca3749b970 100644 --- a/messagebus/src/vespa/messagebus/messenger.h +++ b/messagebus/src/vespa/messagebus/messenger.h @@ -6,7 +6,6 @@ #include "message.h" #include "reply.h" #include <vespa/vespalib/util/executor.h> -#include <vespa/vespalib/util/sync.h> #include <vespa/vespalib/util/arrayqueue.hpp> #include <vespa/fastos/thread.h> @@ -40,8 +39,9 @@ public: }; private: - vespalib::Monitor _monitor; - FastOS_ThreadPool _pool; + mutable std::mutex _lock; + std::condition_variable _cond; + FastOS_ThreadPool _pool; std::vector<ITask*> _children; vespalib::ArrayQueue<ITask*> _queue; bool _closed; diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.cpp b/messagebus/src/vespa/messagebus/network/rpctarget.cpp index ea21010e21c..b91ba43f036 100644 --- a/messagebus/src/vespa/messagebus/network/rpctarget.cpp +++ b/messagebus/src/vespa/messagebus/network/rpctarget.cpp @@ -29,11 +29,11 @@ RPCTarget::resolveVersion(duration timeout, RPCTarget::IVersionHandler &handler) ResolveState state = _state.load(std::memory_order_acquire); bool hasVersion = (state == VERSION_RESOLVED); if ( ! hasVersion ) { - vespalib::MonitorGuard guard(_lock); + std::unique_lock guard(_lock); state = _state.load(std::memory_order_relaxed); if (state == VERSION_RESOLVED || state == PROCESSING_HANDLERS) { while (_state.load(std::memory_order::memory_order_relaxed) == PROCESSING_HANDLERS) { - guard.wait(); + _cond.wait(guard); } hasVersion = true; } else { @@ -71,7 +71,7 @@ RPCTarget::RequestDone(FRT_RPCRequest *req) { HandlerList handlers; { - vespalib::MonitorGuard guard(_lock); + std::lock_guard guard(_lock); assert(_state == TARGET_INVOKED); if (req->CheckReturnTypes("s")) { FRT_Values &val = *req->GetReturn(); @@ -90,10 +90,10 @@ RPCTarget::RequestDone(FRT_RPCRequest *req) handler->handleVersion(_version.get()); } { - vespalib::MonitorGuard guard(_lock); + std::lock_guard guard(_lock); _state = (_version.get() ? VERSION_RESOLVED : VERSION_NOT_RESOLVED); - guard.broadcast(); } + _cond.notify_all(); req->SubRef(); } diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.h b/messagebus/src/vespa/messagebus/network/rpctarget.h index d927292f26d..6a57bd983e7 100644 --- a/messagebus/src/vespa/messagebus/network/rpctarget.h +++ b/messagebus/src/vespa/messagebus/network/rpctarget.h @@ -5,7 +5,6 @@ #include <vespa/fnet/frt/invoker.h> #include <vespa/fnet/frt/target.h> #include <vespa/vespalib/component/version.h> -#include <vespa/vespalib/util/sync.h> namespace mbus { @@ -27,7 +26,7 @@ public: /** * Virtual destructor required for inheritance. */ - virtual ~IVersionHandler() { } + virtual ~IVersionHandler() = default; /** * This method is invoked once the version of the corresponding {@link @@ -50,7 +49,8 @@ private: }; typedef std::unique_ptr<vespalib::Version> Version_UP; - vespalib::Monitor _lock; + std::mutex _lock; + std::condition_variable _cond; FRT_Supervisor &_orb; string _name; FRT_Target &_target; diff --git a/messagebus/src/vespa/messagebus/routablequeue.cpp b/messagebus/src/vespa/messagebus/routablequeue.cpp index 494ee11c7cc..9ec6d36f688 100644 --- a/messagebus/src/vespa/messagebus/routablequeue.cpp +++ b/messagebus/src/vespa/messagebus/routablequeue.cpp @@ -7,7 +7,7 @@ using namespace std::chrono; namespace mbus { RoutableQueue::RoutableQueue() - : _monitor(), + : _lock(), _queue() { } @@ -23,18 +23,19 @@ RoutableQueue::~RoutableQueue() uint32_t RoutableQueue::size() { - vespalib::MonitorGuard guard(_monitor); + std::lock_guard guard(_lock); return _queue.size(); } void RoutableQueue::enqueue(Routable::UP r) { - vespalib::MonitorGuard guard(_monitor); + std::unique_lock guard(_lock); _queue.push(r.get()); r.release(); if (_queue.size() == 1) { - guard.broadcast(); // support multiple readers + guard.unlock(); + _cond.notify_all(); // support multiple readers } } @@ -43,9 +44,9 @@ RoutableQueue::dequeue(duration timeout) { steady_clock::time_point startTime = steady_clock::now(); duration left = timeout; - vespalib::MonitorGuard guard(_monitor); + std::unique_lock guard(_lock); while (_queue.size() == 0 && left > duration::zero()) { - if (!guard.wait(left) || _queue.size() > 0) { + if ((_cond.wait_for(guard, left) == std::cv_status::no_timeout) || (_queue.size() > 0)) { break; } duration elapsed = (steady_clock::now() - startTime); diff --git a/messagebus/src/vespa/messagebus/routablequeue.h b/messagebus/src/vespa/messagebus/routablequeue.h index 03f81a7a8d3..0c3edc1a597 100644 --- a/messagebus/src/vespa/messagebus/routablequeue.h +++ b/messagebus/src/vespa/messagebus/routablequeue.h @@ -2,13 +2,14 @@ #pragma once -#include <vespa/vespalib/util/sync.h> #include "imessagehandler.h" #include "ireplyhandler.h" #include "queue.h" #include "routable.h" #include "message.h" #include "reply.h" +#include <mutex> +#include <condition_variable> namespace mbus { @@ -25,8 +26,9 @@ class RoutableQueue : public IMessageHandler, public IReplyHandler { private: - vespalib::Monitor _monitor; - Queue<Routable*> _queue; + std::mutex _lock; + std::condition_variable _cond; + Queue<Routable*> _queue; public: RoutableQueue(const RoutableQueue &) = delete; diff --git a/messagebus/src/vespa/messagebus/sourcesession.cpp b/messagebus/src/vespa/messagebus/sourcesession.cpp index eece44a922a..b22f684b848 100644 --- a/messagebus/src/vespa/messagebus/sourcesession.cpp +++ b/messagebus/src/vespa/messagebus/sourcesession.cpp @@ -13,7 +13,7 @@ using vespalib::make_string; namespace mbus { SourceSession::SourceSession(MessageBus &mbus, const SourceSessionParams ¶ms) - : _monitor(), + : _lock(), _mbus(mbus), _gate(new ReplyGate(_mbus)), _sequencer(*_gate), @@ -76,17 +76,17 @@ SourceSession::send(Message::UP msg) msg->setTimeRemaining(_timeout); } { - vespalib::MonitorGuard guard(_monitor); + std::lock_guard guard(_lock); if (_closed) { return Result(Error(ErrorCode::SEND_QUEUE_CLOSED, "Source session is closed."), std::move(msg)); } - if (_throttlePolicy.get() != nullptr && !_throttlePolicy->canSend(*msg, _pendingCount)) { + if (_throttlePolicy && !_throttlePolicy->canSend(*msg, _pendingCount)) { return Result(Error(ErrorCode::SEND_QUEUE_FULL, make_string("Too much pending data (%d messages).", _pendingCount)), std::move(msg)); } msg->pushHandler(_replyHandler); - if (_throttlePolicy.get() != nullptr) { + if (_throttlePolicy) { _throttlePolicy->processMessage(*msg); } ++_pendingCount; @@ -106,10 +106,10 @@ SourceSession::handleReply(Reply::UP reply) { bool done; { - vespalib::MonitorGuard guard(_monitor); + std::lock_guard guard(_lock); assert(_pendingCount > 0); --_pendingCount; - if (_throttlePolicy.get() != nullptr) { + if (_throttlePolicy) { _throttlePolicy->processReply(*reply); } done = (_closed && _pendingCount == 0); @@ -121,31 +121,33 @@ SourceSession::handleReply(Reply::UP reply) IReplyHandler &handler = reply->getCallStack().pop(*reply); handler.handleReply(std::move(reply)); if (done) { - vespalib::MonitorGuard guard(_monitor); - assert(_pendingCount == 0); - assert(_closed); - _done = true; - guard.broadcast(); + { + std::lock_guard guard(_lock); + assert(_pendingCount == 0); + assert(_closed); + _done = true; + } + _cond.notify_all(); } } void SourceSession::close() { - vespalib::MonitorGuard guard(_monitor); + std::unique_lock guard(_lock); _closed = true; if (_pendingCount == 0) { _done = true; } while (!_done) { - guard.wait(); + _cond.wait(guard); } } SourceSession & SourceSession::setTimeout(duration timeout) { - vespalib::MonitorGuard guard(_monitor); + std::lock_guard guard(_lock); _timeout = timeout; return *this; } diff --git a/messagebus/src/vespa/messagebus/sourcesession.h b/messagebus/src/vespa/messagebus/sourcesession.h index 0992a3e377b..c7dfcdf9337 100644 --- a/messagebus/src/vespa/messagebus/sourcesession.h +++ b/messagebus/src/vespa/messagebus/sourcesession.h @@ -1,11 +1,11 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once -#include <vespa/vespalib/util/sync.h> #include "ireplyhandler.h" #include "result.h" #include "sequencer.h" #include "sourcesessionparams.h" +#include <condition_variable> namespace mbus { @@ -21,7 +21,8 @@ class SourceSession : public IReplyHandler { private: friend class MessageBus; - vespalib::Monitor _monitor; + std::mutex _lock; + std::condition_variable _cond; MessageBus &_mbus; ReplyGate *_gate; Sequencer _sequencer; diff --git a/messagebus/src/vespa/messagebus/testlib/receptor.cpp b/messagebus/src/vespa/messagebus/testlib/receptor.cpp index 01d644bba09..a8199938a25 100644 --- a/messagebus/src/vespa/messagebus/testlib/receptor.cpp +++ b/messagebus/src/vespa/messagebus/testlib/receptor.cpp @@ -12,27 +12,27 @@ Receptor::~Receptor() = default; void Receptor::handleMessage(Message::UP msg) { - vespalib::MonitorGuard guard(_mon); + std::lock_guard guard(_mon); _msg = std::move(msg); - guard.broadcast(); + _cond.notify_all(); } void Receptor::handleReply(Reply::UP reply) { - vespalib::MonitorGuard guard(_mon); + std::lock_guard guard(_mon); _reply = std::move(reply); - guard.broadcast(); + _cond.notify_all(); } Message::UP Receptor::getMessage(duration maxWait) { steady_clock::time_point startTime = steady_clock::now(); - vespalib::MonitorGuard guard(_mon); - while (_msg.get() == 0) { + std::unique_lock guard(_mon); + while ( ! _msg) { duration w = maxWait - duration_cast<milliseconds>(steady_clock::now() - startTime); - if (w <= duration::zero() || !guard.wait(w)) { + if (w <= duration::zero() || (_cond.wait_for(guard, w) == std::cv_status::timeout)) { break; } } @@ -43,10 +43,10 @@ Reply::UP Receptor::getReply(duration maxWait) { steady_clock::time_point startTime = steady_clock::now(); - vespalib::MonitorGuard guard(_mon); + std::unique_lock guard(_mon); while (_reply.get() == 0) { duration w = maxWait - duration_cast<milliseconds>(steady_clock::now() - startTime); - if (w <= duration::zero() || !guard.wait(w)) { + if (w <= duration::zero() || (_cond.wait_for(guard, w) == std::cv_status::timeout)) { break; } } diff --git a/messagebus/src/vespa/messagebus/testlib/receptor.h b/messagebus/src/vespa/messagebus/testlib/receptor.h index 1d98ac62cd2..4e734319ca0 100644 --- a/messagebus/src/vespa/messagebus/testlib/receptor.h +++ b/messagebus/src/vespa/messagebus/testlib/receptor.h @@ -6,7 +6,7 @@ #include <vespa/messagebus/ireplyhandler.h> #include <vespa/messagebus/message.h> #include <vespa/messagebus/reply.h> -#include <vespa/vespalib/util/sync.h> +#include <condition_variable> namespace mbus { @@ -14,15 +14,13 @@ class Receptor : public IMessageHandler, public IReplyHandler { private: - vespalib::Monitor _mon; - Message::UP _msg; - Reply::UP _reply; - - Receptor(const Receptor &); - Receptor &operator=(const Receptor &); + std::mutex _mon; + std::condition_variable _cond; + Message::UP _msg; + Reply::UP _reply; public: Receptor(); - ~Receptor(); + ~Receptor() override; void handleMessage(Message::UP msg) override; void handleReply(Reply::UP reply) override; Message::UP getMessage(duration maxWait = 120s); |