diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-03-11 21:52:48 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-03-11 21:52:48 +0000 |
commit | 2c29c9d08fcaa0f23d6eb0015d904723bdc6976e (patch) | |
tree | bb32201770ef83d7b62f54c82189b7125fc73cff /staging_vespalib | |
parent | acb7410c1109426844bb4a4b2fb8c05abec7ec2f (diff) |
In order to drain Q faster on sync, and also detect that we are in sync faster,
we wake the consumer unconditionally on sync, and also unconditionally wake the producer when consumer is idle.
Diffstat (limited to 'staging_vespalib')
-rw-r--r-- | staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp | 47 | ||||
-rw-r--r-- | staging_vespalib/src/vespa/vespalib/util/singleexecutor.h | 4 |
2 files changed, 39 insertions, 12 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index 8f1f22c4f8e..8f83e7e6da7 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -41,12 +41,38 @@ SingleExecutor::addTask(Task::UP task) { return wp; } +void +SingleExecutor::wakeupConsumer() { + MonitorGuard guard(_consumerMonitor); + guard.signal(); +} + +void +SingleExecutor::sleepConsumer() { + _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) >> 2), std::memory_order_relaxed); + MonitorGuard guard(_consumerMonitor); + guard.wait(10ms); + _wakeupConsumerAt.store(0, std::memory_order_relaxed); +} + +void +SingleExecutor::wakeupProducer() { + MonitorGuard guard(_producerMonitor); + guard.signal(); +} + +void +SingleExecutor::sleepProducer(MonitorGuard & guard) { + _producerNeedWakeup.store(true, std::memory_order_relaxed); + guard.wait(10ms); + _producerNeedWakeup.store(false, std::memory_order_relaxed); +} + Executor::Task::UP SingleExecutor::execute(Task::UP task) { uint64_t wp = addTask(std::move(task)); if (wp == _wakeupConsumerAt.load(std::memory_order_relaxed)) { - MonitorGuard guard(_consumerMonitor); - guard.signal(); + wakeupConsumer(); } return task; } @@ -60,7 +86,9 @@ SingleExecutor & SingleExecutor::sync() { uint64_t wp = _wp.load(std::memory_order_relaxed); while (wp > _rp.load(std::memory_order_acquire)) { - std::this_thread::sleep_for(1ms); + wakeupConsumer(); + MonitorGuard guard(_producerMonitor); + guard.wait(1ms); } return *this; } @@ -69,10 +97,8 @@ void SingleExecutor::run() { while (!_thread.stopped()) { drain_tasks(); - _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) >> 2), std::memory_order_relaxed); - MonitorGuard guard(_consumerMonitor); - guard.wait(10ms); - _wakeupConsumerAt.store(0, std::memory_order_relaxed); + wakeupProducer(); + sleepConsumer(); } } @@ -98,8 +124,7 @@ SingleExecutor::run_tasks_till(uint64_t available) { task->run(); _rp.store(++consumed, std::memory_order_release); if (wakeupLimit == consumed) { - MonitorGuard guard(_producerMonitor); - guard.broadcast(); + wakeupProducer(); } } } @@ -112,9 +137,7 @@ SingleExecutor::wait_for_room(MonitorGuard & producerGuard) { _taskLimit = _wantedTaskLimit.load(); } while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) { - _producerNeedWakeup.store(true, std::memory_order_relaxed); - producerGuard.wait(10ms); - _producerNeedWakeup.store(false, std::memory_order_relaxed); + sleepProducer(producerGuard); } } diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index 0ab89355566..2a4d60594aa 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -29,6 +29,10 @@ private: uint64_t addTask(Task::UP task); void run() override; void drain_tasks(); + void wakeupConsumer(); + void sleepConsumer(); + void wakeupProducer(); + void sleepProducer(MonitorGuard & guard); void run_tasks_till(uint64_t available); void wait_for_room(MonitorGuard & guard); uint64_t index(uint64_t counter) const { |