diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-07-02 13:42:45 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-02 13:42:45 +0200 |
commit | 1e1b35fa0b2589994f8d01fd8676659daf94791b (patch) | |
tree | 7295718d395198a582b6d6d7a2232664e125ae1c /staging_vespalib/src | |
parent | a6ca420bef2c7b7f43afc541d62c889dbac69c06 (diff) | |
parent | 66b9ce0c2189729a310cb44f2b425484d149463f (diff) |
Merge pull request #13780 from vespa-engine/havardpe/improve-adaptive-sequenced-executor-sync
improve sync
Diffstat (limited to 'staging_vespalib/src')
-rw-r--r-- | staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp | 67 | ||||
-rw-r--r-- | staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h | 1 |
2 files changed, 42 insertions, 26 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp index 3e87749c794..765d7b6c9f5 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp @@ -1,6 +1,7 @@ // Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "adaptive_sequenced_executor.h" +#include <vespa/vespalib/util/lambdatask.h> namespace vespalib { @@ -218,6 +219,35 @@ AdaptiveSequencedExecutor::worker_main() _thread_tools->allow_worker_exit.await(); } +void +AdaptiveSequencedExecutor::run_task_in_strand(Task::UP task, Strand &strand, std::unique_lock<std::mutex> &lock) +{ + assert(_self.state != Self::State::CLOSED); + strand.queue.push(std::move(task)); + _stats.queueSize.add(++_self.pending_tasks); + ++_stats.acceptedTasks; + if (strand.state == Strand::State::WAITING) { + ++_self.waiting_tasks; + } else if (strand.state == Strand::State::IDLE) { + if (_worker_stack.size() < _cfg.num_threads) { + strand.state = Strand::State::WAITING; + _wait_queue.push(&strand); + _self.waiting_tasks += strand.queue.size(); + } else { + strand.state = Strand::State::ACTIVE; + assert(_wait_queue.empty()); + Worker *worker = _worker_stack.back(); + _worker_stack.popBack(); + assert(worker->state == Worker::State::BLOCKED); + assert(worker->strand == nullptr); + worker->state = Worker::State::RUNNING; + worker->strand = &strand; + lock.unlock(); // UNLOCK + worker->cond.notify_one(); + } + } +} + AdaptiveSequencedExecutor::AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads, size_t max_waiting, size_t max_pending) : ISequencedTaskExecutor(num_strands), @@ -268,38 +298,23 @@ AdaptiveSequencedExecutor::executeTask(ExecutorId id, Task::UP task) Strand &strand = _strands[id.getId()]; auto guard = std::unique_lock(_mutex); maybe_block_self(guard); - assert(_self.state != Self::State::CLOSED); - strand.queue.push(std::move(task)); - _stats.queueSize.add(++_self.pending_tasks); - ++_stats.acceptedTasks; - if (strand.state == Strand::State::WAITING) { - ++_self.waiting_tasks; - } else if (strand.state == Strand::State::IDLE) { - if (_worker_stack.size() < _cfg.num_threads) { - strand.state = Strand::State::WAITING; - _wait_queue.push(&strand); - _self.waiting_tasks += strand.queue.size(); - } else { - strand.state = Strand::State::ACTIVE; - assert(_wait_queue.empty()); - Worker *worker = _worker_stack.back(); - _worker_stack.popBack(); - assert(worker->state == Worker::State::BLOCKED); - assert(worker->strand == nullptr); - worker->state = Worker::State::RUNNING; - worker->strand = &strand; - guard.unlock(); // UNLOCK - worker->cond.notify_one(); - } - } + run_task_in_strand(std::move(task), strand, guard); } void AdaptiveSequencedExecutor::sync() { vespalib::CountDownLatch latch(_strands.size()); - for (size_t i = 0; i < _strands.size(); ++i) { - execute(ExecutorId(i), [&](){ latch.countDown(); }); + { + auto guard = std::unique_lock(_mutex); + for (Strand &strand: _strands) { + if (strand.state == Strand::State::IDLE) { + latch.countDown(); + } else { + Task::UP task = vespalib::makeLambdaTask([&](){ latch.countDown(); }); + run_task_in_strand(std::move(task), strand, guard); + } + } } latch.await(); } diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h index a4d3ac97758..8f8b359797e 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h @@ -113,6 +113,7 @@ private: bool exchange_strand(Worker &worker, std::unique_lock<std::mutex> &lock); Task::UP next_task(Worker &worker); void worker_main(); + void run_task_in_strand(Task::UP task, Strand &strand, std::unique_lock<std::mutex> &lock); public: AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads, size_t max_waiting, size_t max_pending); |