summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-01-13 14:17:00 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2022-01-13 14:17:00 +0000
commitd022bfb22dede8dd5c9c52a837685fa07a5058cf (patch)
treef477770a0c8c0a4528e44ba5eba35c1529aa4a73
parentaaddbc39b4c7024b3c12f2eb5ac627d482f6af42 (diff)
Differentiate between numTasks called when holding lock and not.
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp24
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.h9
2 files changed, 22 insertions, 11 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index 4acfdb376c6..0258f7a2e5c 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -78,6 +78,16 @@ SingleExecutor::execute(Task::UP task) {
}
uint64_t
+SingleExecutor::numTasks() {
+ if (_overflow) {
+ Lock guard(_mutex);
+ return num_tasks_in_main_q() + num_tasks_in_overflow_q(guard);
+ } else {
+ return num_tasks_in_main_q();
+ }
+}
+
+uint64_t
SingleExecutor::move_to_main_q(Lock &, Task::UP task) {
uint64_t wp = _wp.load(std::memory_order_relaxed);
_tasks[index(wp)] = std::move(task);
@@ -93,7 +103,7 @@ SingleExecutor::setTaskLimit(uint32_t taskLimit) {
void
SingleExecutor::drain(Lock & lock) {
uint64_t wp = _wp.load(std::memory_order_relaxed);
- while (numTasks() > 0) {
+ while (numTasks(lock) > 0) {
_consumerCondition.notify_one();
sleepProducer(lock, 100us, wp);
}
@@ -109,7 +119,7 @@ SingleExecutor::wakeup() {
SingleExecutor &
SingleExecutor::sync() {
Lock lock(_mutex);
- uint64_t wp = _wp.load(std::memory_order_relaxed) + num_tasks_in_overflow_q();
+ uint64_t wp = _wp.load(std::memory_order_relaxed) + num_tasks_in_overflow_q(lock);
while (wp > _rp.load(std::memory_order_acquire)) {
_consumerCondition.notify_one();
sleepProducer(lock, 100us, wp);
@@ -131,7 +141,7 @@ SingleExecutor::run() {
_producerCondition.notify_all();
_wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + get_watermark(), std::memory_order_relaxed);
Lock lock(_mutex);
- if (numTasks() <= 0) {
+ if (numTasks(lock) <= 0) {
steady_time now = steady_clock::now();
_threadIdleTracker.set_idle(now);
_consumerCondition.wait_until(lock, now + _reactionTime);
@@ -159,7 +169,7 @@ SingleExecutor::move_overflow_to_main_q()
}
void
SingleExecutor::move_overflow_to_main_q(Lock & guard) {
- while ( !_overflow->empty() && num_task_in_main_q() < _taskLimit.load(std::memory_order_relaxed)) {
+ while ( !_overflow->empty() && num_tasks_in_main_q() < _taskLimit.load(std::memory_order_relaxed)) {
move_to_main_q(guard, std::move(_overflow->front()));
_overflow->pop();
}
@@ -189,13 +199,13 @@ SingleExecutor::wait_for_room_or_put_in_overflow_Q(Lock & guard, Task::UP task)
_taskLimit = _wantedTaskLimit.load();
_watermark = _taskLimit * _watermarkRatio;
}
- uint64_t numTaskInQ = numTasks();
+ uint64_t numTaskInQ = numTasks(guard);
_queueSize.add(numTaskInQ);
if (numTaskInQ >= _taskLimit.load(std::memory_order_relaxed)) {
if (_overflow) {
_overflow->push(std::move(task));
} else {
- while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) {
+ while (numTasks(guard) >= _taskLimit.load(std::memory_order_relaxed)) {
sleepProducer(guard, _reactionTime, wp - get_watermark());
}
}
@@ -214,7 +224,7 @@ SingleExecutor::wait_for_room_or_put_in_overflow_Q(Lock & guard, Task::UP task)
ExecutorStats
SingleExecutor::getStats() {
Lock lock(_mutex);
- uint64_t accepted = _wp.load(std::memory_order_relaxed) + (_overflow ? _overflow->size() : 0);
+ uint64_t accepted = _wp.load(std::memory_order_relaxed) + num_tasks_in_overflow_q(lock);
steady_time now = steady_clock::now();
_idleTracker.was_idle(_threadIdleTracker.reset(now));
ExecutorStats stats(_queueSize, (accepted - _lastAccepted), 0, _wakeupCount);
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
index 9ec107639bc..4fdc217e701 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -48,13 +48,14 @@ private:
return counter & (_taskLimit.load(std::memory_order_relaxed) - 1);
}
- uint64_t numTasks() const {
- return num_task_in_main_q() + num_tasks_in_overflow_q();
+ uint64_t numTasks();
+ uint64_t numTasks(Lock & guard) const {
+ return num_tasks_in_main_q() + num_tasks_in_overflow_q(guard);
}
- uint64_t num_tasks_in_overflow_q() const {
+ uint64_t num_tasks_in_overflow_q(Lock &) const {
return _overflow ? _overflow->size() : 0;
}
- uint64_t num_task_in_main_q() const {
+ uint64_t num_tasks_in_main_q() const {
return _wp.load(std::memory_order_relaxed) - _rp.load(std::memory_order_acquire);
}
const double _watermarkRatio;