From 3269532b70f203cd302820ce1adc3facfe35dcf8 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Thu, 12 Mar 2020 15:39:50 +0000 Subject: - Use a single common lock. - Introduce 2 stage startSync/sync. - avoid loosing wakeup on sync. --- .../src/vespa/vespalib/util/singleexecutor.cpp | 52 ++++++++++++++-------- .../src/vespa/vespalib/util/singleexecutor.h | 9 ++-- 2 files changed, 39 insertions(+), 22 deletions(-) (limited to 'staging_vespalib') diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index b5c5bbacd32..73612452b09 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -10,15 +10,14 @@ SingleExecutor::SingleExecutor(uint32_t taskLimit) _wantedTaskLimit(_taskLimit.load()), _rp(0), _tasks(std::make_unique(_taskLimit)), - _consumerMutex(), + _mutex(), _consumerCondition(), - _producerMutex(), _producerCondition(), _thread(*this), _lastAccepted(0), _maxPending(0), _wakeupConsumerAt(0), - _producerNeedWakeup(false), + _producerNeedWakeupAt(0), _wp(0) { _thread.start(); @@ -35,7 +34,7 @@ SingleExecutor::getNumThreads() const { uint64_t SingleExecutor::addTask(Task::UP task) { - Lock guard(_producerMutex); + Lock guard(_mutex); wait_for_room(guard); uint64_t wp = _wp.load(std::memory_order_relaxed); _tasks[index(wp)] = std::move(task); @@ -44,10 +43,10 @@ SingleExecutor::addTask(Task::UP task) { } void -SingleExecutor::sleepProducer(Lock & lock, duration maxWaitTime) { - _producerNeedWakeup.store(true, std::memory_order_relaxed); +SingleExecutor::sleepProducer(Lock & lock, duration maxWaitTime, uint64_t wakeupAt) { + _producerNeedWakeupAt.store(wakeupAt, std::memory_order_relaxed); _producerCondition.wait_for(lock, maxWaitTime); - _producerNeedWakeup.store(false, std::memory_order_relaxed); + _producerNeedWakeupAt.store(0, std::memory_order_relaxed); } Executor::Task::UP @@ -64,13 +63,27 @@ SingleExecutor::setTaskLimit(uint32_t taskLimit) { _wantedTaskLimit = vespalib::roundUp2inN(taskLimit); } +void +SingleExecutor::drain(Lock & lock) { + uint64_t wp = _wp.load(std::memory_order_relaxed); + while (numTasks() > 0) { + _consumerCondition.notify_one(); + sleepProducer(lock, 100us, wp); + } +} + +void +SingleExecutor::startSync() { + _consumerCondition.notify_one(); +} + SingleExecutor & SingleExecutor::sync() { + Lock lock(_mutex); uint64_t wp = _wp.load(std::memory_order_relaxed); while (wp > _rp.load(std::memory_order_acquire)) { _consumerCondition.notify_one(); - Lock lock(_producerMutex); - sleepProducer(lock, 100us); + sleepProducer(lock, 100us, wp); } return *this; } @@ -81,8 +94,10 @@ SingleExecutor::run() { drain_tasks(); _producerCondition.notify_all(); _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) / 4), std::memory_order_relaxed); - Lock lock(_consumerMutex); - _consumerCondition.wait_for(lock, 10ms); + Lock lock(_mutex); + if (numTasks() <= 0) { + _consumerCondition.wait_for(lock, 10ms); + } _wakeupConsumerAt.store(0, std::memory_order_relaxed); } } @@ -101,9 +116,7 @@ SingleExecutor::run_tasks_till(uint64_t available) { if (_maxPending.load(std::memory_order_relaxed) < left) { _maxPending.store(left, std::memory_order_relaxed); } - uint64_t wakeupLimit = _producerNeedWakeup.load(std::memory_order_relaxed) - ? (available - (left / 4)) - : 0; + uint64_t wakeupLimit = _producerNeedWakeupAt.load(std::memory_order_relaxed); while (consumed < available) { Task::UP task = std::move(_tasks[index(consumed)]); task->run(); @@ -115,14 +128,17 @@ SingleExecutor::run_tasks_till(uint64_t available) { } void -SingleExecutor::wait_for_room(Lock & producerGuard) { - if (_taskLimit.load(std::memory_order_relaxed) != _wantedTaskLimit.load(std::memory_order_relaxed)) { - sync(); +SingleExecutor::wait_for_room(Lock & lock) { + uint64_t wp = _wp.load(std::memory_order_relaxed); + uint64_t taskLimit = _taskLimit.load(std::memory_order_relaxed); + if (taskLimit != _wantedTaskLimit.load(std::memory_order_relaxed)) { + drain(lock); _tasks = std::make_unique(_wantedTaskLimit); _taskLimit = _wantedTaskLimit.load(); + taskLimit = _taskLimit; } while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) { - sleepProducer(producerGuard, 10ms); + sleepProducer(lock, 10ms, wp - taskLimit/4); } } diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index 9984662e416..9c3ebb4caf7 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -23,15 +23,17 @@ public: Task::UP execute(Task::UP task) override; void setTaskLimit(uint32_t taskLimit) override; SingleExecutor & sync() override; + void startSync(); size_t getNumThreads() const override; uint32_t getTaskLimit() const { return _taskLimit.load(std::memory_order_relaxed); } Stats getStats() override; private: using Lock = std::unique_lock; uint64_t addTask(Task::UP task); + void drain(Lock & lock); void run() override; void drain_tasks(); - void sleepProducer(Lock & guard, duration maxWaitTime); + void sleepProducer(Lock & guard, duration maxWaitTime, uint64_t wakeupAt); void run_tasks_till(uint64_t available); void wait_for_room(Lock & guard); uint64_t index(uint64_t counter) const { @@ -45,15 +47,14 @@ private: std::atomic _wantedTaskLimit; std::atomic _rp; std::unique_ptr _tasks; - std::mutex _consumerMutex; + std::mutex _mutex; std::condition_variable _consumerCondition; - std::mutex _producerMutex; std::condition_variable _producerCondition; vespalib::Thread _thread; uint64_t _lastAccepted; std::atomic _maxPending; std::atomic _wakeupConsumerAt; - std::atomic _producerNeedWakeup; + std::atomic _producerNeedWakeupAt; std::atomic _wp; }; -- cgit v1.2.3