summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-10-22 16:30:17 +0200
committerGitHub <noreply@github.com>2021-10-22 16:30:17 +0200
commit5d17da853b36dd4d2798195de709438725e6351c (patch)
tree9cb6e4eb790cab9a7dbbff82af52a3f8b8c40fb4
parent1eef1ea9cae67ac87725eeee379c3435ff45f858 (diff)
parente0e101daff1db918463976907a47ccbe57cc3d50 (diff)
Merge pull request #19692 from vespa-engine/balder/count-working-days-2
Add a metric for how many times a worker in a thread pool has woken up.
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h8
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp10
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h11
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h2
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp8
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h4
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp2
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp16
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.h6
-rw-r--r--vespalib/src/tests/executor/threadstackexecutor_test.cpp25
-rw-r--r--vespalib/src/vespa/vespalib/util/executor_stats.h35
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp32
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h21
14 files changed, 140 insertions, 44 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp
index 74e0971178c..3c98857242f 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp
+++ b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp
@@ -10,6 +10,8 @@ ExecutorMetrics::update(const vespalib::ExecutorStats &stats)
maxPending.set(stats.queueSize.max());
accepted.inc(stats.acceptedTasks);
rejected.inc(stats.rejectedTasks);
+ wakeupCount.inc(stats.wakeupCount);
+ util.set(stats.getUtil());
const auto & qSize = stats.queueSize;
queueSize.addValueBatch(qSize.average(), qSize.count(), qSize.min(), qSize.max());
}
@@ -19,6 +21,8 @@ ExecutorMetrics::ExecutorMetrics(const std::string &name, metrics::MetricSet *pa
maxPending("maxpending", {}, "Maximum number of pending (active + queued) tasks", this),
accepted("accepted", {}, "Number of accepted tasks", this),
rejected("rejected", {}, "Number of rejected tasks", this),
+ wakeupCount("wakeupCount", {}, "Number of times a worker thread has been woken up", this),
+ util("utilization", {}, "Ratio of time the worker threads has been active", this),
queueSize("queuesize", {}, "Size of task queue", this)
{
}
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h
index 273c4ed8979..31d959a399f 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h
+++ b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h
@@ -11,9 +11,11 @@ namespace proton {
struct ExecutorMetrics : metrics::MetricSet
{
- metrics::LongValueMetric maxPending; // TODO Remove on Vespa 8 or sooner if possible.
- metrics::LongCountMetric accepted;
- metrics::LongCountMetric rejected;
+ metrics::LongValueMetric maxPending; // TODO Remove on Vespa 8 or sooner if possible.
+ metrics::LongCountMetric accepted;
+ metrics::LongCountMetric rejected;
+ metrics::LongCountMetric wakeupCount;
+ metrics::DoubleValueMetric util;
metrics::LongAverageMetric queueSize;
void update(const vespalib::ExecutorStats &stats);
diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
index 869ff0456e1..099a958a00b 100644
--- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
@@ -21,6 +21,7 @@ AdaptiveSequencedExecutor::Strand::~Strand()
AdaptiveSequencedExecutor::Worker::Worker()
: cond(),
+ idleTracker(),
state(State::RUNNING),
strand(nullptr)
{
@@ -151,9 +152,12 @@ AdaptiveSequencedExecutor::obtain_strand(Worker &worker, std::unique_lock<std::m
} else {
worker.state = Worker::State::BLOCKED;
_worker_stack.push(&worker);
+ worker.idleTracker.set_idle(steady_clock::now());
while (worker.state == Worker::State::BLOCKED) {
worker.cond.wait(lock);
}
+ _idleTracker.was_idle(worker.idleTracker.set_active(steady_clock::now()));
+ _stats.wakeupCount++;
}
return (worker.state == Worker::State::RUNNING);
}
@@ -233,6 +237,7 @@ AdaptiveSequencedExecutor::AdaptiveSequencedExecutor(size_t num_strands, size_t
_worker_stack(num_threads),
_self(),
_stats(),
+ _idleTracker(steady_clock::now()),
_cfg(num_threads, max_waiting, max_pending)
{
_stats.queueSize.add(_self.pending_tasks);
@@ -329,6 +334,11 @@ AdaptiveSequencedExecutor::getStats()
{
auto guard = std::lock_guard(_mutex);
ExecutorStats stats = _stats;
+ steady_time now = steady_clock::now();
+ for (size_t i(0); i < _worker_stack.size(); i++) {
+ _idleTracker.was_idle(_worker_stack.access(i)->idleTracker.reset(now));
+ }
+ stats.setUtil(_cfg.num_threads, _idleTracker.reset(now, _cfg.num_threads));
_stats = ExecutorStats();
_stats.queueSize.add(_self.pending_tasks);
return stats;
diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
index fdcdf35fbbb..776248384a5 100644
--- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
@@ -3,6 +3,7 @@
#pragma once
#include "isequencedtaskexecutor.h"
+#include <vespa/vespalib/util/executor_idle_tracking.h>
#include <vespa/vespalib/util/arrayqueue.hpp>
#include <vespa/vespalib/util/gate.h>
#include <vespa/vespalib/util/eventbarrier.hpp>
@@ -70,7 +71,7 @@ private:
struct Strand {
enum class State { IDLE, WAITING, ACTIVE };
State state;
- vespalib::ArrayQueue<TaggedTask> queue;
+ ArrayQueue<TaggedTask> queue;
Strand();
~Strand();
};
@@ -81,6 +82,7 @@ private:
struct Worker {
enum class State { RUNNING, BLOCKED, DONE };
std::condition_variable cond;
+ ThreadIdleTracker idleTracker;
State state;
Strand *strand;
Worker();
@@ -107,7 +109,7 @@ private:
static constexpr size_t STACK_SIZE = (256 * 1024);
AdaptiveSequencedExecutor &parent;
std::unique_ptr<FastOS_ThreadPool> pool;
- vespalib::Gate allow_worker_exit;
+ Gate allow_worker_exit;
ThreadTools(AdaptiveSequencedExecutor &parent_in);
~ThreadTools();
void Run(FastOS_ThreadInterface *, void *) override;
@@ -123,11 +125,12 @@ private:
std::unique_ptr<ThreadTools> _thread_tools;
mutable std::mutex _mutex;
std::vector<Strand> _strands;
- vespalib::ArrayQueue<Strand*> _wait_queue;
- vespalib::ArrayQueue<Worker*> _worker_stack;
+ ArrayQueue<Strand*> _wait_queue;
+ ArrayQueue<Worker*> _worker_stack;
EventBarrier<BarrierCompletion> _barrier;
Self _self;
ExecutorStats _stats;
+ ExecutorIdleTracker _idleTracker;
Config _cfg;
void maybe_block_self(std::unique_lock<std::mutex> &lock);
diff --git a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h
index 3bd5ca3d49a..c1e56572614 100644
--- a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h
@@ -23,7 +23,7 @@ public:
}
size_t getNumThreads() const override { return 0; }
ExecutorStats getStats() override {
- return ExecutorStats(ExecutorStats::QueueSizeT(), _accepted.load(std::memory_order_relaxed), 0);
+ return ExecutorStats(ExecutorStats::QueueSizeT(), _accepted.load(std::memory_order_relaxed), 0, 0);
}
void setTaskLimit(uint32_t taskLimit) override { (void) taskLimit; }
uint32_t getTaskLimit() const override { return std::numeric_limits<uint32_t>::max(); }
diff --git a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp
index 0d132193af1..703256c5521 100644
--- a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp
@@ -19,7 +19,7 @@ ForegroundTaskExecutor::ForegroundTaskExecutor(uint32_t threads)
ForegroundTaskExecutor::~ForegroundTaskExecutor() = default;
void
-ForegroundTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP task)
+ForegroundTaskExecutor::executeTask(ExecutorId id, Executor::Task::UP task)
{
assert(id.getId() < getNumExecutors());
task->run();
@@ -35,8 +35,8 @@ void ForegroundTaskExecutor::setTaskLimit(uint32_t) {
}
-vespalib::ExecutorStats ForegroundTaskExecutor::getStats() {
- return vespalib::ExecutorStats(vespalib::ExecutorStats::QueueSizeT(0) , _accepted.load(std::memory_order_relaxed), 0);
+ExecutorStats ForegroundTaskExecutor::getStats() {
+ return ExecutorStats(ExecutorStats::QueueSizeT(0) , _accepted.load(std::memory_order_relaxed), 0, 0);
}
ISequencedTaskExecutor::ExecutorId
@@ -44,4 +44,4 @@ ForegroundTaskExecutor::getExecutorId(uint64_t componentId) const {
return ExecutorId(componentId%getNumExecutors());
}
-} // namespace search
+}
diff --git a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h
index 7359de0ba66..9d351aca653 100644
--- a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h
@@ -22,10 +22,10 @@ public:
~ForegroundTaskExecutor() override;
ExecutorId getExecutorId(uint64_t componentId) const override;
- void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override;
+ void executeTask(ExecutorId id, Executor::Task::UP task) override;
void sync() override;
void setTaskLimit(uint32_t taskLimit) override;
- vespalib::ExecutorStats getStats() override;
+ ExecutorStats getStats() override;
private:
std::atomic<uint64_t> _accepted;
};
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
index 727894397a7..9e95bdaa3ab 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
@@ -104,7 +104,7 @@ SequencedTaskExecutor::getStats()
{
ExecutorStats accumulatedStats;
for (auto &executor :* _executors) {
- accumulatedStats += executor->getStats();
+ accumulatedStats.aggregate(executor->getStats());
}
return accumulatedStats;
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index 803ec4f3f7c..af95918ccab 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -19,6 +19,9 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t wat
_consumerCondition(),
_producerCondition(),
_thread(*this),
+ _idleTracker(steady_clock::now()),
+ _threadIdleTracker(),
+ _wakeupCount(0),
_lastAccepted(0),
_queueSize(),
_wakeupConsumerAt(0),
@@ -115,7 +118,11 @@ SingleExecutor::run() {
_wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + _watermark, std::memory_order_relaxed);
Lock lock(_mutex);
if (numTasks() <= 0) {
- _consumerCondition.wait_for(lock, _reactionTime);
+ steady_time now = steady_clock::now();
+ _threadIdleTracker.set_idle(now);
+ _consumerCondition.wait_until(lock, now + _reactionTime);
+ _idleTracker.was_idle(_threadIdleTracker.set_active(steady_clock::now()));
+ _wakeupCount++;
}
_wakeupConsumerAt.store(0, std::memory_order_relaxed);
}
@@ -150,7 +157,6 @@ SingleExecutor::wait_for_room(Lock & lock) {
drain(lock);
_tasks = std::make_unique<Task::UP[]>(_wantedTaskLimit);
_taskLimit = _wantedTaskLimit.load();
- taskLimit = _taskLimit;
}
_queueSize.add(numTasks());
while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) {
@@ -162,7 +168,11 @@ ExecutorStats
SingleExecutor::getStats() {
Lock lock(_mutex);
uint64_t accepted = _wp.load(std::memory_order_relaxed);
- ExecutorStats stats(_queueSize, (accepted - _lastAccepted), 0);
+ steady_time now = steady_clock::now();
+ _idleTracker.was_idle(_threadIdleTracker.reset(now));
+ ExecutorStats stats(_queueSize, (accepted - _lastAccepted), 0, _wakeupCount);
+ stats.setUtil(1, _idleTracker.reset(now, 1));
+ _wakeupCount = 0;
_lastAccepted = accepted;
_queueSize = ExecutorStats::QueueSizeT() ;
return stats;
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
index 8e9c1ae3fa1..7d868322558 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -5,6 +5,7 @@
#include <vespa/vespalib/util/threadexecutor.h>
#include <vespa/vespalib/util/thread.h>
#include <vespa/vespalib/util/time.h>
+#include <vespa/vespalib/util/executor_idle_tracking.h>
#include <thread>
#include <atomic>
@@ -18,7 +19,7 @@ namespace vespalib {
*/
class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable {
public:
- explicit SingleExecutor(init_fun_t func, uint32_t taskLimit);
+ SingleExecutor(init_fun_t func, uint32_t taskLimit);
SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime);
~SingleExecutor() override;
Task::UP execute(Task::UP task) override;
@@ -54,6 +55,9 @@ private:
std::condition_variable _consumerCondition;
std::condition_variable _producerCondition;
vespalib::Thread _thread;
+ ExecutorIdleTracker _idleTracker;
+ ThreadIdleTracker _threadIdleTracker;
+ uint64_t _wakeupCount;
uint64_t _lastAccepted;
ExecutorStats::QueueSizeT _queueSize;
std::atomic<uint64_t> _wakeupConsumerAt;
diff --git a/vespalib/src/tests/executor/threadstackexecutor_test.cpp b/vespalib/src/tests/executor/threadstackexecutor_test.cpp
index b55f54f9339..e61dc071b62 100644
--- a/vespalib/src/tests/executor/threadstackexecutor_test.cpp
+++ b/vespalib/src/tests/executor/threadstackexecutor_test.cpp
@@ -4,7 +4,7 @@
#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/vespalib/util/backtrace.h>
#include <vespa/vespalib/util/size_literals.h>
-#include <atomic>
+#include <thread>
using namespace vespalib;
@@ -74,9 +74,9 @@ struct MyState {
ExecutorStats stats = executor.getStats();
EXPECT_EQUAL(expect_running + expect_deleted, MyTask::runCnt);
EXPECT_EQUAL(expect_rejected + expect_deleted, MyTask::deleteCnt);
- EXPECT_EQUAL(expect_queue + expect_running + expect_deleted,
- stats.acceptedTasks);
+ EXPECT_EQUAL(expect_queue + expect_running + expect_deleted,stats.acceptedTasks);
EXPECT_EQUAL(expect_rejected, stats.rejectedTasks);
+ EXPECT_TRUE(stats.wakeupCount <= (NUM_THREADS + stats.acceptedTasks));
EXPECT_TRUE(!(gate.getCount() == 1) || (expect_deleted == 0));
if (expect_deleted == 0) {
EXPECT_EQUAL(expect_queue + expect_running, stats.queueSize.max());
@@ -85,6 +85,7 @@ struct MyState {
EXPECT_EQUAL(expect_queue + expect_running, stats.queueSize.max());
EXPECT_EQUAL(0u, stats.acceptedTasks);
EXPECT_EQUAL(0u, stats.rejectedTasks);
+ EXPECT_EQUAL(0u, stats.wakeupCount);
return *this;
}
};
@@ -188,11 +189,16 @@ TEST_F("require that executor thread stack tag can be set", ThreadStackExecutor(
}
TEST("require that stats can be accumulated") {
- ExecutorStats stats(ExecutorStats::QueueSizeT(1) ,2,3);
+ EXPECT_TRUE(std::atomic<duration>::is_always_lock_free);
+ ExecutorStats stats(ExecutorStats::QueueSizeT(1) ,2,3,7);
+ stats.setUtil(3, 0.8);
EXPECT_EQUAL(1u, stats.queueSize.max());
EXPECT_EQUAL(2u, stats.acceptedTasks);
EXPECT_EQUAL(3u, stats.rejectedTasks);
- stats += ExecutorStats(ExecutorStats::QueueSizeT(7),8,9);
+ EXPECT_EQUAL(7u, stats.wakeupCount);
+ EXPECT_EQUAL(3u, stats.getThreadCount());
+ EXPECT_EQUAL(0.2, stats.getUtil());
+ stats.aggregate(ExecutorStats(ExecutorStats::QueueSizeT(7),8,9,11).setUtil(7,0.5));
EXPECT_EQUAL(2u, stats.queueSize.count());
EXPECT_EQUAL(8u, stats.queueSize.total());
EXPECT_EQUAL(8u, stats.queueSize.max());
@@ -200,9 +206,18 @@ TEST("require that stats can be accumulated") {
EXPECT_EQUAL(8u, stats.queueSize.max());
EXPECT_EQUAL(4.0, stats.queueSize.average());
+ EXPECT_EQUAL(10u, stats.getThreadCount());
EXPECT_EQUAL(10u, stats.acceptedTasks);
EXPECT_EQUAL(12u, stats.rejectedTasks);
+ EXPECT_EQUAL(18u, stats.wakeupCount);
+ EXPECT_EQUAL(0.41, stats.getUtil());
+}
+TEST("Test that utilization is computed") {
+ ThreadStackExecutor executor(1, 128_Ki);
+ std::this_thread::sleep_for(1s);
+ auto stats = executor.getStats();
+ EXPECT_GREATER(0.50, stats.getUtil());
}
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/vespalib/src/vespa/vespalib/util/executor_stats.h b/vespalib/src/vespa/vespalib/util/executor_stats.h
index f1f58685570..577ae933ec2 100644
--- a/vespalib/src/vespa/vespalib/util/executor_stats.h
+++ b/vespalib/src/vespa/vespalib/util/executor_stats.h
@@ -51,25 +51,46 @@ private:
/**
* Struct representing stats for an executor.
+ * Note that aggregation requires sample interval to be the same(similar) for all samples.
**/
-struct ExecutorStats {
+class ExecutorStats {
+private:
+ size_t _threadCount;
+ double _absUtil;
+public:
using QueueSizeT = AggregatedAverage<size_t>;
QueueSizeT queueSize;
- size_t acceptedTasks;
- size_t rejectedTasks;
- ExecutorStats() : ExecutorStats(QueueSizeT(), 0, 0) {}
- ExecutorStats(QueueSizeT queueSize_in, size_t accepted, size_t rejected)
- : queueSize(queueSize_in), acceptedTasks(accepted), rejectedTasks(rejected)
+ size_t acceptedTasks;
+ size_t rejectedTasks;
+ size_t wakeupCount; // Number of times a worker was woken up,
+
+ ExecutorStats() : ExecutorStats(QueueSizeT(), 0, 0, 0) {}
+ ExecutorStats(QueueSizeT queueSize_in, size_t accepted, size_t rejected, size_t wakeupCount_in)
+ : _threadCount(1),
+ _absUtil(1.0),
+ queueSize(queueSize_in),
+ acceptedTasks(accepted),
+ rejectedTasks(rejected),
+ wakeupCount(wakeupCount_in)
{}
- ExecutorStats & operator += (const ExecutorStats & rhs) {
+ void aggregate(const ExecutorStats & rhs) {
+ _threadCount += rhs._threadCount;
queueSize = QueueSizeT(queueSize.count() + rhs.queueSize.count(),
queueSize.total() + rhs.queueSize.total(),
queueSize.min() + rhs.queueSize.min(),
queueSize.max() + rhs.queueSize.max());
acceptedTasks += rhs.acceptedTasks;
rejectedTasks += rhs.rejectedTasks;
+ wakeupCount += rhs.wakeupCount;
+ _absUtil += rhs._absUtil;
+ }
+ ExecutorStats & setUtil(uint32_t threadCount, double idle) {
+ _threadCount = threadCount;
+ _absUtil = (1.0 - idle) * threadCount;
return *this;
}
+ double getUtil() const { return _absUtil / _threadCount; }
+ size_t getThreadCount() const { return _threadCount; }
};
}
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
index f80a5b4ce32..133350f3d56 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
@@ -24,6 +24,25 @@ ThreadInit::Run(FastOS_ThreadInterface *, void *) {
}
+ThreadStackExecutorBase::Worker::Worker()
+ : lock(),
+ cond(),
+ idleTracker(),
+ pre_guard(0xaaaaaaaa),
+ idle(true),
+ post_guard(0x55555555),
+ task()
+{}
+
+void
+ThreadStackExecutorBase::Worker::verify(bool expect_idle) const {
+ (void) expect_idle;
+ assert(pre_guard == 0xaaaaaaaa);
+ assert(post_guard == 0x55555555);
+ assert(idle == expect_idle);
+ assert(!task.task == expect_idle);
+}
+
void
ThreadStackExecutorBase::BlockedThread::wait() const
{
@@ -103,6 +122,7 @@ ThreadStackExecutorBase::obtainTask(Worker &worker)
return false;
}
_workers.push(&worker);
+ worker.idleTracker.set_idle(steady_clock::now());
}
{
unique_lock guard(worker.lock);
@@ -141,6 +161,7 @@ ThreadStackExecutorBase::ThreadStackExecutorBase(uint32_t stackSize,
_lock(),
_cond(),
_stats(),
+ _idleTracker(steady_clock::now()),
_executorCompletion(),
_tasks(),
_workers(),
@@ -164,7 +185,8 @@ ThreadStackExecutorBase::start(uint32_t threads)
}
}
-size_t ThreadStackExecutorBase::getNumThreads() const {
+size_t
+ThreadStackExecutorBase::getNumThreads() const {
return _pool->GetNumStartedThreads();
}
@@ -208,6 +230,12 @@ ThreadStackExecutorBase::getStats()
{
std::unique_lock guard(_lock);
ExecutorStats stats = _stats;
+ steady_time now = steady_clock::now();
+ for (size_t i(0); i < _workers.size(); i++) {
+ _idleTracker.was_idle(_workers.access(i)->idleTracker.reset(now));
+ }
+ size_t numThreads = getNumThreads();
+ stats.setUtil(numThreads, _idleTracker.reset(now, numThreads));
_stats = ExecutorStats();
_stats.queueSize.add(_taskCount);
return stats;
@@ -225,6 +253,8 @@ ThreadStackExecutorBase::execute(Task::UP task)
if (!_workers.empty()) {
Worker *worker = _workers.back();
_workers.popBack();
+ _idleTracker.was_idle(worker->idleTracker.set_active(steady_clock::now()));
+ _stats.wakeupCount++;
guard.unlock(); // <- UNLOCK
assignTask(std::move(taggedTask), *worker);
} else {
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
index 66a34bfde95..c3552cfe579 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
@@ -7,6 +7,7 @@
#include "arrayqueue.hpp"
#include "gate.h"
#include "runnable.h"
+#include "executor_idle_tracking.h"
#include <vector>
#include <functional>
@@ -47,18 +48,13 @@ private:
struct Worker {
std::mutex lock;
std::condition_variable cond;
- uint32_t pre_guard;
- bool idle;
- uint32_t post_guard;
- TaggedTask task;
- Worker() : lock(), cond(), pre_guard(0xaaaaaaaa), idle(true), post_guard(0x55555555), task() {}
- void verify(bool expect_idle) const {
- (void) expect_idle;
- assert(pre_guard == 0xaaaaaaaa);
- assert(post_guard == 0x55555555);
- assert(idle == expect_idle);
- assert(!task.task == expect_idle);
- }
+ ThreadIdleTracker idleTracker;
+ uint32_t pre_guard;
+ bool idle;
+ uint32_t post_guard;
+ TaggedTask task;
+ Worker();
+ void verify(bool expect_idle) const;
};
struct BarrierCompletion {
@@ -81,6 +77,7 @@ private:
mutable std::mutex _lock;
std::condition_variable _cond;
ExecutorStats _stats;
+ ExecutorIdleTracker _idleTracker;
Gate _executorCompletion;
ArrayQueue<TaggedTask> _tasks;
ArrayQueue<Worker*> _workers;