diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-03-11 21:52:48 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-03-11 21:52:48 +0000 |
commit | 2c29c9d08fcaa0f23d6eb0015d904723bdc6976e (patch) | |
tree | bb32201770ef83d7b62f54c82189b7125fc73cff | |
parent | acb7410c1109426844bb4a4b2fb8c05abec7ec2f (diff) |
In order to drain Q faster on sync, and also detect that we are in sync faster,
we wake the consumer unconditionally on sync, and also unconditionally wake the producer when consumer is idle.
-rw-r--r-- | staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp | 47 | ||||
-rw-r--r-- | staging_vespalib/src/vespa/vespalib/util/singleexecutor.h | 4 |
2 files changed, 39 insertions, 12 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index 8f1f22c4f8e..8f83e7e6da7 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -41,12 +41,38 @@ SingleExecutor::addTask(Task::UP task) { return wp; } +void +SingleExecutor::wakeupConsumer() { + MonitorGuard guard(_consumerMonitor); + guard.signal(); +} + +void +SingleExecutor::sleepConsumer() { + _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) >> 2), std::memory_order_relaxed); + MonitorGuard guard(_consumerMonitor); + guard.wait(10ms); + _wakeupConsumerAt.store(0, std::memory_order_relaxed); +} + +void +SingleExecutor::wakeupProducer() { + MonitorGuard guard(_producerMonitor); + guard.signal(); +} + +void +SingleExecutor::sleepProducer(MonitorGuard & guard) { + _producerNeedWakeup.store(true, std::memory_order_relaxed); + guard.wait(10ms); + _producerNeedWakeup.store(false, std::memory_order_relaxed); +} + Executor::Task::UP SingleExecutor::execute(Task::UP task) { uint64_t wp = addTask(std::move(task)); if (wp == _wakeupConsumerAt.load(std::memory_order_relaxed)) { - MonitorGuard guard(_consumerMonitor); - guard.signal(); + wakeupConsumer(); } return task; } @@ -60,7 +86,9 @@ SingleExecutor & SingleExecutor::sync() { uint64_t wp = _wp.load(std::memory_order_relaxed); while (wp > _rp.load(std::memory_order_acquire)) { - std::this_thread::sleep_for(1ms); + wakeupConsumer(); + MonitorGuard guard(_producerMonitor); + guard.wait(1ms); } return *this; } @@ -69,10 +97,8 @@ void SingleExecutor::run() { while (!_thread.stopped()) { drain_tasks(); - _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) >> 2), std::memory_order_relaxed); - MonitorGuard guard(_consumerMonitor); - guard.wait(10ms); - _wakeupConsumerAt.store(0, std::memory_order_relaxed); + wakeupProducer(); + sleepConsumer(); } } @@ -98,8 +124,7 @@ SingleExecutor::run_tasks_till(uint64_t available) { task->run(); _rp.store(++consumed, std::memory_order_release); if (wakeupLimit == consumed) { - MonitorGuard guard(_producerMonitor); - guard.broadcast(); + wakeupProducer(); } } } @@ -112,9 +137,7 @@ SingleExecutor::wait_for_room(MonitorGuard & producerGuard) { _taskLimit = _wantedTaskLimit.load(); } while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) { - _producerNeedWakeup.store(true, std::memory_order_relaxed); - producerGuard.wait(10ms); - _producerNeedWakeup.store(false, std::memory_order_relaxed); + sleepProducer(producerGuard); } } diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index 0ab89355566..2a4d60594aa 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -29,6 +29,10 @@ private: uint64_t addTask(Task::UP task); void run() override; void drain_tasks(); + void wakeupConsumer(); + void sleepConsumer(); + void wakeupProducer(); + void sleepProducer(MonitorGuard & guard); void run_tasks_till(uint64_t available); void wait_for_room(MonitorGuard & guard); uint64_t index(uint64_t counter) const { |