From bf3726b1ba88cbe9987fea567e78de06f1bc2559 Mon Sep 17 00:00:00 2001 From: Håvard Pettersen Date: Thu, 2 Jul 2020 12:45:57 +0000 Subject: use event barrier for sync --- .../vespalib/util/adaptive_sequenced_executor.cpp | 85 ++++++++++------------ .../vespalib/util/adaptive_sequenced_executor.h | 32 +++++++- 2 files changed, 68 insertions(+), 49 deletions(-) diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp index 765d7b6c9f5..8139fd6d11b 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp @@ -1,7 +1,6 @@ // Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "adaptive_sequenced_executor.h" -#include namespace vespalib { @@ -180,12 +179,15 @@ AdaptiveSequencedExecutor::exchange_strand(Worker &worker, std::unique_lock prev_token) { - Task::UP task; + TaggedTask task; Worker *worker_to_wake = nullptr; auto guard = std::unique_lock(_mutex); + if (prev_token.has_value()) { + _barrier.completeEvent(prev_token.value()); + } if (exchange_strand(worker, guard)) { assert(worker.state == Worker::State::RUNNING); assert(worker.strand != nullptr); @@ -213,41 +215,14 @@ void AdaptiveSequencedExecutor::worker_main() { Worker worker; - while (Task::UP my_task = next_task(worker)) { - my_task->run(); + std::optional prev_token = std::nullopt; + while (TaggedTask my_task = next_task(worker, prev_token)) { + my_task.task->run(); + prev_token = my_task.token; } _thread_tools->allow_worker_exit.await(); } -void -AdaptiveSequencedExecutor::run_task_in_strand(Task::UP task, Strand &strand, std::unique_lock &lock) -{ - assert(_self.state != Self::State::CLOSED); - strand.queue.push(std::move(task)); - _stats.queueSize.add(++_self.pending_tasks); - ++_stats.acceptedTasks; - if (strand.state == Strand::State::WAITING) { - ++_self.waiting_tasks; - } else if (strand.state == Strand::State::IDLE) { - if (_worker_stack.size() < _cfg.num_threads) { - strand.state = Strand::State::WAITING; - _wait_queue.push(&strand); - _self.waiting_tasks += strand.queue.size(); - } else { - strand.state = Strand::State::ACTIVE; - assert(_wait_queue.empty()); - Worker *worker = _worker_stack.back(); - _worker_stack.popBack(); - assert(worker->state == Worker::State::BLOCKED); - assert(worker->strand == nullptr); - worker->state = Worker::State::RUNNING; - worker->strand = &strand; - lock.unlock(); // UNLOCK - worker->cond.notify_one(); - } - } -} - AdaptiveSequencedExecutor::AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads, size_t max_waiting, size_t max_pending) : ISequencedTaskExecutor(num_strands), @@ -297,26 +272,44 @@ AdaptiveSequencedExecutor::executeTask(ExecutorId id, Task::UP task) assert(id.getId() < _strands.size()); Strand &strand = _strands[id.getId()]; auto guard = std::unique_lock(_mutex); + assert(_self.state != Self::State::CLOSED); maybe_block_self(guard); - run_task_in_strand(std::move(task), strand, guard); + strand.queue.push(TaggedTask(std::move(task), _barrier.startEvent())); + _stats.queueSize.add(++_self.pending_tasks); + ++_stats.acceptedTasks; + if (strand.state == Strand::State::WAITING) { + ++_self.waiting_tasks; + } else if (strand.state == Strand::State::IDLE) { + if (_worker_stack.size() < _cfg.num_threads) { + strand.state = Strand::State::WAITING; + _wait_queue.push(&strand); + _self.waiting_tasks += strand.queue.size(); + } else { + strand.state = Strand::State::ACTIVE; + assert(_wait_queue.empty()); + Worker *worker = _worker_stack.back(); + _worker_stack.popBack(); + assert(worker->state == Worker::State::BLOCKED); + assert(worker->strand == nullptr); + worker->state = Worker::State::RUNNING; + worker->strand = &strand; + guard.unlock(); // UNLOCK + worker->cond.notify_one(); + } + } } void AdaptiveSequencedExecutor::sync() { - vespalib::CountDownLatch latch(_strands.size()); + BarrierCompletion barrierCompletion; { - auto guard = std::unique_lock(_mutex); - for (Strand &strand: _strands) { - if (strand.state == Strand::State::IDLE) { - latch.countDown(); - } else { - Task::UP task = vespalib::makeLambdaTask([&](){ latch.countDown(); }); - run_task_in_strand(std::move(task), strand, guard); - } + auto guard = std::scoped_lock(_mutex); + if (!_barrier.startBarrier(barrierCompletion)) { + return; } } - latch.await(); + barrierCompletion.gate.await(); } void diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h index 8f8b359797e..c52b9b22245 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h @@ -5,9 +5,12 @@ #include "isequencedtaskexecutor.h" #include #include +#include +#include #include #include #include +#include #include namespace vespalib { @@ -23,6 +26,24 @@ private: using Stats = vespalib::ExecutorStats; using Task = vespalib::Executor::Task; + struct TaggedTask { + Task::UP task; + uint32_t token; + TaggedTask() : task(nullptr), token(0) {} + TaggedTask(Task::UP task_in, uint32_t token_in) + : task(std::move(task_in)), token(token_in) {} + TaggedTask(TaggedTask &&rhs) = default; + TaggedTask(const TaggedTask &rhs) = delete; + TaggedTask &operator=(const TaggedTask &rhs) = delete; + TaggedTask &operator=(TaggedTask &&rhs) { + assert(task.get() == nullptr); // no overwrites + task = std::move(rhs.task); + token = rhs.token; + return *this; + } + operator bool() const { return bool(task); } + }; + /** * Values used to configure the executor. **/ @@ -51,7 +72,7 @@ private: struct Strand { enum class State { IDLE, WAITING, ACTIVE }; State state; - vespalib::ArrayQueue queue; + vespalib::ArrayQueue queue; Strand(); ~Strand(); }; @@ -96,11 +117,17 @@ private: void close(); }; + struct BarrierCompletion { + Gate gate; + void completeBarrier() { gate.countDown(); } + }; + std::unique_ptr _thread_tools; std::mutex _mutex; std::vector _strands; vespalib::ArrayQueue _wait_queue; vespalib::ArrayQueue _worker_stack; + EventBarrier _barrier; Self _self; Stats _stats; Config _cfg; @@ -111,9 +138,8 @@ private: Worker *get_worker_to_wake(const std::unique_lock &lock); bool obtain_strand(Worker &worker, std::unique_lock &lock); bool exchange_strand(Worker &worker, std::unique_lock &lock); - Task::UP next_task(Worker &worker); + TaggedTask next_task(Worker &worker, std::optional prev_token); void worker_main(); - void run_task_in_strand(Task::UP task, Strand &strand, std::unique_lock &lock); public: AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads, size_t max_waiting, size_t max_pending); -- cgit v1.2.3 From 4e35fa34cfba829364c069f1be66eb15a615cbcc Mon Sep 17 00:00:00 2001 From: Håvard Pettersen Date: Thu, 2 Jul 2020 13:02:17 +0000 Subject: use appropriate lock --- .../src/vespa/vespalib/util/adaptive_sequenced_executor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp index 8139fd6d11b..407129199e3 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp @@ -304,7 +304,7 @@ AdaptiveSequencedExecutor::sync() { BarrierCompletion barrierCompletion; { - auto guard = std::scoped_lock(_mutex); + auto guard = std::lock_guard(_mutex); if (!_barrier.startBarrier(barrierCompletion)) { return; } -- cgit v1.2.3