summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-03-11 21:52:48 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-03-11 21:52:48 +0000
commit2c29c9d08fcaa0f23d6eb0015d904723bdc6976e (patch)
treebb32201770ef83d7b62f54c82189b7125fc73cff
parentacb7410c1109426844bb4a4b2fb8c05abec7ec2f (diff)
In order to drain Q faster on sync, and also detect that we are in sync faster,
we wake the consumer unconditionally on sync, and also unconditionally wake the producer when consumer is idle.
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp47
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.h4
2 files changed, 39 insertions, 12 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index 8f1f22c4f8e..8f83e7e6da7 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -41,12 +41,38 @@ SingleExecutor::addTask(Task::UP task) {
return wp;
}
+void
+SingleExecutor::wakeupConsumer() {
+ MonitorGuard guard(_consumerMonitor);
+ guard.signal();
+}
+
+void
+SingleExecutor::sleepConsumer() {
+ _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) >> 2), std::memory_order_relaxed);
+ MonitorGuard guard(_consumerMonitor);
+ guard.wait(10ms);
+ _wakeupConsumerAt.store(0, std::memory_order_relaxed);
+}
+
+void
+SingleExecutor::wakeupProducer() {
+ MonitorGuard guard(_producerMonitor);
+ guard.signal();
+}
+
+void
+SingleExecutor::sleepProducer(MonitorGuard & guard) {
+ _producerNeedWakeup.store(true, std::memory_order_relaxed);
+ guard.wait(10ms);
+ _producerNeedWakeup.store(false, std::memory_order_relaxed);
+}
+
Executor::Task::UP
SingleExecutor::execute(Task::UP task) {
uint64_t wp = addTask(std::move(task));
if (wp == _wakeupConsumerAt.load(std::memory_order_relaxed)) {
- MonitorGuard guard(_consumerMonitor);
- guard.signal();
+ wakeupConsumer();
}
return task;
}
@@ -60,7 +86,9 @@ SingleExecutor &
SingleExecutor::sync() {
uint64_t wp = _wp.load(std::memory_order_relaxed);
while (wp > _rp.load(std::memory_order_acquire)) {
- std::this_thread::sleep_for(1ms);
+ wakeupConsumer();
+ MonitorGuard guard(_producerMonitor);
+ guard.wait(1ms);
}
return *this;
}
@@ -69,10 +97,8 @@ void
SingleExecutor::run() {
while (!_thread.stopped()) {
drain_tasks();
- _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) >> 2), std::memory_order_relaxed);
- MonitorGuard guard(_consumerMonitor);
- guard.wait(10ms);
- _wakeupConsumerAt.store(0, std::memory_order_relaxed);
+ wakeupProducer();
+ sleepConsumer();
}
}
@@ -98,8 +124,7 @@ SingleExecutor::run_tasks_till(uint64_t available) {
task->run();
_rp.store(++consumed, std::memory_order_release);
if (wakeupLimit == consumed) {
- MonitorGuard guard(_producerMonitor);
- guard.broadcast();
+ wakeupProducer();
}
}
}
@@ -112,9 +137,7 @@ SingleExecutor::wait_for_room(MonitorGuard & producerGuard) {
_taskLimit = _wantedTaskLimit.load();
}
while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) {
- _producerNeedWakeup.store(true, std::memory_order_relaxed);
- producerGuard.wait(10ms);
- _producerNeedWakeup.store(false, std::memory_order_relaxed);
+ sleepProducer(producerGuard);
}
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
index 0ab89355566..2a4d60594aa 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -29,6 +29,10 @@ private:
uint64_t addTask(Task::UP task);
void run() override;
void drain_tasks();
+ void wakeupConsumer();
+ void sleepConsumer();
+ void wakeupProducer();
+ void sleepProducer(MonitorGuard & guard);
void run_tasks_till(uint64_t available);
void wait_for_room(MonitorGuard & guard);
uint64_t index(uint64_t counter) const {