diff options
-rw-r--r-- | staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp | 55 | ||||
-rw-r--r-- | staging_vespalib/src/vespa/vespalib/util/singleexecutor.h | 15 |
2 files changed, 28 insertions, 42 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index 8f83e7e6da7..b5c5bbacd32 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -10,8 +10,10 @@ SingleExecutor::SingleExecutor(uint32_t taskLimit) _wantedTaskLimit(_taskLimit.load()), _rp(0), _tasks(std::make_unique<Task::UP[]>(_taskLimit)), - _consumerMonitor(), - _producerMonitor(), + _consumerMutex(), + _consumerCondition(), + _producerMutex(), + _producerCondition(), _thread(*this), _lastAccepted(0), _maxPending(0), @@ -33,7 +35,7 @@ SingleExecutor::getNumThreads() const { uint64_t SingleExecutor::addTask(Task::UP task) { - MonitorGuard guard(_producerMonitor); + Lock guard(_producerMutex); wait_for_room(guard); uint64_t wp = _wp.load(std::memory_order_relaxed); _tasks[index(wp)] = std::move(task); @@ -42,29 +44,9 @@ SingleExecutor::addTask(Task::UP task) { } void -SingleExecutor::wakeupConsumer() { - MonitorGuard guard(_consumerMonitor); - guard.signal(); -} - -void -SingleExecutor::sleepConsumer() { - _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) >> 2), std::memory_order_relaxed); - MonitorGuard guard(_consumerMonitor); - guard.wait(10ms); - _wakeupConsumerAt.store(0, std::memory_order_relaxed); -} - -void -SingleExecutor::wakeupProducer() { - MonitorGuard guard(_producerMonitor); - guard.signal(); -} - -void -SingleExecutor::sleepProducer(MonitorGuard & guard) { +SingleExecutor::sleepProducer(Lock & lock, duration maxWaitTime) { _producerNeedWakeup.store(true, std::memory_order_relaxed); - guard.wait(10ms); + _producerCondition.wait_for(lock, maxWaitTime); _producerNeedWakeup.store(false, std::memory_order_relaxed); } @@ -72,7 +54,7 @@ Executor::Task::UP SingleExecutor::execute(Task::UP task) { uint64_t wp = addTask(std::move(task)); if (wp == _wakeupConsumerAt.load(std::memory_order_relaxed)) { - wakeupConsumer(); + _consumerCondition.notify_one(); } return task; } @@ -86,9 +68,9 @@ SingleExecutor & SingleExecutor::sync() { uint64_t wp = _wp.load(std::memory_order_relaxed); while (wp > _rp.load(std::memory_order_acquire)) { - wakeupConsumer(); - MonitorGuard guard(_producerMonitor); - guard.wait(1ms); + _consumerCondition.notify_one(); + Lock lock(_producerMutex); + sleepProducer(lock, 100us); } return *this; } @@ -97,8 +79,11 @@ void SingleExecutor::run() { while (!_thread.stopped()) { drain_tasks(); - wakeupProducer(); - sleepConsumer(); + _producerCondition.notify_all(); + _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) / 4), std::memory_order_relaxed); + Lock lock(_consumerMutex); + _consumerCondition.wait_for(lock, 10ms); + _wakeupConsumerAt.store(0, std::memory_order_relaxed); } } @@ -117,27 +102,27 @@ SingleExecutor::run_tasks_till(uint64_t available) { _maxPending.store(left, std::memory_order_relaxed); } uint64_t wakeupLimit = _producerNeedWakeup.load(std::memory_order_relaxed) - ? (available - (left >> 2)) + ? (available - (left / 4)) : 0; while (consumed < available) { Task::UP task = std::move(_tasks[index(consumed)]); task->run(); _rp.store(++consumed, std::memory_order_release); if (wakeupLimit == consumed) { - wakeupProducer(); + _producerCondition.notify_all(); } } } void -SingleExecutor::wait_for_room(MonitorGuard & producerGuard) { +SingleExecutor::wait_for_room(Lock & producerGuard) { if (_taskLimit.load(std::memory_order_relaxed) != _wantedTaskLimit.load(std::memory_order_relaxed)) { sync(); _tasks = std::make_unique<Task::UP[]>(_wantedTaskLimit); _taskLimit = _wantedTaskLimit.load(); } while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) { - sleepProducer(producerGuard); + sleepProducer(producerGuard, 10ms); } } diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index 2a4d60594aa..9984662e416 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -4,6 +4,7 @@ #include <vespa/vespalib/util/threadexecutor.h> #include <vespa/vespalib/util/thread.h> +#include <vespa/vespalib/util/time.h> #include <thread> #include <atomic> @@ -26,15 +27,13 @@ public: uint32_t getTaskLimit() const { return _taskLimit.load(std::memory_order_relaxed); } Stats getStats() override; private: + using Lock = std::unique_lock<std::mutex>; uint64_t addTask(Task::UP task); void run() override; void drain_tasks(); - void wakeupConsumer(); - void sleepConsumer(); - void wakeupProducer(); - void sleepProducer(MonitorGuard & guard); + void sleepProducer(Lock & guard, duration maxWaitTime); void run_tasks_till(uint64_t available); - void wait_for_room(MonitorGuard & guard); + void wait_for_room(Lock & guard); uint64_t index(uint64_t counter) const { return counter & (_taskLimit.load(std::memory_order_relaxed) - 1); } @@ -46,8 +45,10 @@ private: std::atomic<uint32_t> _wantedTaskLimit; std::atomic<uint64_t> _rp; std::unique_ptr<Task::UP[]> _tasks; - vespalib::Monitor _consumerMonitor; - vespalib::Monitor _producerMonitor; + std::mutex _consumerMutex; + std::condition_variable _consumerCondition; + std::mutex _producerMutex; + std::condition_variable _producerCondition; vespalib::Thread _thread; uint64_t _lastAccepted; std::atomic<uint64_t> _maxPending; |