From 2a3a5b321deb5e65d4a4856ee53eeafd0551d3b5 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Thu, 12 Mar 2020 11:35:01 +0000 Subject: Use std::mutex/condition_variable. Use shorter wait during sync, as that is urgent and synchronous. --- .../src/vespa/vespalib/util/singleexecutor.cpp | 55 ++++++++-------------- .../src/vespa/vespalib/util/singleexecutor.h | 15 +++--- 2 files changed, 28 insertions(+), 42 deletions(-) (limited to 'staging_vespalib/src') diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index 8f83e7e6da7..b5c5bbacd32 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -10,8 +10,10 @@ SingleExecutor::SingleExecutor(uint32_t taskLimit) _wantedTaskLimit(_taskLimit.load()), _rp(0), _tasks(std::make_unique(_taskLimit)), - _consumerMonitor(), - _producerMonitor(), + _consumerMutex(), + _consumerCondition(), + _producerMutex(), + _producerCondition(), _thread(*this), _lastAccepted(0), _maxPending(0), @@ -33,7 +35,7 @@ SingleExecutor::getNumThreads() const { uint64_t SingleExecutor::addTask(Task::UP task) { - MonitorGuard guard(_producerMonitor); + Lock guard(_producerMutex); wait_for_room(guard); uint64_t wp = _wp.load(std::memory_order_relaxed); _tasks[index(wp)] = std::move(task); @@ -42,29 +44,9 @@ SingleExecutor::addTask(Task::UP task) { } void -SingleExecutor::wakeupConsumer() { - MonitorGuard guard(_consumerMonitor); - guard.signal(); -} - -void -SingleExecutor::sleepConsumer() { - _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) >> 2), std::memory_order_relaxed); - MonitorGuard guard(_consumerMonitor); - guard.wait(10ms); - _wakeupConsumerAt.store(0, std::memory_order_relaxed); -} - -void -SingleExecutor::wakeupProducer() { - MonitorGuard guard(_producerMonitor); - guard.signal(); -} - -void -SingleExecutor::sleepProducer(MonitorGuard & guard) { +SingleExecutor::sleepProducer(Lock & lock, duration maxWaitTime) { _producerNeedWakeup.store(true, std::memory_order_relaxed); - guard.wait(10ms); + _producerCondition.wait_for(lock, maxWaitTime); _producerNeedWakeup.store(false, std::memory_order_relaxed); } @@ -72,7 +54,7 @@ Executor::Task::UP SingleExecutor::execute(Task::UP task) { uint64_t wp = addTask(std::move(task)); if (wp == _wakeupConsumerAt.load(std::memory_order_relaxed)) { - wakeupConsumer(); + _consumerCondition.notify_one(); } return task; } @@ -86,9 +68,9 @@ SingleExecutor & SingleExecutor::sync() { uint64_t wp = _wp.load(std::memory_order_relaxed); while (wp > _rp.load(std::memory_order_acquire)) { - wakeupConsumer(); - MonitorGuard guard(_producerMonitor); - guard.wait(1ms); + _consumerCondition.notify_one(); + Lock lock(_producerMutex); + sleepProducer(lock, 100us); } return *this; } @@ -97,8 +79,11 @@ void SingleExecutor::run() { while (!_thread.stopped()) { drain_tasks(); - wakeupProducer(); - sleepConsumer(); + _producerCondition.notify_all(); + _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) / 4), std::memory_order_relaxed); + Lock lock(_consumerMutex); + _consumerCondition.wait_for(lock, 10ms); + _wakeupConsumerAt.store(0, std::memory_order_relaxed); } } @@ -117,27 +102,27 @@ SingleExecutor::run_tasks_till(uint64_t available) { _maxPending.store(left, std::memory_order_relaxed); } uint64_t wakeupLimit = _producerNeedWakeup.load(std::memory_order_relaxed) - ? (available - (left >> 2)) + ? (available - (left / 4)) : 0; while (consumed < available) { Task::UP task = std::move(_tasks[index(consumed)]); task->run(); _rp.store(++consumed, std::memory_order_release); if (wakeupLimit == consumed) { - wakeupProducer(); + _producerCondition.notify_all(); } } } void -SingleExecutor::wait_for_room(MonitorGuard & producerGuard) { +SingleExecutor::wait_for_room(Lock & producerGuard) { if (_taskLimit.load(std::memory_order_relaxed) != _wantedTaskLimit.load(std::memory_order_relaxed)) { sync(); _tasks = std::make_unique(_wantedTaskLimit); _taskLimit = _wantedTaskLimit.load(); } while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) { - sleepProducer(producerGuard); + sleepProducer(producerGuard, 10ms); } } diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index 2a4d60594aa..9984662e416 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -4,6 +4,7 @@ #include #include +#include #include #include @@ -26,15 +27,13 @@ public: uint32_t getTaskLimit() const { return _taskLimit.load(std::memory_order_relaxed); } Stats getStats() override; private: + using Lock = std::unique_lock; uint64_t addTask(Task::UP task); void run() override; void drain_tasks(); - void wakeupConsumer(); - void sleepConsumer(); - void wakeupProducer(); - void sleepProducer(MonitorGuard & guard); + void sleepProducer(Lock & guard, duration maxWaitTime); void run_tasks_till(uint64_t available); - void wait_for_room(MonitorGuard & guard); + void wait_for_room(Lock & guard); uint64_t index(uint64_t counter) const { return counter & (_taskLimit.load(std::memory_order_relaxed) - 1); } @@ -46,8 +45,10 @@ private: std::atomic _wantedTaskLimit; std::atomic _rp; std::unique_ptr _tasks; - vespalib::Monitor _consumerMonitor; - vespalib::Monitor _producerMonitor; + std::mutex _consumerMutex; + std::condition_variable _consumerCondition; + std::mutex _producerMutex; + std::condition_variable _producerCondition; vespalib::Thread _thread; uint64_t _lastAccepted; std::atomic _maxPending; -- cgit v1.2.3