diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-01-13 14:17:00 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2022-01-13 14:17:00 +0000 |
commit | d022bfb22dede8dd5c9c52a837685fa07a5058cf (patch) | |
tree | f477770a0c8c0a4528e44ba5eba35c1529aa4a73 /staging_vespalib | |
parent | aaddbc39b4c7024b3c12f2eb5ac627d482f6af42 (diff) |
Differentiate between numTasks called when holding lock and not.
Diffstat (limited to 'staging_vespalib')
-rw-r--r-- | staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp | 24 | ||||
-rw-r--r-- | staging_vespalib/src/vespa/vespalib/util/singleexecutor.h | 9 |
2 files changed, 22 insertions, 11 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index 4acfdb376c6..0258f7a2e5c 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -78,6 +78,16 @@ SingleExecutor::execute(Task::UP task) { } uint64_t +SingleExecutor::numTasks() { + if (_overflow) { + Lock guard(_mutex); + return num_tasks_in_main_q() + num_tasks_in_overflow_q(guard); + } else { + return num_tasks_in_main_q(); + } +} + +uint64_t SingleExecutor::move_to_main_q(Lock &, Task::UP task) { uint64_t wp = _wp.load(std::memory_order_relaxed); _tasks[index(wp)] = std::move(task); @@ -93,7 +103,7 @@ SingleExecutor::setTaskLimit(uint32_t taskLimit) { void SingleExecutor::drain(Lock & lock) { uint64_t wp = _wp.load(std::memory_order_relaxed); - while (numTasks() > 0) { + while (numTasks(lock) > 0) { _consumerCondition.notify_one(); sleepProducer(lock, 100us, wp); } @@ -109,7 +119,7 @@ SingleExecutor::wakeup() { SingleExecutor & SingleExecutor::sync() { Lock lock(_mutex); - uint64_t wp = _wp.load(std::memory_order_relaxed) + num_tasks_in_overflow_q(); + uint64_t wp = _wp.load(std::memory_order_relaxed) + num_tasks_in_overflow_q(lock); while (wp > _rp.load(std::memory_order_acquire)) { _consumerCondition.notify_one(); sleepProducer(lock, 100us, wp); @@ -131,7 +141,7 @@ SingleExecutor::run() { _producerCondition.notify_all(); _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + get_watermark(), std::memory_order_relaxed); Lock lock(_mutex); - if (numTasks() <= 0) { + if (numTasks(lock) <= 0) { steady_time now = steady_clock::now(); _threadIdleTracker.set_idle(now); _consumerCondition.wait_until(lock, now + _reactionTime); @@ -159,7 +169,7 @@ SingleExecutor::move_overflow_to_main_q() } void SingleExecutor::move_overflow_to_main_q(Lock & guard) { - while ( !_overflow->empty() && num_task_in_main_q() < _taskLimit.load(std::memory_order_relaxed)) { + while ( !_overflow->empty() && num_tasks_in_main_q() < _taskLimit.load(std::memory_order_relaxed)) { move_to_main_q(guard, std::move(_overflow->front())); _overflow->pop(); } @@ -189,13 +199,13 @@ SingleExecutor::wait_for_room_or_put_in_overflow_Q(Lock & guard, Task::UP task) _taskLimit = _wantedTaskLimit.load(); _watermark = _taskLimit * _watermarkRatio; } - uint64_t numTaskInQ = numTasks(); + uint64_t numTaskInQ = numTasks(guard); _queueSize.add(numTaskInQ); if (numTaskInQ >= _taskLimit.load(std::memory_order_relaxed)) { if (_overflow) { _overflow->push(std::move(task)); } else { - while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) { + while (numTasks(guard) >= _taskLimit.load(std::memory_order_relaxed)) { sleepProducer(guard, _reactionTime, wp - get_watermark()); } } @@ -214,7 +224,7 @@ SingleExecutor::wait_for_room_or_put_in_overflow_Q(Lock & guard, Task::UP task) ExecutorStats SingleExecutor::getStats() { Lock lock(_mutex); - uint64_t accepted = _wp.load(std::memory_order_relaxed) + (_overflow ? _overflow->size() : 0); + uint64_t accepted = _wp.load(std::memory_order_relaxed) + num_tasks_in_overflow_q(lock); steady_time now = steady_clock::now(); _idleTracker.was_idle(_threadIdleTracker.reset(now)); ExecutorStats stats(_queueSize, (accepted - _lastAccepted), 0, _wakeupCount); diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index 9ec107639bc..4fdc217e701 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -48,13 +48,14 @@ private: return counter & (_taskLimit.load(std::memory_order_relaxed) - 1); } - uint64_t numTasks() const { - return num_task_in_main_q() + num_tasks_in_overflow_q(); + uint64_t numTasks(); + uint64_t numTasks(Lock & guard) const { + return num_tasks_in_main_q() + num_tasks_in_overflow_q(guard); } - uint64_t num_tasks_in_overflow_q() const { + uint64_t num_tasks_in_overflow_q(Lock &) const { return _overflow ? _overflow->size() : 0; } - uint64_t num_task_in_main_q() const { + uint64_t num_tasks_in_main_q() const { return _wp.load(std::memory_order_relaxed) - _rp.load(std::memory_order_acquire); } const double _watermarkRatio; |