diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-02-28 11:27:50 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-02-28 11:27:50 +0000 |
commit | ee9d06da8b8d25216290fa4f52145d14035b04e7 (patch) | |
tree | 6935fb7249df704c90bcd0b22a0e63cf1bd91fb6 /staging_vespalib | |
parent | 821cbc57fcd370ae067344c329c09d1deb1700f0 (diff) |
Thread safe on put.
Diffstat (limited to 'staging_vespalib')
-rw-r--r-- | staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp | 28 | ||||
-rw-r--r-- | staging_vespalib/src/vespa/vespalib/util/singleexecutor.h | 10 |
2 files changed, 21 insertions, 17 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index 59cc9a39957..a1766eda00b 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -10,7 +10,8 @@ SingleExecutor::SingleExecutor(uint32_t taskLimit) _wantedTaskLimit(_taskLimit.load()), _rp(0), _tasks(std::make_unique<Task::UP[]>(_taskLimit)), - _monitor(), + _consumerMonitor(), + _producerMonitor(), _thread(*this), _lastAccepted(0), _maxPending(0), @@ -32,12 +33,16 @@ SingleExecutor::getNumThreads() const { Executor::Task::UP SingleExecutor::execute(Task::UP task) { - wait_for_room(); - uint64_t wp = _wp.load(std::memory_order_relaxed); - _tasks[index(wp)] = std::move(task); - _wp.store(wp + 1, std::memory_order_relaxed); + uint64_t wp; + { + MonitorGuard produceGuard(_producerMonitor); + wait_for_room(produceGuard); + wp = _wp.load(std::memory_order_relaxed); + _tasks[index(wp)] = std::move(task); + _wp.store(wp + 1, std::memory_order_relaxed); + } if (wp == _wakeupConsumerAt.load(std::memory_order_relaxed)) { - MonitorGuard guard(_monitor); + MonitorGuard guard(_consumerMonitor); guard.signal(); } return Task::UP(); @@ -62,8 +67,8 @@ SingleExecutor::run() { while (!_thread.stopped()) { drain_tasks(); _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) >> 2), std::memory_order_relaxed); - MonitorGuard guard(_monitor); - guard.wait(1ms); + MonitorGuard guard(_consumerMonitor); + guard.wait(10ms); _wakeupConsumerAt.store(0, std::memory_order_relaxed); } } @@ -90,14 +95,14 @@ SingleExecutor::run_tasks_till(uint64_t available) { task->run(); _rp.store(++consumed, std::memory_order_relaxed); if (wakeupLimit == consumed) { - MonitorGuard guard(_monitor); + MonitorGuard guard(_producerMonitor); guard.signal(); } } } void -SingleExecutor::wait_for_room() { +SingleExecutor::wait_for_room(MonitorGuard & producerGuard) { if (_taskLimit.load(std::memory_order_relaxed) != _wantedTaskLimit.load(std::memory_order_relaxed)) { sync(); _tasks = std::make_unique<Task::UP[]>(_wantedTaskLimit); @@ -105,8 +110,7 @@ SingleExecutor::wait_for_room() { } while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) { _producerNeedWakeup.store(true, std::memory_order_relaxed); - MonitorGuard guard(_monitor); - guard.wait(1ms); + producerGuard.wait(10ms); _producerNeedWakeup.store(false, std::memory_order_relaxed); } } diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index 13520efa3d8..1f1f3169119 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -12,9 +12,8 @@ namespace vespalib { /** * Has a single thread consuming tasks from a fixed size ringbuffer. * Made for throughput where the producer has no interaction with the consumer and - * it is hence very cheap to produce a task. The consumer wakes up once every ms to see if there are - * anything to consumer. It uses a lock free ringbuffer, but requires only a single producer. - * That must be enforced on the outside. + * it is hence very cheap to produce a task. High and low watermark at 25%/75% is used + * to reduce ping-pong. */ class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable { public: @@ -31,7 +30,7 @@ private: void run() override; void drain_tasks(); void run_tasks_till(uint64_t available); - void wait_for_room(); + void wait_for_room(MonitorGuard & producerGuard); uint64_t index(uint64_t counter) const { return counter & (_taskLimit.load(std::memory_order_relaxed) - 1); } @@ -43,7 +42,8 @@ private: std::atomic<uint32_t> _wantedTaskLimit; std::atomic<uint64_t> _rp; std::unique_ptr<Task::UP[]> _tasks; - vespalib::Monitor _monitor; + vespalib::Monitor _consumerMonitor; + vespalib::Monitor _producerMonitor; vespalib::Thread _thread; uint64_t _lastAccepted; std::atomic<uint64_t> _maxPending; |