diff options
author | Håvard Pettersen <havardpe@oath.com> | 2020-07-02 11:04:13 +0000 |
---|---|---|
committer | Håvard Pettersen <havardpe@oath.com> | 2020-07-02 11:04:13 +0000 |
commit | 66b9ce0c2189729a310cb44f2b425484d149463f (patch) | |
tree | 5380dc276c5b0db2b1f114b496b40b378008f6f6 /staging_vespalib/src | |
parent | 87372a16c7ec43a5babcf44bb5ff109ac3369b2e (diff) |
improve sync
- lock once to post all sync tasks
- only post to non-idle strands
- let sync tasks bypass blocking based on task limit
Diffstat (limited to 'staging_vespalib/src')
-rw-r--r-- | staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp | 67 | ||||
-rw-r--r-- | staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h | 1 |
2 files changed, 42 insertions, 26 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp index 3e87749c794..765d7b6c9f5 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp @@ -1,6 +1,7 @@ // Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "adaptive_sequenced_executor.h" +#include <vespa/vespalib/util/lambdatask.h> namespace vespalib { @@ -218,6 +219,35 @@ AdaptiveSequencedExecutor::worker_main() _thread_tools->allow_worker_exit.await(); } +void +AdaptiveSequencedExecutor::run_task_in_strand(Task::UP task, Strand &strand, std::unique_lock<std::mutex> &lock) +{ + assert(_self.state != Self::State::CLOSED); + strand.queue.push(std::move(task)); + _stats.queueSize.add(++_self.pending_tasks); + ++_stats.acceptedTasks; + if (strand.state == Strand::State::WAITING) { + ++_self.waiting_tasks; + } else if (strand.state == Strand::State::IDLE) { + if (_worker_stack.size() < _cfg.num_threads) { + strand.state = Strand::State::WAITING; + _wait_queue.push(&strand); + _self.waiting_tasks += strand.queue.size(); + } else { + strand.state = Strand::State::ACTIVE; + assert(_wait_queue.empty()); + Worker *worker = _worker_stack.back(); + _worker_stack.popBack(); + assert(worker->state == Worker::State::BLOCKED); + assert(worker->strand == nullptr); + worker->state = Worker::State::RUNNING; + worker->strand = &strand; + lock.unlock(); // UNLOCK + worker->cond.notify_one(); + } + } +} + AdaptiveSequencedExecutor::AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads, size_t max_waiting, size_t max_pending) : ISequencedTaskExecutor(num_strands), @@ -268,38 +298,23 @@ AdaptiveSequencedExecutor::executeTask(ExecutorId id, Task::UP task) Strand &strand = _strands[id.getId()]; auto guard = std::unique_lock(_mutex); maybe_block_self(guard); - assert(_self.state != Self::State::CLOSED); - strand.queue.push(std::move(task)); - _stats.queueSize.add(++_self.pending_tasks); - ++_stats.acceptedTasks; - if (strand.state == Strand::State::WAITING) { - ++_self.waiting_tasks; - } else if (strand.state == Strand::State::IDLE) { - if (_worker_stack.size() < _cfg.num_threads) { - strand.state = Strand::State::WAITING; - _wait_queue.push(&strand); - _self.waiting_tasks += strand.queue.size(); - } else { - strand.state = Strand::State::ACTIVE; - assert(_wait_queue.empty()); - Worker *worker = _worker_stack.back(); - _worker_stack.popBack(); - assert(worker->state == Worker::State::BLOCKED); - assert(worker->strand == nullptr); - worker->state = Worker::State::RUNNING; - worker->strand = &strand; - guard.unlock(); // UNLOCK - worker->cond.notify_one(); - } - } + run_task_in_strand(std::move(task), strand, guard); } void AdaptiveSequencedExecutor::sync() { vespalib::CountDownLatch latch(_strands.size()); - for (size_t i = 0; i < _strands.size(); ++i) { - execute(ExecutorId(i), [&](){ latch.countDown(); }); + { + auto guard = std::unique_lock(_mutex); + for (Strand &strand: _strands) { + if (strand.state == Strand::State::IDLE) { + latch.countDown(); + } else { + Task::UP task = vespalib::makeLambdaTask([&](){ latch.countDown(); }); + run_task_in_strand(std::move(task), strand, guard); + } + } } latch.await(); } diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h index a4d3ac97758..8f8b359797e 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h @@ -113,6 +113,7 @@ private: bool exchange_strand(Worker &worker, std::unique_lock<std::mutex> &lock); Task::UP next_task(Worker &worker); void worker_main(); + void run_task_in_strand(Task::UP task, Strand &strand, std::unique_lock<std::mutex> &lock); public: AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads, size_t max_waiting, size_t max_pending); |