author     Henning Baldersheim <balder@yahoo-inc.com>    2020-03-13 16:47:06 +0100
committer  GitHub <noreply@github.com>                   2020-03-13 16:47:06 +0100
commit     2685a6eb9cce2b3d2da1d2e2d4da195c414b4f7d
tree       661d9d3b319c3299a6f70eb47044c192476f1494
parent     d67466be724f2d90949739b2abd4082dbb314c5c
parent     3269532b70f203cd302820ce1adc3facfe35dcf8
Merge pull request #12540 from vespa-engine/balder/faster-sync-reaction
In order to drain Q faster on sync, and also detect that we are in sy…
3 files changed, 62 insertions, 25 deletions
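The heart of the change is visible in SequencedTaskExecutor::sync() below: instead of waiting on each executor serially while the others keep napping, every SingleExecutor is first poked awake via the new startSync(), so all of them start draining concurrently before the serial waits begin. A minimal sketch of that two-pass pattern, using plain std primitives and a hypothetical Exec stand-in rather than the real vespalib classes:

#include <condition_variable>
#include <memory>
#include <mutex>
#include <vector>

// Hypothetical stand-in for a napping single-threaded executor.
struct Exec {
    std::mutex mutex;
    std::condition_variable consumerCondition;
    // Wake the worker without waiting for it to finish draining.
    void startSync() { consumerCondition.notify_one(); }
    // Block until the worker's queue is empty (bookkeeping elided here).
    void sync() { /* wait until read pointer catches write pointer */ }
};

// Two-pass sync: pass 1 wakes every worker so they all drain in parallel,
// pass 2 then waits on each in turn.
void syncAll(std::vector<std::unique_ptr<Exec>> &executors) {
    for (auto &e : executors) { e->startSync(); }
    for (auto &e : executors) { e->sync(); }
}

With that ordering the total wait is bounded by the slowest queue rather than by the sum of all drain times, which is what the commit message means by draining the queue faster on sync.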
diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp
index 7b0c30ec9d8..c9b4af6cb55 100644
--- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp
+++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp
@@ -62,6 +62,13 @@ void
 SequencedTaskExecutor::sync()
 {
     for (auto &executor : *_executors) {
+        SingleExecutor * single = dynamic_cast<vespalib::SingleExecutor *>(executor.get());
+        if (single) {
+            //Enforce parallel wakeup of napping executors.
+            single->startSync();
+        }
+    }
+    for (auto &executor : *_executors) {
         executor->sync();
     }
 }
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index 8f1f22c4f8e..73612452b09 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -10,13 +10,14 @@ SingleExecutor::SingleExecutor(uint32_t taskLimit)
     _wantedTaskLimit(_taskLimit.load()),
     _rp(0),
     _tasks(std::make_unique<Task::UP[]>(_taskLimit)),
-    _consumerMonitor(),
-    _producerMonitor(),
+    _mutex(),
+    _consumerCondition(),
+    _producerCondition(),
     _thread(*this),
     _lastAccepted(0),
     _maxPending(0),
     _wakeupConsumerAt(0),
-    _producerNeedWakeup(false),
+    _producerNeedWakeupAt(0),
     _wp(0)
 {
     _thread.start();
@@ -33,7 +34,7 @@ SingleExecutor::getNumThreads() const {
 
 uint64_t
 SingleExecutor::addTask(Task::UP task) {
-    MonitorGuard guard(_producerMonitor);
+    Lock guard(_mutex);
     wait_for_room(guard);
     uint64_t wp = _wp.load(std::memory_order_relaxed);
     _tasks[index(wp)] = std::move(task);
@@ -41,12 +42,18 @@ SingleExecutor::addTask(Task::UP task) {
     return wp;
 }
 
+void
+SingleExecutor::sleepProducer(Lock & lock, duration maxWaitTime, uint64_t wakeupAt) {
+    _producerNeedWakeupAt.store(wakeupAt, std::memory_order_relaxed);
+    _producerCondition.wait_for(lock, maxWaitTime);
+    _producerNeedWakeupAt.store(0, std::memory_order_relaxed);
+}
+
 Executor::Task::UP
 SingleExecutor::execute(Task::UP task) {
     uint64_t wp = addTask(std::move(task));
     if (wp == _wakeupConsumerAt.load(std::memory_order_relaxed)) {
-        MonitorGuard guard(_consumerMonitor);
-        guard.signal();
+        _consumerCondition.notify_one();
     }
     return task;
 }
@@ -56,11 +63,27 @@ SingleExecutor::setTaskLimit(uint32_t taskLimit) {
     _wantedTaskLimit = vespalib::roundUp2inN(taskLimit);
 }
 
+void
+SingleExecutor::drain(Lock & lock) {
+    uint64_t wp = _wp.load(std::memory_order_relaxed);
+    while (numTasks() > 0) {
+        _consumerCondition.notify_one();
+        sleepProducer(lock, 100us, wp);
+    }
+}
+
+void
+SingleExecutor::startSync() {
+    _consumerCondition.notify_one();
+}
+
 SingleExecutor &
 SingleExecutor::sync() {
+    Lock lock(_mutex);
     uint64_t wp = _wp.load(std::memory_order_relaxed);
     while (wp > _rp.load(std::memory_order_acquire)) {
-        std::this_thread::sleep_for(1ms);
+        _consumerCondition.notify_one();
+        sleepProducer(lock, 100us, wp);
     }
     return *this;
 }
@@ -69,9 +92,12 @@ void
 SingleExecutor::run() {
     while (!_thread.stopped()) {
         drain_tasks();
-        _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) >> 2), std::memory_order_relaxed);
-        MonitorGuard guard(_consumerMonitor);
-        guard.wait(10ms);
+        _producerCondition.notify_all();
+        _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) / 4), std::memory_order_relaxed);
+        Lock lock(_mutex);
+        if (numTasks() <= 0) {
+            _consumerCondition.wait_for(lock, 10ms);
+        }
         _wakeupConsumerAt.store(0, std::memory_order_relaxed);
     }
 }
@@ -90,31 +116,29 @@ SingleExecutor::run_tasks_till(uint64_t available) {
     if (_maxPending.load(std::memory_order_relaxed) < left) {
         _maxPending.store(left, std::memory_order_relaxed);
     }
-    uint64_t wakeupLimit = _producerNeedWakeup.load(std::memory_order_relaxed)
-                           ? (available - (left >> 2))
-                           : 0;
+    uint64_t wakeupLimit = _producerNeedWakeupAt.load(std::memory_order_relaxed);
     while (consumed < available) {
         Task::UP task = std::move(_tasks[index(consumed)]);
         task->run();
         _rp.store(++consumed, std::memory_order_release);
         if (wakeupLimit == consumed) {
-            MonitorGuard guard(_producerMonitor);
-            guard.broadcast();
+            _producerCondition.notify_all();
         }
     }
 }
 
 void
-SingleExecutor::wait_for_room(MonitorGuard & producerGuard) {
-    if (_taskLimit.load(std::memory_order_relaxed) != _wantedTaskLimit.load(std::memory_order_relaxed)) {
-        sync();
+SingleExecutor::wait_for_room(Lock & lock) {
+    uint64_t wp = _wp.load(std::memory_order_relaxed);
+    uint64_t taskLimit = _taskLimit.load(std::memory_order_relaxed);
+    if (taskLimit != _wantedTaskLimit.load(std::memory_order_relaxed)) {
+        drain(lock);
         _tasks = std::make_unique<Task::UP[]>(_wantedTaskLimit);
         _taskLimit = _wantedTaskLimit.load();
+        taskLimit = _taskLimit;
     }
     while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) {
-        _producerNeedWakeup.store(true, std::memory_order_relaxed);
-        producerGuard.wait(10ms);
-        _producerNeedWakeup.store(false, std::memory_order_relaxed);
+        sleepProducer(lock, 10ms, wp - taskLimit/4);
     }
 }
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
index 0ab89355566..9c3ebb4caf7 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -4,6 +4,7 @@
 
 #include <vespa/vespalib/util/threadexecutor.h>
 #include <vespa/vespalib/util/thread.h>
+#include <vespa/vespalib/util/time.h>
 #include <thread>
 #include <atomic>
@@ -22,15 +23,19 @@ public:
     Task::UP execute(Task::UP task) override;
     void setTaskLimit(uint32_t taskLimit) override;
     SingleExecutor & sync() override;
+    void startSync();
     size_t getNumThreads() const override;
     uint32_t getTaskLimit() const { return _taskLimit.load(std::memory_order_relaxed); }
     Stats getStats() override;
 private:
+    using Lock = std::unique_lock<std::mutex>;
     uint64_t addTask(Task::UP task);
+    void drain(Lock & lock);
     void run() override;
     void drain_tasks();
+    void sleepProducer(Lock & guard, duration maxWaitTime, uint64_t wakeupAt);
     void run_tasks_till(uint64_t available);
-    void wait_for_room(MonitorGuard & guard);
+    void wait_for_room(Lock & guard);
     uint64_t index(uint64_t counter) const {
         return counter & (_taskLimit.load(std::memory_order_relaxed) - 1);
     }
@@ -42,13 +47,14 @@ private:
     std::atomic<uint32_t> _wantedTaskLimit;
    std::atomic<uint64_t> _rp;
     std::unique_ptr<Task::UP[]> _tasks;
-    vespalib::Monitor _consumerMonitor;
-    vespalib::Monitor _producerMonitor;
+    std::mutex _mutex;
+    std::condition_variable _consumerCondition;
+    std::condition_variable _producerCondition;
     vespalib::Thread _thread;
     uint64_t _lastAccepted;
     std::atomic<uint64_t> _maxPending;
     std::atomic<uint64_t> _wakeupConsumerAt;
-    std::atomic<bool> _producerNeedWakeup;
+    std::atomic<uint64_t> _producerNeedWakeupAt;
     std::atomic<uint64_t> _wp;
 };
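The other subtle piece is the producer-side nap. Previously the producer set a boolean _producerNeedWakeup and the consumer derived a wakeup point from its own backlog; now the producer publishes the exact read position it wants to be woken at in _producerNeedWakeupAt, and every wait is bounded so a missed notification costs at most the timeout (10ms when waiting for room, 100us in sync and drain). A rough standalone sketch of that handshake, with simplified names and none of the real ring-buffer bookkeeping:

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>

using Lock = std::unique_lock<std::mutex>;

// Sketch of the producer/consumer wakeup handshake: the producer records the
// read position it wants to be woken at, then naps with a bounded timeout.
struct Handshake {
    std::mutex mutex;
    std::condition_variable producerCondition;
    std::atomic<uint64_t> producerNeedWakeupAt{0};

    // Producer side: nap until notified, or until maxWaitTime as a safety net.
    void sleepProducer(Lock &lock, std::chrono::steady_clock::duration maxWaitTime,
                       uint64_t wakeupAt) {
        producerNeedWakeupAt.store(wakeupAt, std::memory_order_relaxed);
        producerCondition.wait_for(lock, maxWaitTime);
        producerNeedWakeupAt.store(0, std::memory_order_relaxed);
    }

    // Consumer side: after advancing the read position, notify only at the
    // exact position the producer asked for, avoiding needless wakeups.
    void onConsumed(uint64_t readPos) {
        if (producerNeedWakeupAt.load(std::memory_order_relaxed) == readPos) {
            producerCondition.notify_all();
        }
    }
};

Because every wait is time-bounded, the relaxed atomics are safe here: a stale read of producerNeedWakeupAt can at worst delay a wakeup by one timeout, never lose it. The same reasoning lets sync() replace its old 1ms polling sleep with a notify plus a 100us bounded wait.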