summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-03-12 11:35:01 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-03-12 11:35:01 +0000
commit2a3a5b321deb5e65d4a4856ee53eeafd0551d3b5 (patch)
tree6f8f57f73649cc66094b1cdfbccbfbe49da97fb4
parent2c29c9d08fcaa0f23d6eb0015d904723bdc6976e (diff)
Use std::mutex/condition_variable.
Use shorter wait during sync, as that is urgent and synchronous.
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp55
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.h15
2 files changed, 28 insertions, 42 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index 8f83e7e6da7..b5c5bbacd32 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -10,8 +10,10 @@ SingleExecutor::SingleExecutor(uint32_t taskLimit)
_wantedTaskLimit(_taskLimit.load()),
_rp(0),
_tasks(std::make_unique<Task::UP[]>(_taskLimit)),
- _consumerMonitor(),
- _producerMonitor(),
+ _consumerMutex(),
+ _consumerCondition(),
+ _producerMutex(),
+ _producerCondition(),
_thread(*this),
_lastAccepted(0),
_maxPending(0),
@@ -33,7 +35,7 @@ SingleExecutor::getNumThreads() const {
uint64_t
SingleExecutor::addTask(Task::UP task) {
- MonitorGuard guard(_producerMonitor);
+ Lock guard(_producerMutex);
wait_for_room(guard);
uint64_t wp = _wp.load(std::memory_order_relaxed);
_tasks[index(wp)] = std::move(task);
@@ -42,29 +44,9 @@ SingleExecutor::addTask(Task::UP task) {
}
void
-SingleExecutor::wakeupConsumer() {
- MonitorGuard guard(_consumerMonitor);
- guard.signal();
-}
-
-void
-SingleExecutor::sleepConsumer() {
- _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) >> 2), std::memory_order_relaxed);
- MonitorGuard guard(_consumerMonitor);
- guard.wait(10ms);
- _wakeupConsumerAt.store(0, std::memory_order_relaxed);
-}
-
-void
-SingleExecutor::wakeupProducer() {
- MonitorGuard guard(_producerMonitor);
- guard.signal();
-}
-
-void
-SingleExecutor::sleepProducer(MonitorGuard & guard) {
+SingleExecutor::sleepProducer(Lock & lock, duration maxWaitTime) {
_producerNeedWakeup.store(true, std::memory_order_relaxed);
- guard.wait(10ms);
+ _producerCondition.wait_for(lock, maxWaitTime);
_producerNeedWakeup.store(false, std::memory_order_relaxed);
}
@@ -72,7 +54,7 @@ Executor::Task::UP
SingleExecutor::execute(Task::UP task) {
uint64_t wp = addTask(std::move(task));
if (wp == _wakeupConsumerAt.load(std::memory_order_relaxed)) {
- wakeupConsumer();
+ _consumerCondition.notify_one();
}
return task;
}
@@ -86,9 +68,9 @@ SingleExecutor &
SingleExecutor::sync() {
uint64_t wp = _wp.load(std::memory_order_relaxed);
while (wp > _rp.load(std::memory_order_acquire)) {
- wakeupConsumer();
- MonitorGuard guard(_producerMonitor);
- guard.wait(1ms);
+ _consumerCondition.notify_one();
+ Lock lock(_producerMutex);
+ sleepProducer(lock, 100us);
}
return *this;
}
@@ -97,8 +79,11 @@ void
SingleExecutor::run() {
while (!_thread.stopped()) {
drain_tasks();
- wakeupProducer();
- sleepConsumer();
+ _producerCondition.notify_all();
+ _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) / 4), std::memory_order_relaxed);
+ Lock lock(_consumerMutex);
+ _consumerCondition.wait_for(lock, 10ms);
+ _wakeupConsumerAt.store(0, std::memory_order_relaxed);
}
}
@@ -117,27 +102,27 @@ SingleExecutor::run_tasks_till(uint64_t available) {
_maxPending.store(left, std::memory_order_relaxed);
}
uint64_t wakeupLimit = _producerNeedWakeup.load(std::memory_order_relaxed)
- ? (available - (left >> 2))
+ ? (available - (left / 4))
: 0;
while (consumed < available) {
Task::UP task = std::move(_tasks[index(consumed)]);
task->run();
_rp.store(++consumed, std::memory_order_release);
if (wakeupLimit == consumed) {
- wakeupProducer();
+ _producerCondition.notify_all();
}
}
}
void
-SingleExecutor::wait_for_room(MonitorGuard & producerGuard) {
+SingleExecutor::wait_for_room(Lock & producerGuard) {
if (_taskLimit.load(std::memory_order_relaxed) != _wantedTaskLimit.load(std::memory_order_relaxed)) {
sync();
_tasks = std::make_unique<Task::UP[]>(_wantedTaskLimit);
_taskLimit = _wantedTaskLimit.load();
}
while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) {
- sleepProducer(producerGuard);
+ sleepProducer(producerGuard, 10ms);
}
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
index 2a4d60594aa..9984662e416 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -4,6 +4,7 @@
#include <vespa/vespalib/util/threadexecutor.h>
#include <vespa/vespalib/util/thread.h>
+#include <vespa/vespalib/util/time.h>
#include <thread>
#include <atomic>
@@ -26,15 +27,13 @@ public:
uint32_t getTaskLimit() const { return _taskLimit.load(std::memory_order_relaxed); }
Stats getStats() override;
private:
+ using Lock = std::unique_lock<std::mutex>;
uint64_t addTask(Task::UP task);
void run() override;
void drain_tasks();
- void wakeupConsumer();
- void sleepConsumer();
- void wakeupProducer();
- void sleepProducer(MonitorGuard & guard);
+ void sleepProducer(Lock & guard, duration maxWaitTime);
void run_tasks_till(uint64_t available);
- void wait_for_room(MonitorGuard & guard);
+ void wait_for_room(Lock & guard);
uint64_t index(uint64_t counter) const {
return counter & (_taskLimit.load(std::memory_order_relaxed) - 1);
}
@@ -46,8 +45,10 @@ private:
std::atomic<uint32_t> _wantedTaskLimit;
std::atomic<uint64_t> _rp;
std::unique_ptr<Task::UP[]> _tasks;
- vespalib::Monitor _consumerMonitor;
- vespalib::Monitor _producerMonitor;
+ std::mutex _consumerMutex;
+ std::condition_variable _consumerCondition;
+ std::mutex _producerMutex;
+ std::condition_variable _producerCondition;
vespalib::Thread _thread;
uint64_t _lastAccepted;
std::atomic<uint64_t> _maxPending;