summaryrefslogtreecommitdiffstats
path: root/staging_vespalib
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@oath.com>2022-03-15 13:03:21 +0000
committerHåvard Pettersen <havardpe@oath.com>2022-03-15 13:03:21 +0000
commitf064bc8e138bc9017c19e291d7aa4e3f5b7a32de (patch)
tree8ff655e46b1f10bbf123b6f464f1e093abd72ff4 /staging_vespalib
parentd4fe7e63855a8fca22a1509f1752ec0611fd71ec (diff)
hold lock while signalling
Diffstat (limited to 'staging_vespalib')
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp31
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h4
2 files changed, 10 insertions, 25 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
index 1e23ba15785..6db97ff0761 100644
--- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
@@ -103,18 +103,17 @@ AdaptiveSequencedExecutor::maybe_block_self(std::unique_lock<std::mutex> &lock)
}
}
-bool
+void
AdaptiveSequencedExecutor::maybe_unblock_self(const std::unique_lock<std::mutex> &)
{
if ((_self.state == Self::State::BLOCKED) && (_self.pending_tasks < _cfg.wakeup_limit)) {
_self.state = Self::State::OPEN;
- return true;
+ _self.cond.notify_all();
}
- return false;
}
-AdaptiveSequencedExecutor::Worker *
-AdaptiveSequencedExecutor::get_worker_to_wake(const std::unique_lock<std::mutex> &)
+void
+AdaptiveSequencedExecutor::maybe_wake_worker(const std::unique_lock<std::mutex> &)
{
if ((_self.waiting_tasks > _cfg.max_waiting) && (!_worker_stack.empty())) {
assert(!_wait_queue.empty());
@@ -130,9 +129,8 @@ AdaptiveSequencedExecutor::get_worker_to_wake(const std::unique_lock<std::mutex>
worker->strand->state = Strand::State::ACTIVE;
assert(_self.waiting_tasks >= worker->strand->queue.size());
_self.waiting_tasks -= worker->strand->queue.size();
- return worker;
+ worker->cond.notify_one();
}
- return nullptr;
}
bool
@@ -187,7 +185,6 @@ AdaptiveSequencedExecutor::TaggedTask
AdaptiveSequencedExecutor::next_task(Worker &worker, std::optional<uint32_t> prev_token)
{
TaggedTask task;
- Worker *worker_to_wake = nullptr;
auto guard = std::unique_lock(_mutex);
if (prev_token.has_value()) {
_barrier.completeEvent(prev_token.value());
@@ -199,19 +196,12 @@ AdaptiveSequencedExecutor::next_task(Worker &worker, std::optional<uint32_t> pre
task = std::move(worker.strand->queue.front());
worker.strand->queue.pop();
_stats.queueSize.add(--_self.pending_tasks);
- worker_to_wake = get_worker_to_wake(guard);
+ maybe_wake_worker(guard);
} else {
assert(worker.state == Worker::State::DONE);
assert(worker.strand == nullptr);
}
- bool signal_self = maybe_unblock_self(guard);
- guard.unlock(); // UNLOCK
- if (worker_to_wake != nullptr) {
- worker_to_wake->cond.notify_one();
- }
- if (signal_self) {
- _self.cond.notify_all();
- }
+ maybe_unblock_self(guard);
return task;
}
@@ -299,7 +289,6 @@ AdaptiveSequencedExecutor::executeTask(ExecutorId id, Task::UP task)
assert(worker->strand == nullptr);
worker->state = Worker::State::RUNNING;
worker->strand = &strand;
- guard.unlock(); // UNLOCK
worker->cond.notify_one();
}
}
@@ -323,11 +312,7 @@ AdaptiveSequencedExecutor::setTaskLimit(uint32_t task_limit)
{
auto guard = std::unique_lock(_mutex);
_cfg.set_max_pending(task_limit);
- bool signal_self = maybe_unblock_self(guard);
- guard.unlock(); // UNLOCK
- if (signal_self) {
- _self.cond.notify_all();
- }
+ maybe_unblock_self(guard);
}
ExecutorStats
diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
index d6244564fbd..fbebf8b4e4c 100644
--- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
@@ -142,9 +142,9 @@ private:
Config _cfg;
void maybe_block_self(std::unique_lock<std::mutex> &lock);
- bool maybe_unblock_self(const std::unique_lock<std::mutex> &lock);
+ void maybe_unblock_self(const std::unique_lock<std::mutex> &lock);
- Worker *get_worker_to_wake(const std::unique_lock<std::mutex> &lock);
+ void maybe_wake_worker(const std::unique_lock<std::mutex> &lock);
bool obtain_strand(Worker &worker, std::unique_lock<std::mutex> &lock);
bool exchange_strand(Worker &worker, std::unique_lock<std::mutex> &lock);
TaggedTask next_task(Worker &worker, std::optional<uint32_t> prev_token);