diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-15 10:06:37 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-15 10:06:37 +0000 |
commit | 88232fd29c093e6609bb6cbd6df1e0dd1f1d69f5 (patch) | |
tree | fad57deca1f1205933dfd8ccd645dabae91ff457 /vespalib | |
parent | c6b7ba31a5b4f1df51c3454985d5b3833f42635c (diff) |
- LockGuard -> unique_lock
- Pass cond to avoid having protected members.
- monitor -> lock
Diffstat (limited to 'vespalib')
6 files changed, 44 insertions, 47 deletions
diff --git a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp index 2701eaab2b5..45e64773ea7 100644 --- a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp +++ b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp @@ -7,18 +7,18 @@ namespace vespalib { VESPA_THREAD_STACK_TAG(unnamed_blocking_executor); bool -BlockingThreadStackExecutor::acceptNewTask(MonitorGuard & guard) +BlockingThreadStackExecutor::acceptNewTask(unique_lock & guard, std::condition_variable & cond) { while (!closed() && !isRoomForNewTask() && !owns_this_thread()) { - _cond.wait(guard); + cond.wait(guard); } return (!closed()); } void -BlockingThreadStackExecutor::wakeup(MonitorGuard &) +BlockingThreadStackExecutor::wakeup(unique_lock &, std::condition_variable & cond) { - _cond.notify_all(); + cond.notify_all(); } BlockingThreadStackExecutor::BlockingThreadStackExecutor(uint32_t threads, uint32_t stackSize, uint32_t taskLimit) diff --git a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h index df3f9408e0a..63e25d1554b 100644 --- a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h +++ b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h @@ -12,8 +12,8 @@ namespace vespalib { class BlockingThreadStackExecutor : public ThreadStackExecutorBase { private: - bool acceptNewTask(MonitorGuard & monitor) override; - void wakeup(MonitorGuard & monitor) override; + bool acceptNewTask(unique_lock & guard, std::condition_variable & cond) override; + void wakeup(unique_lock & guard, std::condition_variable &) override; public: /** @@ -32,7 +32,7 @@ public: BlockingThreadStackExecutor(uint32_t threads, uint32_t stackSize, uint32_t taskLimit, init_fun_t init_function); - ~BlockingThreadStackExecutor(); + ~BlockingThreadStackExecutor() override; }; } // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutor.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutor.cpp index 6d45a78b331..bd76dcd0219 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutor.cpp +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutor.cpp @@ -7,13 +7,13 @@ namespace vespalib { VESPA_THREAD_STACK_TAG(unnamed_nonblocking_executor); bool -ThreadStackExecutor::acceptNewTask(MonitorGuard &) +ThreadStackExecutor::acceptNewTask(unique_lock &, std::condition_variable &) { return isRoomForNewTask(); } void -ThreadStackExecutor::wakeup(MonitorGuard &) +ThreadStackExecutor::wakeup(unique_lock &, std::condition_variable &) { } diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutor.h b/vespalib/src/vespa/vespalib/util/threadstackexecutor.h index 332f219f12a..2280154f1a2 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutor.h +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutor.h @@ -12,8 +12,8 @@ namespace vespalib { class ThreadStackExecutor : public ThreadStackExecutorBase { public: - bool acceptNewTask(MonitorGuard &) override; - void wakeup(MonitorGuard &) override; + bool acceptNewTask(unique_lock &, std::condition_variable &) override; + void wakeup(unique_lock &, std::condition_variable &) override; public: /** @@ -38,7 +38,7 @@ public: /** * Will invoke cleanup. **/ - ~ThreadStackExecutor(); + ~ThreadStackExecutor() override; }; } // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp index e3a656f21a7..bde870c9cca 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp @@ -27,7 +27,7 @@ ThreadInit::Run(FastOS_ThreadInterface *, void *) { void ThreadStackExecutorBase::BlockedThread::wait() const { - MonitorGuard guard(monitor); + unique_lock guard(lock); while (blocked) { cond.wait(guard); } @@ -36,7 +36,7 @@ ThreadStackExecutorBase::BlockedThread::wait() const void ThreadStackExecutorBase::BlockedThread::unblock() { - MonitorGuard guard(monitor); + unique_lock guard(lock); blocked = false; cond.notify_one(); } @@ -48,7 +48,7 @@ thread_local ThreadStackExecutorBase *ThreadStackExecutorBase::_master = nullptr //----------------------------------------------------------------------------- void -ThreadStackExecutorBase::block_thread(const LockGuard &, BlockedThread &blocked_thread) +ThreadStackExecutorBase::block_thread(const unique_lock &, BlockedThread &blocked_thread) { auto pos = _blocked.begin(); while ((pos != _blocked.end()) && @@ -60,7 +60,7 @@ ThreadStackExecutorBase::block_thread(const LockGuard &, BlockedThread &blocked_ } void -ThreadStackExecutorBase::unblock_threads(const MonitorGuard &) +ThreadStackExecutorBase::unblock_threads(const unique_lock &) { while (!_blocked.empty() && (_taskCount <= _blocked.back()->wait_task_count)) { BlockedThread &blocked_thread = *(_blocked.back()); @@ -72,7 +72,7 @@ ThreadStackExecutorBase::unblock_threads(const MonitorGuard &) void ThreadStackExecutorBase::assignTask(TaggedTask task, Worker &worker) { - MonitorGuard monitor(worker.monitor); + unique_lock monitor(worker.lock); worker.verify(/* idle: */ true); worker.idle = false; worker.task = std::move(task); @@ -83,11 +83,11 @@ bool ThreadStackExecutorBase::obtainTask(Worker &worker) { { - MonitorGuard monitor(_monitor); + unique_lock monitor(_lock); if (!worker.idle) { assert(_taskCount != 0); --_taskCount; - wakeup(monitor); + wakeup(monitor, _cond); _barrier.completeEvent(worker.task.token); worker.idle = true; } @@ -105,7 +105,7 @@ ThreadStackExecutorBase::obtainTask(Worker &worker) _workers.push(&worker); } { - MonitorGuard monitor(worker.monitor); + unique_lock monitor(worker.lock); while (worker.idle) { worker.cond.wait(monitor); } @@ -138,7 +138,7 @@ ThreadStackExecutorBase::ThreadStackExecutorBase(uint32_t stackSize, : SyncableThreadExecutor(), Runnable(), _pool(std::make_unique<FastOS_ThreadPool>(stackSize)), - _monitor(), + _lock(), _stats(), _executorCompletion(), _tasks(), @@ -181,24 +181,24 @@ ThreadStackExecutorBase::wakeup() { void ThreadStackExecutorBase::internalSetTaskLimit(uint32_t taskLimit) { - MonitorGuard monitor(_monitor); + unique_lock guard(_lock); if (!_closed) { _taskLimit = taskLimit; - wakeup(monitor); + wakeup(guard, _cond); } } size_t ThreadStackExecutorBase::num_idle_workers() const { - LockGuard lock(_monitor); + std::unique_lock guard(_lock); return _workers.size(); } ThreadStackExecutorBase::Stats ThreadStackExecutorBase::getStats() { - LockGuard lock(_monitor); + std::unique_lock guard(_lock); Stats stats = _stats; _stats = Stats(); _stats.queueSize.add(_taskCount); @@ -208,8 +208,8 @@ ThreadStackExecutorBase::getStats() ThreadStackExecutorBase::Task::UP ThreadStackExecutorBase::execute(Task::UP task) { - MonitorGuard monitor(_monitor); - if (acceptNewTask(monitor)) { + unique_lock guard(_lock); + if (acceptNewTask(guard, _cond)) { TaggedTask taggedTask(std::move(task), _barrier.startEvent()); ++_taskCount; ++_stats.acceptedTasks; @@ -217,7 +217,7 @@ ThreadStackExecutorBase::execute(Task::UP task) if (!_workers.empty()) { Worker *worker = _workers.back(); _workers.popBack(); - monitor.unlock(); // <- UNLOCK + guard.unlock(); // <- UNLOCK assignTask(std::move(taggedTask), *worker); } else { _tasks.push(std::move(taggedTask)); @@ -233,12 +233,12 @@ ThreadStackExecutorBase::shutdown() { ArrayQueue<Worker*> idle; { - MonitorGuard monitor(_monitor); + unique_lock guard(_lock); _closed = true; _taskLimit = 0; idle.swap(_workers); assert(idle.empty() || _tasks.empty()); // idle -> empty queue - wakeup(monitor); + wakeup(guard, _cond); } while (!idle.empty()) { assignTask(TaggedTask(), *idle.back()); @@ -252,7 +252,7 @@ ThreadStackExecutorBase::sync() { BarrierCompletion barrierCompletion; { - LockGuard lock(_monitor); + std::unique_lock guard(_lock); if (!_barrier.startBarrier(barrierCompletion)) { return *this; } @@ -264,13 +264,13 @@ ThreadStackExecutorBase::sync() void ThreadStackExecutorBase::wait_for_task_count(uint32_t task_count) { - LockGuard lock(_monitor); + std::unique_lock guard(_lock); if (_taskCount <= task_count) { return; } BlockedThread self(task_count); - block_thread(lock, self); - lock.unlock(); // <- UNLOCK + block_thread(guard, self); + guard.unlock(); // <- UNLOCK self.wait(); } diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h index ec41b9cfedc..e5cc87c1937 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h @@ -34,8 +34,7 @@ class ThreadStackExecutorBase : public SyncableThreadExecutor, { public: using init_fun_t = std::function<int(Runnable&)>; - using LockGuard = std::unique_lock<std::mutex>; - using MonitorGuard = std::unique_lock<std::mutex>; + using unique_lock = std::unique_lock<std::mutex>; private: @@ -57,13 +56,13 @@ private: }; struct Worker { - std::mutex monitor; + std::mutex lock; std::condition_variable cond; uint32_t pre_guard; bool idle; uint32_t post_guard; TaggedTask task; - Worker() : monitor(), cond(), pre_guard(0xaaaaaaaa), idle(true), post_guard(0x55555555), task() {} + Worker() : lock(), cond(), pre_guard(0xaaaaaaaa), idle(true), post_guard(0x55555555), task() {} void verify(bool expect_idle) const { (void) expect_idle; assert(pre_guard == 0xaaaaaaaa); @@ -80,20 +79,18 @@ private: struct BlockedThread { const uint32_t wait_task_count; - mutable std::mutex monitor; + mutable std::mutex lock; mutable std::condition_variable cond; bool blocked; BlockedThread(uint32_t wait_task_count_in) - : wait_task_count(wait_task_count_in), monitor(), cond(), blocked(true) {} + : wait_task_count(wait_task_count_in), lock(), cond(), blocked(true) {} void wait() const; void unblock(); }; std::unique_ptr<FastOS_ThreadPool> _pool; - mutable std::mutex _monitor; -protected: + mutable std::mutex _lock; std::condition_variable _cond; -private: Stats _stats; Gate _executorCompletion; ArrayQueue<TaggedTask> _tasks; @@ -106,8 +103,8 @@ private: std::unique_ptr<thread::ThreadInit> _thread_init; static thread_local ThreadStackExecutorBase *_master; - void block_thread(const LockGuard &, BlockedThread &blocked_thread); - void unblock_threads(const MonitorGuard &); + void block_thread(const unique_lock &, BlockedThread &blocked_thread); + void unblock_threads(const unique_lock &); /** * Assign the given task to the given idle worker. This will wake @@ -136,14 +133,14 @@ protected: * This will tell if a task will be accepted or not. * An implementation might decide to block. */ - virtual bool acceptNewTask(MonitorGuard & monitor) = 0; + virtual bool acceptNewTask(unique_lock & guard, std::condition_variable & cond) = 0; /** * If blocking implementation, this might wake up any waiters. * * @param monitor to use for signaling. */ - virtual void wakeup(MonitorGuard & monitor) = 0; + virtual void wakeup(unique_lock & guard, std::condition_variable & cond) = 0; /** * Will tell you if the executor has been closed for new tasks. |