summaryrefslogtreecommitdiffstats
path: root/staging_vespalib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-03-11 21:52:48 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-03-11 21:52:48 +0000
commit2c29c9d08fcaa0f23d6eb0015d904723bdc6976e (patch)
treebb32201770ef83d7b62f54c82189b7125fc73cff /staging_vespalib
parentacb7410c1109426844bb4a4b2fb8c05abec7ec2f (diff)
In order to drain Q faster on sync, and also detect that we are in sync faster,
we wake the consumer unconditionally on sync, and also unconditionally wake the producer when consumer is idle.
Diffstat (limited to 'staging_vespalib')
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp47
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.h4
2 files changed, 39 insertions, 12 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index 8f1f22c4f8e..8f83e7e6da7 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -41,12 +41,38 @@ SingleExecutor::addTask(Task::UP task) {
return wp;
}
+void
+SingleExecutor::wakeupConsumer() {
+ MonitorGuard guard(_consumerMonitor);
+ guard.signal();
+}
+
+void
+SingleExecutor::sleepConsumer() {
+ _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) >> 2), std::memory_order_relaxed);
+ MonitorGuard guard(_consumerMonitor);
+ guard.wait(10ms);
+ _wakeupConsumerAt.store(0, std::memory_order_relaxed);
+}
+
+void
+SingleExecutor::wakeupProducer() {
+ MonitorGuard guard(_producerMonitor);
+ guard.signal();
+}
+
+void
+SingleExecutor::sleepProducer(MonitorGuard & guard) {
+ _producerNeedWakeup.store(true, std::memory_order_relaxed);
+ guard.wait(10ms);
+ _producerNeedWakeup.store(false, std::memory_order_relaxed);
+}
+
Executor::Task::UP
SingleExecutor::execute(Task::UP task) {
uint64_t wp = addTask(std::move(task));
if (wp == _wakeupConsumerAt.load(std::memory_order_relaxed)) {
- MonitorGuard guard(_consumerMonitor);
- guard.signal();
+ wakeupConsumer();
}
return task;
}
@@ -60,7 +86,9 @@ SingleExecutor &
SingleExecutor::sync() {
uint64_t wp = _wp.load(std::memory_order_relaxed);
while (wp > _rp.load(std::memory_order_acquire)) {
- std::this_thread::sleep_for(1ms);
+ wakeupConsumer();
+ MonitorGuard guard(_producerMonitor);
+ guard.wait(1ms);
}
return *this;
}
@@ -69,10 +97,8 @@ void
SingleExecutor::run() {
while (!_thread.stopped()) {
drain_tasks();
- _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) >> 2), std::memory_order_relaxed);
- MonitorGuard guard(_consumerMonitor);
- guard.wait(10ms);
- _wakeupConsumerAt.store(0, std::memory_order_relaxed);
+ wakeupProducer();
+ sleepConsumer();
}
}
@@ -98,8 +124,7 @@ SingleExecutor::run_tasks_till(uint64_t available) {
task->run();
_rp.store(++consumed, std::memory_order_release);
if (wakeupLimit == consumed) {
- MonitorGuard guard(_producerMonitor);
- guard.broadcast();
+ wakeupProducer();
}
}
}
@@ -112,9 +137,7 @@ SingleExecutor::wait_for_room(MonitorGuard & producerGuard) {
_taskLimit = _wantedTaskLimit.load();
}
while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) {
- _producerNeedWakeup.store(true, std::memory_order_relaxed);
- producerGuard.wait(10ms);
- _producerNeedWakeup.store(false, std::memory_order_relaxed);
+ sleepProducer(producerGuard);
}
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
index 0ab89355566..2a4d60594aa 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -29,6 +29,10 @@ private:
uint64_t addTask(Task::UP task);
void run() override;
void drain_tasks();
+ void wakeupConsumer();
+ void sleepConsumer();
+ void wakeupProducer();
+ void sleepProducer(MonitorGuard & guard);
void run_tasks_till(uint64_t available);
void wait_for_room(MonitorGuard & guard);
uint64_t index(uint64_t counter) const {