diff options
author | Håvard Pettersen <havardpe@oath.com> | 2022-03-15 13:03:21 +0000 |
---|---|---|
committer | Håvard Pettersen <havardpe@oath.com> | 2022-03-15 13:03:21 +0000 |
commit | f064bc8e138bc9017c19e291d7aa4e3f5b7a32de (patch) | |
tree | 8ff655e46b1f10bbf123b6f464f1e093abd72ff4 /staging_vespalib | |
parent | d4fe7e63855a8fca22a1509f1752ec0611fd71ec (diff) |
hold lock while signalling
Diffstat (limited to 'staging_vespalib')
-rw-r--r-- | staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp | 31 | ||||
-rw-r--r-- | staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h | 4 |
2 files changed, 10 insertions, 25 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp index 1e23ba15785..6db97ff0761 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp @@ -103,18 +103,17 @@ AdaptiveSequencedExecutor::maybe_block_self(std::unique_lock<std::mutex> &lock) } } -bool +void AdaptiveSequencedExecutor::maybe_unblock_self(const std::unique_lock<std::mutex> &) { if ((_self.state == Self::State::BLOCKED) && (_self.pending_tasks < _cfg.wakeup_limit)) { _self.state = Self::State::OPEN; - return true; + _self.cond.notify_all(); } - return false; } -AdaptiveSequencedExecutor::Worker * -AdaptiveSequencedExecutor::get_worker_to_wake(const std::unique_lock<std::mutex> &) +void +AdaptiveSequencedExecutor::maybe_wake_worker(const std::unique_lock<std::mutex> &) { if ((_self.waiting_tasks > _cfg.max_waiting) && (!_worker_stack.empty())) { assert(!_wait_queue.empty()); @@ -130,9 +129,8 @@ AdaptiveSequencedExecutor::get_worker_to_wake(const std::unique_lock<std::mutex> worker->strand->state = Strand::State::ACTIVE; assert(_self.waiting_tasks >= worker->strand->queue.size()); _self.waiting_tasks -= worker->strand->queue.size(); - return worker; + worker->cond.notify_one(); } - return nullptr; } bool @@ -187,7 +185,6 @@ AdaptiveSequencedExecutor::TaggedTask AdaptiveSequencedExecutor::next_task(Worker &worker, std::optional<uint32_t> prev_token) { TaggedTask task; - Worker *worker_to_wake = nullptr; auto guard = std::unique_lock(_mutex); if (prev_token.has_value()) { _barrier.completeEvent(prev_token.value()); @@ -199,19 +196,12 @@ AdaptiveSequencedExecutor::next_task(Worker &worker, std::optional<uint32_t> pre task = std::move(worker.strand->queue.front()); worker.strand->queue.pop(); _stats.queueSize.add(--_self.pending_tasks); - worker_to_wake = get_worker_to_wake(guard); + maybe_wake_worker(guard); } else { assert(worker.state == Worker::State::DONE); assert(worker.strand == nullptr); } - bool signal_self = maybe_unblock_self(guard); - guard.unlock(); // UNLOCK - if (worker_to_wake != nullptr) { - worker_to_wake->cond.notify_one(); - } - if (signal_self) { - _self.cond.notify_all(); - } + maybe_unblock_self(guard); return task; } @@ -299,7 +289,6 @@ AdaptiveSequencedExecutor::executeTask(ExecutorId id, Task::UP task) assert(worker->strand == nullptr); worker->state = Worker::State::RUNNING; worker->strand = &strand; - guard.unlock(); // UNLOCK worker->cond.notify_one(); } } @@ -323,11 +312,7 @@ AdaptiveSequencedExecutor::setTaskLimit(uint32_t task_limit) { auto guard = std::unique_lock(_mutex); _cfg.set_max_pending(task_limit); - bool signal_self = maybe_unblock_self(guard); - guard.unlock(); // UNLOCK - if (signal_self) { - _self.cond.notify_all(); - } + maybe_unblock_self(guard); } ExecutorStats diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h index d6244564fbd..fbebf8b4e4c 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h @@ -142,9 +142,9 @@ private: Config _cfg; void maybe_block_self(std::unique_lock<std::mutex> &lock); - bool maybe_unblock_self(const std::unique_lock<std::mutex> &lock); + void maybe_unblock_self(const std::unique_lock<std::mutex> &lock); - Worker *get_worker_to_wake(const std::unique_lock<std::mutex> &lock); + void maybe_wake_worker(const std::unique_lock<std::mutex> &lock); bool obtain_strand(Worker &worker, std::unique_lock<std::mutex> &lock); bool exchange_strand(Worker &worker, std::unique_lock<std::mutex> &lock); TaggedTask next_task(Worker &worker, std::optional<uint32_t> prev_token); |