diff options
Diffstat (limited to 'staging_vespalib/src')
-rw-r--r-- | staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp | 85 | ||||
-rw-r--r-- | staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h | 32 |
2 files changed, 68 insertions, 49 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp index 765d7b6c9f5..8139fd6d11b 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp @@ -1,7 +1,6 @@ // Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "adaptive_sequenced_executor.h" -#include <vespa/vespalib/util/lambdatask.h> namespace vespalib { @@ -180,12 +179,15 @@ AdaptiveSequencedExecutor::exchange_strand(Worker &worker, std::unique_lock<std: return true; } -AdaptiveSequencedExecutor::Task::UP -AdaptiveSequencedExecutor::next_task(Worker &worker) +AdaptiveSequencedExecutor::TaggedTask +AdaptiveSequencedExecutor::next_task(Worker &worker, std::optional<uint32_t> prev_token) { - Task::UP task; + TaggedTask task; Worker *worker_to_wake = nullptr; auto guard = std::unique_lock(_mutex); + if (prev_token.has_value()) { + _barrier.completeEvent(prev_token.value()); + } if (exchange_strand(worker, guard)) { assert(worker.state == Worker::State::RUNNING); assert(worker.strand != nullptr); @@ -213,41 +215,14 @@ void AdaptiveSequencedExecutor::worker_main() { Worker worker; - while (Task::UP my_task = next_task(worker)) { - my_task->run(); + std::optional<uint32_t> prev_token = std::nullopt; + while (TaggedTask my_task = next_task(worker, prev_token)) { + my_task.task->run(); + prev_token = my_task.token; } _thread_tools->allow_worker_exit.await(); } -void -AdaptiveSequencedExecutor::run_task_in_strand(Task::UP task, Strand &strand, std::unique_lock<std::mutex> &lock) -{ - assert(_self.state != Self::State::CLOSED); - strand.queue.push(std::move(task)); - _stats.queueSize.add(++_self.pending_tasks); - ++_stats.acceptedTasks; - if (strand.state == Strand::State::WAITING) { - ++_self.waiting_tasks; - } else if (strand.state == Strand::State::IDLE) { - if (_worker_stack.size() < _cfg.num_threads) { - strand.state = Strand::State::WAITING; - _wait_queue.push(&strand); - _self.waiting_tasks += strand.queue.size(); - } else { - strand.state = Strand::State::ACTIVE; - assert(_wait_queue.empty()); - Worker *worker = _worker_stack.back(); - _worker_stack.popBack(); - assert(worker->state == Worker::State::BLOCKED); - assert(worker->strand == nullptr); - worker->state = Worker::State::RUNNING; - worker->strand = &strand; - lock.unlock(); // UNLOCK - worker->cond.notify_one(); - } - } -} - AdaptiveSequencedExecutor::AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads, size_t max_waiting, size_t max_pending) : ISequencedTaskExecutor(num_strands), @@ -297,26 +272,44 @@ AdaptiveSequencedExecutor::executeTask(ExecutorId id, Task::UP task) assert(id.getId() < _strands.size()); Strand &strand = _strands[id.getId()]; auto guard = std::unique_lock(_mutex); + assert(_self.state != Self::State::CLOSED); maybe_block_self(guard); - run_task_in_strand(std::move(task), strand, guard); + strand.queue.push(TaggedTask(std::move(task), _barrier.startEvent())); + _stats.queueSize.add(++_self.pending_tasks); + ++_stats.acceptedTasks; + if (strand.state == Strand::State::WAITING) { + ++_self.waiting_tasks; + } else if (strand.state == Strand::State::IDLE) { + if (_worker_stack.size() < _cfg.num_threads) { + strand.state = Strand::State::WAITING; + _wait_queue.push(&strand); + _self.waiting_tasks += strand.queue.size(); + } else { + strand.state = Strand::State::ACTIVE; + assert(_wait_queue.empty()); + Worker *worker = _worker_stack.back(); + _worker_stack.popBack(); + assert(worker->state == Worker::State::BLOCKED); + assert(worker->strand == nullptr); + worker->state = Worker::State::RUNNING; + worker->strand = &strand; + guard.unlock(); // UNLOCK + worker->cond.notify_one(); + } + } } void AdaptiveSequencedExecutor::sync() { - vespalib::CountDownLatch latch(_strands.size()); + BarrierCompletion barrierCompletion; { - auto guard = std::unique_lock(_mutex); - for (Strand &strand: _strands) { - if (strand.state == Strand::State::IDLE) { - latch.countDown(); - } else { - Task::UP task = vespalib::makeLambdaTask([&](){ latch.countDown(); }); - run_task_in_strand(std::move(task), strand, guard); - } + auto guard = std::scoped_lock(_mutex); + if (!_barrier.startBarrier(barrierCompletion)) { + return; } } - latch.await(); + barrierCompletion.gate.await(); } void diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h index 8f8b359797e..c52b9b22245 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h @@ -5,9 +5,12 @@ #include "isequencedtaskexecutor.h" #include <vespa/vespalib/util/arrayqueue.hpp> #include <vespa/vespalib/util/gate.h> +#include <vespa/vespalib/util/eventbarrier.hpp> +#include <vespa/vespalib/util/gate.h> #include <vespa/fastos/thread.h> #include <mutex> #include <condition_variable> +#include <optional> #include <cassert> namespace vespalib { @@ -23,6 +26,24 @@ private: using Stats = vespalib::ExecutorStats; using Task = vespalib::Executor::Task; + struct TaggedTask { + Task::UP task; + uint32_t token; + TaggedTask() : task(nullptr), token(0) {} + TaggedTask(Task::UP task_in, uint32_t token_in) + : task(std::move(task_in)), token(token_in) {} + TaggedTask(TaggedTask &&rhs) = default; + TaggedTask(const TaggedTask &rhs) = delete; + TaggedTask &operator=(const TaggedTask &rhs) = delete; + TaggedTask &operator=(TaggedTask &&rhs) { + assert(task.get() == nullptr); // no overwrites + task = std::move(rhs.task); + token = rhs.token; + return *this; + } + operator bool() const { return bool(task); } + }; + /** * Values used to configure the executor. **/ @@ -51,7 +72,7 @@ private: struct Strand { enum class State { IDLE, WAITING, ACTIVE }; State state; - vespalib::ArrayQueue<Task::UP> queue; + vespalib::ArrayQueue<TaggedTask> queue; Strand(); ~Strand(); }; @@ -96,11 +117,17 @@ private: void close(); }; + struct BarrierCompletion { + Gate gate; + void completeBarrier() { gate.countDown(); } + }; + std::unique_ptr<ThreadTools> _thread_tools; std::mutex _mutex; std::vector<Strand> _strands; vespalib::ArrayQueue<Strand*> _wait_queue; vespalib::ArrayQueue<Worker*> _worker_stack; + EventBarrier<BarrierCompletion> _barrier; Self _self; Stats _stats; Config _cfg; @@ -111,9 +138,8 @@ private: Worker *get_worker_to_wake(const std::unique_lock<std::mutex> &lock); bool obtain_strand(Worker &worker, std::unique_lock<std::mutex> &lock); bool exchange_strand(Worker &worker, std::unique_lock<std::mutex> &lock); - Task::UP next_task(Worker &worker); + TaggedTask next_task(Worker &worker, std::optional<uint32_t> prev_token); void worker_main(); - void run_task_in_strand(Task::UP task, Strand &strand, std::unique_lock<std::mutex> &lock); public: AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads, size_t max_waiting, size_t max_pending); |