aboutsummaryrefslogtreecommitdiffstats
path: root/vespalib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-10-15 10:06:37 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-10-15 10:06:37 +0000
commit88232fd29c093e6609bb6cbd6df1e0dd1f1d69f5 (patch)
treefad57deca1f1205933dfd8ccd645dabae91ff457 /vespalib
parentc6b7ba31a5b4f1df51c3454985d5b3833f42635c (diff)
- LockGuard -> unique_lock
- Pass cond to avoid having protected members. - monitor -> lock
Diffstat (limited to 'vespalib')
-rw-r--r--vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp8
-rw-r--r--vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h6
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutor.cpp4
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutor.h6
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp44
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h23
6 files changed, 44 insertions, 47 deletions
diff --git a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp
index 2701eaab2b5..45e64773ea7 100644
--- a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp
+++ b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp
@@ -7,18 +7,18 @@ namespace vespalib {
VESPA_THREAD_STACK_TAG(unnamed_blocking_executor);
bool
-BlockingThreadStackExecutor::acceptNewTask(MonitorGuard & guard)
+BlockingThreadStackExecutor::acceptNewTask(unique_lock & guard, std::condition_variable & cond)
{
while (!closed() && !isRoomForNewTask() && !owns_this_thread()) {
- _cond.wait(guard);
+ cond.wait(guard);
}
return (!closed());
}
void
-BlockingThreadStackExecutor::wakeup(MonitorGuard &)
+BlockingThreadStackExecutor::wakeup(unique_lock &, std::condition_variable & cond)
{
- _cond.notify_all();
+ cond.notify_all();
}
BlockingThreadStackExecutor::BlockingThreadStackExecutor(uint32_t threads, uint32_t stackSize, uint32_t taskLimit)
diff --git a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h
index df3f9408e0a..63e25d1554b 100644
--- a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h
+++ b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h
@@ -12,8 +12,8 @@ namespace vespalib {
class BlockingThreadStackExecutor : public ThreadStackExecutorBase
{
private:
- bool acceptNewTask(MonitorGuard & monitor) override;
- void wakeup(MonitorGuard & monitor) override;
+ bool acceptNewTask(unique_lock & guard, std::condition_variable & cond) override;
+ void wakeup(unique_lock & guard, std::condition_variable &) override;
public:
/**
@@ -32,7 +32,7 @@ public:
BlockingThreadStackExecutor(uint32_t threads, uint32_t stackSize, uint32_t taskLimit,
init_fun_t init_function);
- ~BlockingThreadStackExecutor();
+ ~BlockingThreadStackExecutor() override;
};
} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutor.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutor.cpp
index 6d45a78b331..bd76dcd0219 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutor.cpp
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutor.cpp
@@ -7,13 +7,13 @@ namespace vespalib {
VESPA_THREAD_STACK_TAG(unnamed_nonblocking_executor);
bool
-ThreadStackExecutor::acceptNewTask(MonitorGuard &)
+ThreadStackExecutor::acceptNewTask(unique_lock &, std::condition_variable &)
{
return isRoomForNewTask();
}
void
-ThreadStackExecutor::wakeup(MonitorGuard &)
+ThreadStackExecutor::wakeup(unique_lock &, std::condition_variable &)
{
}
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutor.h b/vespalib/src/vespa/vespalib/util/threadstackexecutor.h
index 332f219f12a..2280154f1a2 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutor.h
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutor.h
@@ -12,8 +12,8 @@ namespace vespalib {
class ThreadStackExecutor : public ThreadStackExecutorBase
{
public:
- bool acceptNewTask(MonitorGuard &) override;
- void wakeup(MonitorGuard &) override;
+ bool acceptNewTask(unique_lock &, std::condition_variable &) override;
+ void wakeup(unique_lock &, std::condition_variable &) override;
public:
/**
@@ -38,7 +38,7 @@ public:
/**
* Will invoke cleanup.
**/
- ~ThreadStackExecutor();
+ ~ThreadStackExecutor() override;
};
} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
index e3a656f21a7..bde870c9cca 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
@@ -27,7 +27,7 @@ ThreadInit::Run(FastOS_ThreadInterface *, void *) {
void
ThreadStackExecutorBase::BlockedThread::wait() const
{
- MonitorGuard guard(monitor);
+ unique_lock guard(lock);
while (blocked) {
cond.wait(guard);
}
@@ -36,7 +36,7 @@ ThreadStackExecutorBase::BlockedThread::wait() const
void
ThreadStackExecutorBase::BlockedThread::unblock()
{
- MonitorGuard guard(monitor);
+ unique_lock guard(lock);
blocked = false;
cond.notify_one();
}
@@ -48,7 +48,7 @@ thread_local ThreadStackExecutorBase *ThreadStackExecutorBase::_master = nullptr
//-----------------------------------------------------------------------------
void
-ThreadStackExecutorBase::block_thread(const LockGuard &, BlockedThread &blocked_thread)
+ThreadStackExecutorBase::block_thread(const unique_lock &, BlockedThread &blocked_thread)
{
auto pos = _blocked.begin();
while ((pos != _blocked.end()) &&
@@ -60,7 +60,7 @@ ThreadStackExecutorBase::block_thread(const LockGuard &, BlockedThread &blocked_
}
void
-ThreadStackExecutorBase::unblock_threads(const MonitorGuard &)
+ThreadStackExecutorBase::unblock_threads(const unique_lock &)
{
while (!_blocked.empty() && (_taskCount <= _blocked.back()->wait_task_count)) {
BlockedThread &blocked_thread = *(_blocked.back());
@@ -72,7 +72,7 @@ ThreadStackExecutorBase::unblock_threads(const MonitorGuard &)
void
ThreadStackExecutorBase::assignTask(TaggedTask task, Worker &worker)
{
- MonitorGuard monitor(worker.monitor);
+ unique_lock monitor(worker.lock);
worker.verify(/* idle: */ true);
worker.idle = false;
worker.task = std::move(task);
@@ -83,11 +83,11 @@ bool
ThreadStackExecutorBase::obtainTask(Worker &worker)
{
{
- MonitorGuard monitor(_monitor);
+ unique_lock monitor(_lock);
if (!worker.idle) {
assert(_taskCount != 0);
--_taskCount;
- wakeup(monitor);
+ wakeup(monitor, _cond);
_barrier.completeEvent(worker.task.token);
worker.idle = true;
}
@@ -105,7 +105,7 @@ ThreadStackExecutorBase::obtainTask(Worker &worker)
_workers.push(&worker);
}
{
- MonitorGuard monitor(worker.monitor);
+ unique_lock monitor(worker.lock);
while (worker.idle) {
worker.cond.wait(monitor);
}
@@ -138,7 +138,7 @@ ThreadStackExecutorBase::ThreadStackExecutorBase(uint32_t stackSize,
: SyncableThreadExecutor(),
Runnable(),
_pool(std::make_unique<FastOS_ThreadPool>(stackSize)),
- _monitor(),
+ _lock(),
_stats(),
_executorCompletion(),
_tasks(),
@@ -181,24 +181,24 @@ ThreadStackExecutorBase::wakeup() {
void
ThreadStackExecutorBase::internalSetTaskLimit(uint32_t taskLimit)
{
- MonitorGuard monitor(_monitor);
+ unique_lock guard(_lock);
if (!_closed) {
_taskLimit = taskLimit;
- wakeup(monitor);
+ wakeup(guard, _cond);
}
}
size_t
ThreadStackExecutorBase::num_idle_workers() const
{
- LockGuard lock(_monitor);
+ std::unique_lock guard(_lock);
return _workers.size();
}
ThreadStackExecutorBase::Stats
ThreadStackExecutorBase::getStats()
{
- LockGuard lock(_monitor);
+ std::unique_lock guard(_lock);
Stats stats = _stats;
_stats = Stats();
_stats.queueSize.add(_taskCount);
@@ -208,8 +208,8 @@ ThreadStackExecutorBase::getStats()
ThreadStackExecutorBase::Task::UP
ThreadStackExecutorBase::execute(Task::UP task)
{
- MonitorGuard monitor(_monitor);
- if (acceptNewTask(monitor)) {
+ unique_lock guard(_lock);
+ if (acceptNewTask(guard, _cond)) {
TaggedTask taggedTask(std::move(task), _barrier.startEvent());
++_taskCount;
++_stats.acceptedTasks;
@@ -217,7 +217,7 @@ ThreadStackExecutorBase::execute(Task::UP task)
if (!_workers.empty()) {
Worker *worker = _workers.back();
_workers.popBack();
- monitor.unlock(); // <- UNLOCK
+ guard.unlock(); // <- UNLOCK
assignTask(std::move(taggedTask), *worker);
} else {
_tasks.push(std::move(taggedTask));
@@ -233,12 +233,12 @@ ThreadStackExecutorBase::shutdown()
{
ArrayQueue<Worker*> idle;
{
- MonitorGuard monitor(_monitor);
+ unique_lock guard(_lock);
_closed = true;
_taskLimit = 0;
idle.swap(_workers);
assert(idle.empty() || _tasks.empty()); // idle -> empty queue
- wakeup(monitor);
+ wakeup(guard, _cond);
}
while (!idle.empty()) {
assignTask(TaggedTask(), *idle.back());
@@ -252,7 +252,7 @@ ThreadStackExecutorBase::sync()
{
BarrierCompletion barrierCompletion;
{
- LockGuard lock(_monitor);
+ std::unique_lock guard(_lock);
if (!_barrier.startBarrier(barrierCompletion)) {
return *this;
}
@@ -264,13 +264,13 @@ ThreadStackExecutorBase::sync()
void
ThreadStackExecutorBase::wait_for_task_count(uint32_t task_count)
{
- LockGuard lock(_monitor);
+ std::unique_lock guard(_lock);
if (_taskCount <= task_count) {
return;
}
BlockedThread self(task_count);
- block_thread(lock, self);
- lock.unlock(); // <- UNLOCK
+ block_thread(guard, self);
+ guard.unlock(); // <- UNLOCK
self.wait();
}
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
index ec41b9cfedc..e5cc87c1937 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
@@ -34,8 +34,7 @@ class ThreadStackExecutorBase : public SyncableThreadExecutor,
{
public:
using init_fun_t = std::function<int(Runnable&)>;
- using LockGuard = std::unique_lock<std::mutex>;
- using MonitorGuard = std::unique_lock<std::mutex>;
+ using unique_lock = std::unique_lock<std::mutex>;
private:
@@ -57,13 +56,13 @@ private:
};
struct Worker {
- std::mutex monitor;
+ std::mutex lock;
std::condition_variable cond;
uint32_t pre_guard;
bool idle;
uint32_t post_guard;
TaggedTask task;
- Worker() : monitor(), cond(), pre_guard(0xaaaaaaaa), idle(true), post_guard(0x55555555), task() {}
+ Worker() : lock(), cond(), pre_guard(0xaaaaaaaa), idle(true), post_guard(0x55555555), task() {}
void verify(bool expect_idle) const {
(void) expect_idle;
assert(pre_guard == 0xaaaaaaaa);
@@ -80,20 +79,18 @@ private:
struct BlockedThread {
const uint32_t wait_task_count;
- mutable std::mutex monitor;
+ mutable std::mutex lock;
mutable std::condition_variable cond;
bool blocked;
BlockedThread(uint32_t wait_task_count_in)
- : wait_task_count(wait_task_count_in), monitor(), cond(), blocked(true) {}
+ : wait_task_count(wait_task_count_in), lock(), cond(), blocked(true) {}
void wait() const;
void unblock();
};
std::unique_ptr<FastOS_ThreadPool> _pool;
- mutable std::mutex _monitor;
-protected:
+ mutable std::mutex _lock;
std::condition_variable _cond;
-private:
Stats _stats;
Gate _executorCompletion;
ArrayQueue<TaggedTask> _tasks;
@@ -106,8 +103,8 @@ private:
std::unique_ptr<thread::ThreadInit> _thread_init;
static thread_local ThreadStackExecutorBase *_master;
- void block_thread(const LockGuard &, BlockedThread &blocked_thread);
- void unblock_threads(const MonitorGuard &);
+ void block_thread(const unique_lock &, BlockedThread &blocked_thread);
+ void unblock_threads(const unique_lock &);
/**
* Assign the given task to the given idle worker. This will wake
@@ -136,14 +133,14 @@ protected:
* This will tell if a task will be accepted or not.
* An implementation might decide to block.
*/
- virtual bool acceptNewTask(MonitorGuard & monitor) = 0;
+ virtual bool acceptNewTask(unique_lock & guard, std::condition_variable & cond) = 0;
/**
* If blocking implementation, this might wake up any waiters.
*
* @param monitor to use for signaling.
*/
- virtual void wakeup(MonitorGuard & monitor) = 0;
+ virtual void wakeup(unique_lock & guard, std::condition_variable & cond) = 0;
/**
* Will tell you if the executor has been closed for new tasks.