summaryrefslogtreecommitdiffstats
path: root/messagebus
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-10-13 22:13:14 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-10-13 22:56:14 +0000
commit001bdf0053ba9cb02e20afcceb9d0f7ed63f1178 (patch)
treeb2b0d66c4459114d878cfa61b12e74c39bbb0b74 /messagebus
parent71c10939b19be8ea115cda9ecddcad7749b2c20d (diff)
Use std::mutex and std:.condition_variable and GC some unused code.
Diffstat (limited to 'messagebus')
-rw-r--r--messagebus/src/vespa/messagebus/messenger.cpp33
-rw-r--r--messagebus/src/vespa/messagebus/messenger.h6
-rw-r--r--messagebus/src/vespa/messagebus/network/rpctarget.cpp10
-rw-r--r--messagebus/src/vespa/messagebus/network/rpctarget.h6
-rw-r--r--messagebus/src/vespa/messagebus/routablequeue.cpp13
-rw-r--r--messagebus/src/vespa/messagebus/routablequeue.h8
-rw-r--r--messagebus/src/vespa/messagebus/sourcesession.cpp30
-rw-r--r--messagebus/src/vespa/messagebus/sourcesession.h5
-rw-r--r--messagebus/src/vespa/messagebus/testlib/receptor.cpp18
-rw-r--r--messagebus/src/vespa/messagebus/testlib/receptor.h14
10 files changed, 74 insertions, 69 deletions
diff --git a/messagebus/src/vespa/messagebus/messenger.cpp b/messagebus/src/vespa/messagebus/messenger.cpp
index 4579f7dec0e..9623fa59acf 100644
--- a/messagebus/src/vespa/messagebus/messenger.cpp
+++ b/messagebus/src/vespa/messagebus/messenger.cpp
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "messenger.h"
-#include <vespa/vespalib/util/sync.h>
#include <vespa/vespalib/util/gate.h>
#include <vespa/log/log.h>
@@ -157,22 +156,23 @@ public:
namespace mbus {
Messenger::Messenger(bool skip_request_thread, bool skip_reply_thread) :
- _monitor(),
- _pool(128000),
- _children(),
- _queue(),
- _closed(false),
- _skip_request_thread(skip_request_thread),
- _skip_reply_thread(skip_reply_thread)
+ _lock(),
+ _pool(128000),
+ _children(),
+ _queue(),
+ _closed(false),
+ _skip_request_thread(skip_request_thread),
+ _skip_reply_thread(skip_reply_thread)
{}
Messenger::~Messenger()
{
{
- vespalib::MonitorGuard guard(_monitor);
+ std::lock_guard guard(_lock);
_closed = true;
- guard.broadcast();
}
+ _cond.notify_all();
+
_pool.Close();
std::for_each(_children.begin(), _children.end(), DeleteFunctor<ITask>());
if ( ! _queue.empty()) {
@@ -194,12 +194,12 @@ Messenger::Run(FastOS_ThreadInterface *thread, void *arg)
while (true) {
ITask::UP task;
{
- vespalib::MonitorGuard guard(_monitor);
+ std::unique_lock guard(_lock);
if (_closed) {
break;
}
if (_queue.empty()) {
- guard.wait(100);
+ _cond.wait_for(guard, 100ms);
}
if (!_queue.empty()) {
task.reset(_queue.front());
@@ -237,7 +237,7 @@ Messenger::discardRecurrentTasks()
bool
Messenger::start()
{
- if (_pool.NewThread(this) == 0) {
+ if (_pool.NewThread(this) == nullptr) {
return false;
}
return true;
@@ -266,11 +266,12 @@ Messenger::deliverReply(Reply::UP reply, IReplyHandler &handler)
void
Messenger::enqueue(ITask::UP task)
{
- vespalib::MonitorGuard guard(_monitor);
+ std::unique_lock guard(_lock);
if (!_closed) {
_queue.push(task.release());
if (_queue.size() == 1) {
- guard.signal();
+ guard.unlock();
+ _cond.notify_one();
}
}
}
@@ -286,7 +287,7 @@ Messenger::sync()
bool
Messenger::isEmpty() const
{
- vespalib::MonitorGuard guard(_monitor);
+ std::lock_guard guard(_lock);
return _queue.empty();
}
diff --git a/messagebus/src/vespa/messagebus/messenger.h b/messagebus/src/vespa/messagebus/messenger.h
index 3103e9afae1..7ca3749b970 100644
--- a/messagebus/src/vespa/messagebus/messenger.h
+++ b/messagebus/src/vespa/messagebus/messenger.h
@@ -6,7 +6,6 @@
#include "message.h"
#include "reply.h"
#include <vespa/vespalib/util/executor.h>
-#include <vespa/vespalib/util/sync.h>
#include <vespa/vespalib/util/arrayqueue.hpp>
#include <vespa/fastos/thread.h>
@@ -40,8 +39,9 @@ public:
};
private:
- vespalib::Monitor _monitor;
- FastOS_ThreadPool _pool;
+ mutable std::mutex _lock;
+ std::condition_variable _cond;
+ FastOS_ThreadPool _pool;
std::vector<ITask*> _children;
vespalib::ArrayQueue<ITask*> _queue;
bool _closed;
diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.cpp b/messagebus/src/vespa/messagebus/network/rpctarget.cpp
index ea21010e21c..b91ba43f036 100644
--- a/messagebus/src/vespa/messagebus/network/rpctarget.cpp
+++ b/messagebus/src/vespa/messagebus/network/rpctarget.cpp
@@ -29,11 +29,11 @@ RPCTarget::resolveVersion(duration timeout, RPCTarget::IVersionHandler &handler)
ResolveState state = _state.load(std::memory_order_acquire);
bool hasVersion = (state == VERSION_RESOLVED);
if ( ! hasVersion ) {
- vespalib::MonitorGuard guard(_lock);
+ std::unique_lock guard(_lock);
state = _state.load(std::memory_order_relaxed);
if (state == VERSION_RESOLVED || state == PROCESSING_HANDLERS) {
while (_state.load(std::memory_order::memory_order_relaxed) == PROCESSING_HANDLERS) {
- guard.wait();
+ _cond.wait(guard);
}
hasVersion = true;
} else {
@@ -71,7 +71,7 @@ RPCTarget::RequestDone(FRT_RPCRequest *req)
{
HandlerList handlers;
{
- vespalib::MonitorGuard guard(_lock);
+ std::lock_guard guard(_lock);
assert(_state == TARGET_INVOKED);
if (req->CheckReturnTypes("s")) {
FRT_Values &val = *req->GetReturn();
@@ -90,10 +90,10 @@ RPCTarget::RequestDone(FRT_RPCRequest *req)
handler->handleVersion(_version.get());
}
{
- vespalib::MonitorGuard guard(_lock);
+ std::lock_guard guard(_lock);
_state = (_version.get() ? VERSION_RESOLVED : VERSION_NOT_RESOLVED);
- guard.broadcast();
}
+ _cond.notify_all();
req->SubRef();
}
diff --git a/messagebus/src/vespa/messagebus/network/rpctarget.h b/messagebus/src/vespa/messagebus/network/rpctarget.h
index d927292f26d..6a57bd983e7 100644
--- a/messagebus/src/vespa/messagebus/network/rpctarget.h
+++ b/messagebus/src/vespa/messagebus/network/rpctarget.h
@@ -5,7 +5,6 @@
#include <vespa/fnet/frt/invoker.h>
#include <vespa/fnet/frt/target.h>
#include <vespa/vespalib/component/version.h>
-#include <vespa/vespalib/util/sync.h>
namespace mbus {
@@ -27,7 +26,7 @@ public:
/**
* Virtual destructor required for inheritance.
*/
- virtual ~IVersionHandler() { }
+ virtual ~IVersionHandler() = default;
/**
* This method is invoked once the version of the corresponding {@link
@@ -50,7 +49,8 @@ private:
};
typedef std::unique_ptr<vespalib::Version> Version_UP;
- vespalib::Monitor _lock;
+ std::mutex _lock;
+ std::condition_variable _cond;
FRT_Supervisor &_orb;
string _name;
FRT_Target &_target;
diff --git a/messagebus/src/vespa/messagebus/routablequeue.cpp b/messagebus/src/vespa/messagebus/routablequeue.cpp
index 494ee11c7cc..9ec6d36f688 100644
--- a/messagebus/src/vespa/messagebus/routablequeue.cpp
+++ b/messagebus/src/vespa/messagebus/routablequeue.cpp
@@ -7,7 +7,7 @@ using namespace std::chrono;
namespace mbus {
RoutableQueue::RoutableQueue()
- : _monitor(),
+ : _lock(),
_queue()
{ }
@@ -23,18 +23,19 @@ RoutableQueue::~RoutableQueue()
uint32_t
RoutableQueue::size()
{
- vespalib::MonitorGuard guard(_monitor);
+ std::lock_guard guard(_lock);
return _queue.size();
}
void
RoutableQueue::enqueue(Routable::UP r)
{
- vespalib::MonitorGuard guard(_monitor);
+ std::unique_lock guard(_lock);
_queue.push(r.get());
r.release();
if (_queue.size() == 1) {
- guard.broadcast(); // support multiple readers
+ guard.unlock();
+ _cond.notify_all(); // support multiple readers
}
}
@@ -43,9 +44,9 @@ RoutableQueue::dequeue(duration timeout)
{
steady_clock::time_point startTime = steady_clock::now();
duration left = timeout;
- vespalib::MonitorGuard guard(_monitor);
+ std::unique_lock guard(_lock);
while (_queue.size() == 0 && left > duration::zero()) {
- if (!guard.wait(left) || _queue.size() > 0) {
+ if ((_cond.wait_for(guard, left) == std::cv_status::no_timeout) || (_queue.size() > 0)) {
break;
}
duration elapsed = (steady_clock::now() - startTime);
diff --git a/messagebus/src/vespa/messagebus/routablequeue.h b/messagebus/src/vespa/messagebus/routablequeue.h
index 03f81a7a8d3..0c3edc1a597 100644
--- a/messagebus/src/vespa/messagebus/routablequeue.h
+++ b/messagebus/src/vespa/messagebus/routablequeue.h
@@ -2,13 +2,14 @@
#pragma once
-#include <vespa/vespalib/util/sync.h>
#include "imessagehandler.h"
#include "ireplyhandler.h"
#include "queue.h"
#include "routable.h"
#include "message.h"
#include "reply.h"
+#include <mutex>
+#include <condition_variable>
namespace mbus {
@@ -25,8 +26,9 @@ class RoutableQueue : public IMessageHandler,
public IReplyHandler
{
private:
- vespalib::Monitor _monitor;
- Queue<Routable*> _queue;
+ std::mutex _lock;
+ std::condition_variable _cond;
+ Queue<Routable*> _queue;
public:
RoutableQueue(const RoutableQueue &) = delete;
diff --git a/messagebus/src/vespa/messagebus/sourcesession.cpp b/messagebus/src/vespa/messagebus/sourcesession.cpp
index eece44a922a..b22f684b848 100644
--- a/messagebus/src/vespa/messagebus/sourcesession.cpp
+++ b/messagebus/src/vespa/messagebus/sourcesession.cpp
@@ -13,7 +13,7 @@ using vespalib::make_string;
namespace mbus {
SourceSession::SourceSession(MessageBus &mbus, const SourceSessionParams &params)
- : _monitor(),
+ : _lock(),
_mbus(mbus),
_gate(new ReplyGate(_mbus)),
_sequencer(*_gate),
@@ -76,17 +76,17 @@ SourceSession::send(Message::UP msg)
msg->setTimeRemaining(_timeout);
}
{
- vespalib::MonitorGuard guard(_monitor);
+ std::lock_guard guard(_lock);
if (_closed) {
return Result(Error(ErrorCode::SEND_QUEUE_CLOSED, "Source session is closed."), std::move(msg));
}
- if (_throttlePolicy.get() != nullptr && !_throttlePolicy->canSend(*msg, _pendingCount)) {
+ if (_throttlePolicy && !_throttlePolicy->canSend(*msg, _pendingCount)) {
return Result(Error(ErrorCode::SEND_QUEUE_FULL,
make_string("Too much pending data (%d messages).", _pendingCount)),
std::move(msg));
}
msg->pushHandler(_replyHandler);
- if (_throttlePolicy.get() != nullptr) {
+ if (_throttlePolicy) {
_throttlePolicy->processMessage(*msg);
}
++_pendingCount;
@@ -106,10 +106,10 @@ SourceSession::handleReply(Reply::UP reply)
{
bool done;
{
- vespalib::MonitorGuard guard(_monitor);
+ std::lock_guard guard(_lock);
assert(_pendingCount > 0);
--_pendingCount;
- if (_throttlePolicy.get() != nullptr) {
+ if (_throttlePolicy) {
_throttlePolicy->processReply(*reply);
}
done = (_closed && _pendingCount == 0);
@@ -121,31 +121,33 @@ SourceSession::handleReply(Reply::UP reply)
IReplyHandler &handler = reply->getCallStack().pop(*reply);
handler.handleReply(std::move(reply));
if (done) {
- vespalib::MonitorGuard guard(_monitor);
- assert(_pendingCount == 0);
- assert(_closed);
- _done = true;
- guard.broadcast();
+ {
+ std::lock_guard guard(_lock);
+ assert(_pendingCount == 0);
+ assert(_closed);
+ _done = true;
+ }
+ _cond.notify_all();
}
}
void
SourceSession::close()
{
- vespalib::MonitorGuard guard(_monitor);
+ std::unique_lock guard(_lock);
_closed = true;
if (_pendingCount == 0) {
_done = true;
}
while (!_done) {
- guard.wait();
+ _cond.wait(guard);
}
}
SourceSession &
SourceSession::setTimeout(duration timeout)
{
- vespalib::MonitorGuard guard(_monitor);
+ std::lock_guard guard(_lock);
_timeout = timeout;
return *this;
}
diff --git a/messagebus/src/vespa/messagebus/sourcesession.h b/messagebus/src/vespa/messagebus/sourcesession.h
index 0992a3e377b..c7dfcdf9337 100644
--- a/messagebus/src/vespa/messagebus/sourcesession.h
+++ b/messagebus/src/vespa/messagebus/sourcesession.h
@@ -1,11 +1,11 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include <vespa/vespalib/util/sync.h>
#include "ireplyhandler.h"
#include "result.h"
#include "sequencer.h"
#include "sourcesessionparams.h"
+#include <condition_variable>
namespace mbus {
@@ -21,7 +21,8 @@ class SourceSession : public IReplyHandler {
private:
friend class MessageBus;
- vespalib::Monitor _monitor;
+ std::mutex _lock;
+ std::condition_variable _cond;
MessageBus &_mbus;
ReplyGate *_gate;
Sequencer _sequencer;
diff --git a/messagebus/src/vespa/messagebus/testlib/receptor.cpp b/messagebus/src/vespa/messagebus/testlib/receptor.cpp
index 01d644bba09..a8199938a25 100644
--- a/messagebus/src/vespa/messagebus/testlib/receptor.cpp
+++ b/messagebus/src/vespa/messagebus/testlib/receptor.cpp
@@ -12,27 +12,27 @@ Receptor::~Receptor() = default;
void
Receptor::handleMessage(Message::UP msg)
{
- vespalib::MonitorGuard guard(_mon);
+ std::lock_guard guard(_mon);
_msg = std::move(msg);
- guard.broadcast();
+ _cond.notify_all();
}
void
Receptor::handleReply(Reply::UP reply)
{
- vespalib::MonitorGuard guard(_mon);
+ std::lock_guard guard(_mon);
_reply = std::move(reply);
- guard.broadcast();
+ _cond.notify_all();
}
Message::UP
Receptor::getMessage(duration maxWait)
{
steady_clock::time_point startTime = steady_clock::now();
- vespalib::MonitorGuard guard(_mon);
- while (_msg.get() == 0) {
+ std::unique_lock guard(_mon);
+ while ( ! _msg) {
duration w = maxWait - duration_cast<milliseconds>(steady_clock::now() - startTime);
- if (w <= duration::zero() || !guard.wait(w)) {
+ if (w <= duration::zero() || (_cond.wait_for(guard, w) == std::cv_status::timeout)) {
break;
}
}
@@ -43,10 +43,10 @@ Reply::UP
Receptor::getReply(duration maxWait)
{
steady_clock::time_point startTime = steady_clock::now();
- vespalib::MonitorGuard guard(_mon);
+ std::unique_lock guard(_mon);
while (_reply.get() == 0) {
duration w = maxWait - duration_cast<milliseconds>(steady_clock::now() - startTime);
- if (w <= duration::zero() || !guard.wait(w)) {
+ if (w <= duration::zero() || (_cond.wait_for(guard, w) == std::cv_status::timeout)) {
break;
}
}
diff --git a/messagebus/src/vespa/messagebus/testlib/receptor.h b/messagebus/src/vespa/messagebus/testlib/receptor.h
index 1d98ac62cd2..4e734319ca0 100644
--- a/messagebus/src/vespa/messagebus/testlib/receptor.h
+++ b/messagebus/src/vespa/messagebus/testlib/receptor.h
@@ -6,7 +6,7 @@
#include <vespa/messagebus/ireplyhandler.h>
#include <vespa/messagebus/message.h>
#include <vespa/messagebus/reply.h>
-#include <vespa/vespalib/util/sync.h>
+#include <condition_variable>
namespace mbus {
@@ -14,15 +14,13 @@ class Receptor : public IMessageHandler,
public IReplyHandler
{
private:
- vespalib::Monitor _mon;
- Message::UP _msg;
- Reply::UP _reply;
-
- Receptor(const Receptor &);
- Receptor &operator=(const Receptor &);
+ std::mutex _mon;
+ std::condition_variable _cond;
+ Message::UP _msg;
+ Reply::UP _reply;
public:
Receptor();
- ~Receptor();
+ ~Receptor() override;
void handleMessage(Message::UP msg) override;
void handleReply(Reply::UP reply) override;
Message::UP getMessage(duration maxWait = 120s);