summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-03-12 15:39:50 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-03-12 15:39:50 +0000
commit3269532b70f203cd302820ce1adc3facfe35dcf8 (patch)
tree549edeb0928ac824150e62a57804276cd09a4064
parent2a3a5b321deb5e65d4a4856ee53eeafd0551d3b5 (diff)
- Use a single common lock.
- Introduce 2 stage startSync/sync. - avoid losing wakeup on sync.
-rw-r--r--searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp7
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp52
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.h9
3 files changed, 46 insertions, 22 deletions
diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp
index 7b0c30ec9d8..c9b4af6cb55 100644
--- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp
+++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp
@@ -62,6 +62,13 @@ void
SequencedTaskExecutor::sync()
{
for (auto &executor : *_executors) {
+ SingleExecutor * single = dynamic_cast<vespalib::SingleExecutor *>(executor.get());
+ if (single) {
+ //Enforce parallel wakeup of napping executors.
+ single->startSync();
+ }
+ }
+ for (auto &executor : *_executors) {
executor->sync();
}
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index b5c5bbacd32..73612452b09 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -10,15 +10,14 @@ SingleExecutor::SingleExecutor(uint32_t taskLimit)
_wantedTaskLimit(_taskLimit.load()),
_rp(0),
_tasks(std::make_unique<Task::UP[]>(_taskLimit)),
- _consumerMutex(),
+ _mutex(),
_consumerCondition(),
- _producerMutex(),
_producerCondition(),
_thread(*this),
_lastAccepted(0),
_maxPending(0),
_wakeupConsumerAt(0),
- _producerNeedWakeup(false),
+ _producerNeedWakeupAt(0),
_wp(0)
{
_thread.start();
@@ -35,7 +34,7 @@ SingleExecutor::getNumThreads() const {
uint64_t
SingleExecutor::addTask(Task::UP task) {
- Lock guard(_producerMutex);
+ Lock guard(_mutex);
wait_for_room(guard);
uint64_t wp = _wp.load(std::memory_order_relaxed);
_tasks[index(wp)] = std::move(task);
@@ -44,10 +43,10 @@ SingleExecutor::addTask(Task::UP task) {
}
void
-SingleExecutor::sleepProducer(Lock & lock, duration maxWaitTime) {
- _producerNeedWakeup.store(true, std::memory_order_relaxed);
+SingleExecutor::sleepProducer(Lock & lock, duration maxWaitTime, uint64_t wakeupAt) {
+ _producerNeedWakeupAt.store(wakeupAt, std::memory_order_relaxed);
_producerCondition.wait_for(lock, maxWaitTime);
- _producerNeedWakeup.store(false, std::memory_order_relaxed);
+ _producerNeedWakeupAt.store(0, std::memory_order_relaxed);
}
Executor::Task::UP
@@ -64,13 +63,27 @@ SingleExecutor::setTaskLimit(uint32_t taskLimit) {
_wantedTaskLimit = vespalib::roundUp2inN(taskLimit);
}
+void
+SingleExecutor::drain(Lock & lock) {
+ uint64_t wp = _wp.load(std::memory_order_relaxed);
+ while (numTasks() > 0) {
+ _consumerCondition.notify_one();
+ sleepProducer(lock, 100us, wp);
+ }
+}
+
+void
+SingleExecutor::startSync() {
+ _consumerCondition.notify_one();
+}
+
SingleExecutor &
SingleExecutor::sync() {
+ Lock lock(_mutex);
uint64_t wp = _wp.load(std::memory_order_relaxed);
while (wp > _rp.load(std::memory_order_acquire)) {
_consumerCondition.notify_one();
- Lock lock(_producerMutex);
- sleepProducer(lock, 100us);
+ sleepProducer(lock, 100us, wp);
}
return *this;
}
@@ -81,8 +94,10 @@ SingleExecutor::run() {
drain_tasks();
_producerCondition.notify_all();
_wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) / 4), std::memory_order_relaxed);
- Lock lock(_consumerMutex);
- _consumerCondition.wait_for(lock, 10ms);
+ Lock lock(_mutex);
+ if (numTasks() <= 0) {
+ _consumerCondition.wait_for(lock, 10ms);
+ }
_wakeupConsumerAt.store(0, std::memory_order_relaxed);
}
}
@@ -101,9 +116,7 @@ SingleExecutor::run_tasks_till(uint64_t available) {
if (_maxPending.load(std::memory_order_relaxed) < left) {
_maxPending.store(left, std::memory_order_relaxed);
}
- uint64_t wakeupLimit = _producerNeedWakeup.load(std::memory_order_relaxed)
- ? (available - (left / 4))
- : 0;
+ uint64_t wakeupLimit = _producerNeedWakeupAt.load(std::memory_order_relaxed);
while (consumed < available) {
Task::UP task = std::move(_tasks[index(consumed)]);
task->run();
@@ -115,14 +128,17 @@ SingleExecutor::run_tasks_till(uint64_t available) {
}
void
-SingleExecutor::wait_for_room(Lock & producerGuard) {
- if (_taskLimit.load(std::memory_order_relaxed) != _wantedTaskLimit.load(std::memory_order_relaxed)) {
- sync();
+SingleExecutor::wait_for_room(Lock & lock) {
+ uint64_t wp = _wp.load(std::memory_order_relaxed);
+ uint64_t taskLimit = _taskLimit.load(std::memory_order_relaxed);
+ if (taskLimit != _wantedTaskLimit.load(std::memory_order_relaxed)) {
+ drain(lock);
_tasks = std::make_unique<Task::UP[]>(_wantedTaskLimit);
_taskLimit = _wantedTaskLimit.load();
+ taskLimit = _taskLimit;
}
while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) {
- sleepProducer(producerGuard, 10ms);
+ sleepProducer(lock, 10ms, wp - taskLimit/4);
}
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
index 9984662e416..9c3ebb4caf7 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -23,15 +23,17 @@ public:
Task::UP execute(Task::UP task) override;
void setTaskLimit(uint32_t taskLimit) override;
SingleExecutor & sync() override;
+ void startSync();
size_t getNumThreads() const override;
uint32_t getTaskLimit() const { return _taskLimit.load(std::memory_order_relaxed); }
Stats getStats() override;
private:
using Lock = std::unique_lock<std::mutex>;
uint64_t addTask(Task::UP task);
+ void drain(Lock & lock);
void run() override;
void drain_tasks();
- void sleepProducer(Lock & guard, duration maxWaitTime);
+ void sleepProducer(Lock & guard, duration maxWaitTime, uint64_t wakeupAt);
void run_tasks_till(uint64_t available);
void wait_for_room(Lock & guard);
uint64_t index(uint64_t counter) const {
@@ -45,15 +47,14 @@ private:
std::atomic<uint32_t> _wantedTaskLimit;
std::atomic<uint64_t> _rp;
std::unique_ptr<Task::UP[]> _tasks;
- std::mutex _consumerMutex;
+ std::mutex _mutex;
std::condition_variable _consumerCondition;
- std::mutex _producerMutex;
std::condition_variable _producerCondition;
vespalib::Thread _thread;
uint64_t _lastAccepted;
std::atomic<uint64_t> _maxPending;
std::atomic<uint64_t> _wakeupConsumerAt;
- std::atomic<bool> _producerNeedWakeup;
+ std::atomic<uint64_t> _producerNeedWakeupAt;
std::atomic<uint64_t> _wp;
};