summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Henning Baldersheim <balder@yahoo-inc.com> 2021-10-19 05:06:06 +0000
committer Henning Baldersheim <balder@yahoo-inc.com> 2021-10-22 07:31:50 +0000
commit 86e9a67362a9e8de7a4a267f6450b41cc60be290 (patch)
tree 6f0e34f9ed012bdf6fa2eeecdb97fd89ee54e092
parent 7ec9aa11869ffdba64a0009a7396a4483a045cf5 (diff)
Add a metric for how many times a worker in a thread pool has been woken up.
Also track each worker's idle time and add a metric for the utilization.
-rw-r--r-- searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp 4
-rw-r--r-- searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h 8
-rw-r--r-- staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp 11
-rw-r--r-- staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h 11
-rw-r--r-- staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h 2
-rw-r--r-- staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp 2
-rw-r--r-- staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp 2
-rw-r--r-- staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp 15
-rw-r--r-- staging_vespalib/src/vespa/vespalib/util/singleexecutor.h 6
-rw-r--r-- vespalib/src/tests/executor/threadstackexecutor_test.cpp 23
-rw-r--r-- vespalib/src/vespa/vespalib/util/executor_stats.h 26
-rw-r--r-- vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp 37
-rw-r--r-- vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h 21
13 files changed, 130 insertions, 38 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp
index 74e0971178c..789e97138e0 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp
+++ b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp
@@ -10,6 +10,8 @@ ExecutorMetrics::update(const vespalib::ExecutorStats &stats)
maxPending.set(stats.queueSize.max());
accepted.inc(stats.acceptedTasks);
rejected.inc(stats.rejectedTasks);
+ wakeupCount.inc(stats.wakeupCount);
+ util.set(stats.getUtil());
const auto & qSize = stats.queueSize;
queueSize.addValueBatch(qSize.average(), qSize.count(), qSize.min(), qSize.max());
}
@@ -19,6 +21,8 @@ ExecutorMetrics::ExecutorMetrics(const std::string &name, metrics::MetricSet *pa
maxPending("maxpending", {}, "Maximum number of pending (active + queued) tasks", this),
accepted("accepted", {}, "Number of accepted tasks", this),
rejected("rejected", {}, "Number of rejected tasks", this),
+ wakeupCount("wakeupCount", {}, "Number of times a worker thread has been woken up", this),
+ util("utilization", {}, "ratio of time the worker threads has been active", this),
queueSize("queuesize", {}, "Size of task queue", this)
{
}
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h
index 273c4ed8979..31d959a399f 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h
+++ b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h
@@ -11,9 +11,11 @@ namespace proton {
struct ExecutorMetrics : metrics::MetricSet
{
- metrics::LongValueMetric maxPending; // TODO Remove on Vespa 8 or sooner if possible.
- metrics::LongCountMetric accepted;
- metrics::LongCountMetric rejected;
+ metrics::LongValueMetric maxPending; // TODO Remove on Vespa 8 or sooner if possible.
+ metrics::LongCountMetric accepted;
+ metrics::LongCountMetric rejected;
+ metrics::LongCountMetric wakeupCount;
+ metrics::DoubleValueMetric util;
metrics::LongAverageMetric queueSize;
void update(const vespalib::ExecutorStats &stats);
diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
index 869ff0456e1..7681af43afc 100644
--- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
@@ -21,6 +21,7 @@ AdaptiveSequencedExecutor::Strand::~Strand()
AdaptiveSequencedExecutor::Worker::Worker()
: cond(),
+ idleTracker(),
state(State::RUNNING),
strand(nullptr)
{
@@ -152,8 +153,11 @@ AdaptiveSequencedExecutor::obtain_strand(Worker &worker, std::unique_lock<std::m
worker.state = Worker::State::BLOCKED;
_worker_stack.push(&worker);
while (worker.state == Worker::State::BLOCKED) {
+ worker.idleTracker.set_idle(steady_clock::now());
worker.cond.wait(lock);
+ _idleTracker.was_idle(worker.idleTracker.set_active(steady_clock::now()));
}
+ _stats.wakeupCount++;
}
return (worker.state == Worker::State::RUNNING);
}
@@ -233,6 +237,7 @@ AdaptiveSequencedExecutor::AdaptiveSequencedExecutor(size_t num_strands, size_t
_worker_stack(num_threads),
_self(),
_stats(),
+ _idleTracker(steady_clock::now()),
_cfg(num_threads, max_waiting, max_pending)
{
_stats.queueSize.add(_self.pending_tasks);
@@ -329,6 +334,12 @@ AdaptiveSequencedExecutor::getStats()
{
auto guard = std::lock_guard(_mutex);
ExecutorStats stats = _stats;
+ stats.executorCount = _cfg.num_threads; // snapshot reports the configured thread count
+ steady_time now = steady_clock::now();
+ for (size_t i(0); i < _worker_stack.size(); i++) { // fold in workers currently on _worker_stack
+ _idleTracker.was_idle(_worker_stack.access(i)->idleTracker.reset(now));
+ }
+ stats.absUtil = _idleTracker.reset(now, 1); // must go on the returned snapshot: _stats is cleared on the next line
_stats = ExecutorStats();
_stats.queueSize.add(_self.pending_tasks);
return stats;
diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
index fdcdf35fbbb..776248384a5 100644
--- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
@@ -3,6 +3,7 @@
#pragma once
#include "isequencedtaskexecutor.h"
+#include <vespa/vespalib/util/executor_idle_tracking.h>
#include <vespa/vespalib/util/arrayqueue.hpp>
#include <vespa/vespalib/util/gate.h>
#include <vespa/vespalib/util/eventbarrier.hpp>
@@ -70,7 +71,7 @@ private:
struct Strand {
enum class State { IDLE, WAITING, ACTIVE };
State state;
- vespalib::ArrayQueue<TaggedTask> queue;
+ ArrayQueue<TaggedTask> queue;
Strand();
~Strand();
};
@@ -81,6 +82,7 @@ private:
struct Worker {
enum class State { RUNNING, BLOCKED, DONE };
std::condition_variable cond;
+ ThreadIdleTracker idleTracker;
State state;
Strand *strand;
Worker();
@@ -107,7 +109,7 @@ private:
static constexpr size_t STACK_SIZE = (256 * 1024);
AdaptiveSequencedExecutor &parent;
std::unique_ptr<FastOS_ThreadPool> pool;
- vespalib::Gate allow_worker_exit;
+ Gate allow_worker_exit;
ThreadTools(AdaptiveSequencedExecutor &parent_in);
~ThreadTools();
void Run(FastOS_ThreadInterface *, void *) override;
@@ -123,11 +125,12 @@ private:
std::unique_ptr<ThreadTools> _thread_tools;
mutable std::mutex _mutex;
std::vector<Strand> _strands;
- vespalib::ArrayQueue<Strand*> _wait_queue;
- vespalib::ArrayQueue<Worker*> _worker_stack;
+ ArrayQueue<Strand*> _wait_queue;
+ ArrayQueue<Worker*> _worker_stack;
EventBarrier<BarrierCompletion> _barrier;
Self _self;
ExecutorStats _stats;
+ ExecutorIdleTracker _idleTracker;
Config _cfg;
void maybe_block_self(std::unique_lock<std::mutex> &lock);
diff --git a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h
index 3bd5ca3d49a..15da8b90e3c 100644
--- a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h
@@ -23,7 +23,7 @@ public:
}
size_t getNumThreads() const override { return 0; }
ExecutorStats getStats() override {
- return ExecutorStats(ExecutorStats::QueueSizeT(), _accepted.load(std::memory_order_relaxed), 0);
+ return ExecutorStats(1, ExecutorStats::QueueSizeT(), _accepted.load(std::memory_order_relaxed), 0, 1, 1.0);
}
void setTaskLimit(uint32_t taskLimit) override { (void) taskLimit; }
uint32_t getTaskLimit() const override { return std::numeric_limits<uint32_t>::max(); }
diff --git a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp
index 0d132193af1..b65f79e65cb 100644
--- a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp
@@ -36,7 +36,7 @@ void ForegroundTaskExecutor::setTaskLimit(uint32_t) {
}
vespalib::ExecutorStats ForegroundTaskExecutor::getStats() {
- return vespalib::ExecutorStats(vespalib::ExecutorStats::QueueSizeT(0) , _accepted.load(std::memory_order_relaxed), 0);
+ return vespalib::ExecutorStats(1, vespalib::ExecutorStats::QueueSizeT(0) , _accepted.load(std::memory_order_relaxed), 0, 1, 1.0);
}
ISequencedTaskExecutor::ExecutorId
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
index 727894397a7..9e95bdaa3ab 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
@@ -104,7 +104,7 @@ SequencedTaskExecutor::getStats()
{
ExecutorStats accumulatedStats;
for (auto &executor :* _executors) {
- accumulatedStats += executor->getStats();
+ accumulatedStats.aggregate(executor->getStats());
}
return accumulatedStats;
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index 803ec4f3f7c..2bb87350036 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -19,6 +19,9 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t wat
_consumerCondition(),
_producerCondition(),
_thread(*this),
+ _idleTracker(steady_clock::now()),
+ _threadIdleTracker(),
+ _wakeupCount(0),
_lastAccepted(0),
_queueSize(),
_wakeupConsumerAt(0),
@@ -115,7 +118,11 @@ SingleExecutor::run() {
_wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + _watermark, std::memory_order_relaxed);
Lock lock(_mutex);
if (numTasks() <= 0) {
- _consumerCondition.wait_for(lock, _reactionTime);
+ steady_time now = steady_clock::now();
+ _threadIdleTracker.set_idle(now);
+ _consumerCondition.wait_until(lock, now + _reactionTime);
+ _idleTracker.was_idle(_threadIdleTracker.set_active(steady_clock::now()));
+ _wakeupCount++;
}
_wakeupConsumerAt.store(0, std::memory_order_relaxed);
}
@@ -150,7 +157,6 @@ SingleExecutor::wait_for_room(Lock & lock) {
drain(lock);
_tasks = std::make_unique<Task::UP[]>(_wantedTaskLimit);
_taskLimit = _wantedTaskLimit.load();
- taskLimit = _taskLimit;
}
_queueSize.add(numTasks());
while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) {
@@ -162,7 +168,10 @@ ExecutorStats
SingleExecutor::getStats() {
Lock lock(_mutex);
uint64_t accepted = _wp.load(std::memory_order_relaxed);
- ExecutorStats stats(_queueSize, (accepted - _lastAccepted), 0);
+ steady_time now = steady_clock::now();
+ _idleTracker.was_idle(_threadIdleTracker.reset(now));
+ ExecutorStats stats(1, _queueSize, (accepted - _lastAccepted), 0, _wakeupCount, _idleTracker.reset(now, 1));
+ _wakeupCount = 0;
_lastAccepted = accepted;
_queueSize = ExecutorStats::QueueSizeT() ;
return stats;
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
index 8e9c1ae3fa1..7d868322558 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -5,6 +5,7 @@
#include <vespa/vespalib/util/threadexecutor.h>
#include <vespa/vespalib/util/thread.h>
#include <vespa/vespalib/util/time.h>
+#include <vespa/vespalib/util/executor_idle_tracking.h>
#include <thread>
#include <atomic>
@@ -18,7 +19,7 @@ namespace vespalib {
*/
class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable {
public:
- explicit SingleExecutor(init_fun_t func, uint32_t taskLimit);
+ SingleExecutor(init_fun_t func, uint32_t taskLimit);
SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime);
~SingleExecutor() override;
Task::UP execute(Task::UP task) override;
@@ -54,6 +55,9 @@ private:
std::condition_variable _consumerCondition;
std::condition_variable _producerCondition;
vespalib::Thread _thread;
+ ExecutorIdleTracker _idleTracker;
+ ThreadIdleTracker _threadIdleTracker;
+ uint64_t _wakeupCount;
uint64_t _lastAccepted;
ExecutorStats::QueueSizeT _queueSize;
std::atomic<uint64_t> _wakeupConsumerAt;
diff --git a/vespalib/src/tests/executor/threadstackexecutor_test.cpp b/vespalib/src/tests/executor/threadstackexecutor_test.cpp
index b55f54f9339..56dbf0d19b5 100644
--- a/vespalib/src/tests/executor/threadstackexecutor_test.cpp
+++ b/vespalib/src/tests/executor/threadstackexecutor_test.cpp
@@ -74,9 +74,9 @@ struct MyState {
ExecutorStats stats = executor.getStats();
EXPECT_EQUAL(expect_running + expect_deleted, MyTask::runCnt);
EXPECT_EQUAL(expect_rejected + expect_deleted, MyTask::deleteCnt);
- EXPECT_EQUAL(expect_queue + expect_running + expect_deleted,
- stats.acceptedTasks);
+ EXPECT_EQUAL(expect_queue + expect_running + expect_deleted,stats.acceptedTasks);
EXPECT_EQUAL(expect_rejected, stats.rejectedTasks);
+ EXPECT_TRUE(stats.wakeupCount <= (NUM_THREADS + stats.acceptedTasks));
EXPECT_TRUE(!(gate.getCount() == 1) || (expect_deleted == 0));
if (expect_deleted == 0) {
EXPECT_EQUAL(expect_queue + expect_running, stats.queueSize.max());
@@ -85,6 +85,7 @@ struct MyState {
EXPECT_EQUAL(expect_queue + expect_running, stats.queueSize.max());
EXPECT_EQUAL(0u, stats.acceptedTasks);
EXPECT_EQUAL(0u, stats.rejectedTasks);
+ EXPECT_EQUAL(0u, stats.wakeupCount);
return *this;
}
};
@@ -188,11 +189,16 @@ TEST_F("require that executor thread stack tag can be set", ThreadStackExecutor(
}
TEST("require that stats can be accumulated") {
- ExecutorStats stats(ExecutorStats::QueueSizeT(1) ,2,3);
+ EXPECT_TRUE(std::atomic<duration>::is_always_lock_free);
+ ExecutorStats stats(3, ExecutorStats::QueueSizeT(1) ,2,3,7, 0.6);
EXPECT_EQUAL(1u, stats.queueSize.max());
EXPECT_EQUAL(2u, stats.acceptedTasks);
EXPECT_EQUAL(3u, stats.rejectedTasks);
- stats += ExecutorStats(ExecutorStats::QueueSizeT(7),8,9);
+ EXPECT_EQUAL(7u, stats.wakeupCount);
+ EXPECT_EQUAL(3u, stats.executorCount);
+ EXPECT_EQUAL(0.6, stats.absUtil);
+ EXPECT_EQUAL(0.2, stats.getUtil());
+ stats.aggregate(ExecutorStats(7, ExecutorStats::QueueSizeT(7),8,9,11, 1.9));
EXPECT_EQUAL(2u, stats.queueSize.count());
EXPECT_EQUAL(8u, stats.queueSize.total());
EXPECT_EQUAL(8u, stats.queueSize.max());
@@ -200,9 +206,18 @@ TEST("require that stats can be accumulated") {
EXPECT_EQUAL(8u, stats.queueSize.max());
EXPECT_EQUAL(4.0, stats.queueSize.average());
+ EXPECT_EQUAL(10u, stats.executorCount);
EXPECT_EQUAL(10u, stats.acceptedTasks);
EXPECT_EQUAL(12u, stats.rejectedTasks);
+ EXPECT_EQUAL(18u, stats.wakeupCount);
+ EXPECT_EQUAL(2.5, stats.absUtil);
+ EXPECT_EQUAL(0.25, stats.getUtil());
+}
+TEST("Test that load is computed") {
+ ThreadStackExecutor executor(1, 128_Ki);
+ auto stats = executor.getStats();
+ EXPECT_EQUAL(0.0, stats.absUtil);
}
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/vespalib/src/vespa/vespalib/util/executor_stats.h b/vespalib/src/vespa/vespalib/util/executor_stats.h
index f1f58685570..3ebf1d8744d 100644
--- a/vespalib/src/vespa/vespalib/util/executor_stats.h
+++ b/vespalib/src/vespa/vespalib/util/executor_stats.h
@@ -51,25 +51,37 @@ private:
/**
* Struct representing stats for an executor.
+ * Note that aggregation requires the sample interval to be the same (or similar) for all samples.
**/
struct ExecutorStats {
using QueueSizeT = AggregatedAverage<size_t>;
+ uint32_t executorCount; // Divisor used by getUtil(); summed by aggregate().
QueueSizeT queueSize;
- size_t acceptedTasks;
- size_t rejectedTasks;
- ExecutorStats() : ExecutorStats(QueueSizeT(), 0, 0) {}
- ExecutorStats(QueueSizeT queueSize_in, size_t accepted, size_t rejected)
- : queueSize(queueSize_in), acceptedTasks(accepted), rejectedTasks(rejected)
+ size_t acceptedTasks;
+ size_t rejectedTasks;
+ size_t wakeupCount; // Number of times a worker was woken up.
+ double absUtil; // Utilization before normalization; summed by aggregate(), normalized by getUtil().
+ ExecutorStats() : ExecutorStats(1, QueueSizeT(), 0, 0, 0, 1.0) {} // Default: one executor, absUtil 1.0, all counters zero.
+ ExecutorStats(uint32_t executorCount_in, QueueSizeT queueSize_in, size_t accepted, size_t rejected, size_t wakeupCount_in, double absUtil_in)
+ : executorCount(executorCount_in),
+ queueSize(queueSize_in),
+ acceptedTasks(accepted),
+ rejectedTasks(rejected),
+ wakeupCount(wakeupCount_in),
+ absUtil(absUtil_in)
{}
- ExecutorStats & operator += (const ExecutorStats & rhs) {
+ void aggregate(const ExecutorStats & rhs) { // Adds every field of rhs into this object (replaces operator+=).
+ executorCount += rhs.executorCount;
queueSize = QueueSizeT(queueSize.count() + rhs.queueSize.count(),
queueSize.total() + rhs.queueSize.total(),
queueSize.min() + rhs.queueSize.min(),
queueSize.max() + rhs.queueSize.max());
acceptedTasks += rhs.acceptedTasks;
rejectedTasks += rhs.rejectedTasks;
- return *this;
+ wakeupCount += rhs.wakeupCount;
+ absUtil += rhs.absUtil;
}
+ double getUtil() const { return absUtil / executorCount; } // absUtil per executor. NOTE(review): no guard for executorCount == 0.
};
}
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
index f80a5b4ce32..e5da270e099 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
@@ -24,6 +24,25 @@ ThreadInit::Run(FastOS_ThreadInterface *, void *) {
}
+ThreadStackExecutorBase::Worker::Worker() // Out-of-line constructor; previously defined inline in the header.
+ : lock(),
+ cond(),
+ idleTracker(),
+ pre_guard(0xaaaaaaaa), // fixed pattern that verify() asserts is unchanged
+ idle(true), // a freshly constructed worker starts out idle
+ post_guard(0x55555555), // fixed pattern that verify() asserts is unchanged
+ task()
+{}
+
+void
+ThreadStackExecutorBase::Worker::verify(bool expect_idle) const { // Asserts the guard patterns and that idle/task state matches expect_idle.
+ (void) expect_idle; // NOTE(review): presumably avoids an unused-parameter warning when assert() is compiled out - confirm
+ assert(pre_guard == 0xaaaaaaaa); // pattern set in the constructor
+ assert(post_guard == 0x55555555); // pattern set in the constructor
+ assert(idle == expect_idle);
+ assert(!task.task == expect_idle); // no task is held exactly when the worker is expected to be idle
+}
+
void
ThreadStackExecutorBase::BlockedThread::wait() const
{
@@ -103,11 +122,17 @@ ThreadStackExecutorBase::obtainTask(Worker &worker)
return false;
}
_workers.push(&worker);
+ //TODO Not entirely correct as this counts working days, not wakeUps.
+ //But it should be the same, and here it is thread safe.
+ _stats.wakeupCount++;
}
{
unique_lock guard(worker.lock);
while (worker.idle) {
+ worker.idleTracker.set_idle(steady_clock::now());
worker.cond.wait(guard);
+ //TODO: _idleTracker.was_idle is not thread safe !!!! Must find other solution. Atomic ?
+ _idleTracker.was_idle(worker.idleTracker.set_active(steady_clock::now()));
}
}
worker.idle = !worker.task.task;
@@ -141,6 +166,7 @@ ThreadStackExecutorBase::ThreadStackExecutorBase(uint32_t stackSize,
_lock(),
_cond(),
_stats(),
+ _idleTracker(steady_clock::now()),
_executorCompletion(),
_tasks(),
_workers(),
@@ -157,6 +183,8 @@ void
ThreadStackExecutorBase::start(uint32_t threads)
{
assert(threads > 0);
+ _stats.executorCount = threads;
+ _idleTracker.reset(steady_clock::now(), threads);
for (uint32_t i = 0; i < threads; ++i) {
FastOS_ThreadInterface *thread = _pool->NewThread(_thread_init.get());
assert(thread != nullptr);
@@ -164,7 +192,8 @@ ThreadStackExecutorBase::start(uint32_t threads)
}
}
-size_t ThreadStackExecutorBase::getNumThreads() const {
+size_t
+ThreadStackExecutorBase::getNumThreads() const {
return _pool->GetNumStartedThreads();
}
@@ -208,6 +237,12 @@ ThreadStackExecutorBase::getStats()
{
std::unique_lock guard(_lock);
ExecutorStats stats = _stats;
+ stats.executorCount = getNumThreads();
+ steady_time now = steady_clock::now();
+ for (size_t i(0); i < _workers.size(); i++) {
+ _idleTracker.was_idle(_workers.access(i)->idleTracker.reset(now));
+ }
+ stats.absUtil = _idleTracker.reset(now, 1);
_stats = ExecutorStats();
_stats.queueSize.add(_taskCount);
return stats;
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
index 66a34bfde95..c3552cfe579 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
@@ -7,6 +7,7 @@
#include "arrayqueue.hpp"
#include "gate.h"
#include "runnable.h"
+#include "executor_idle_tracking.h"
#include <vector>
#include <functional>
@@ -47,18 +48,13 @@ private:
struct Worker {
std::mutex lock;
std::condition_variable cond;
- uint32_t pre_guard;
- bool idle;
- uint32_t post_guard;
- TaggedTask task;
- Worker() : lock(), cond(), pre_guard(0xaaaaaaaa), idle(true), post_guard(0x55555555), task() {}
- void verify(bool expect_idle) const {
- (void) expect_idle;
- assert(pre_guard == 0xaaaaaaaa);
- assert(post_guard == 0x55555555);
- assert(idle == expect_idle);
- assert(!task.task == expect_idle);
- }
+ ThreadIdleTracker idleTracker;
+ uint32_t pre_guard;
+ bool idle;
+ uint32_t post_guard;
+ TaggedTask task;
+ Worker();
+ void verify(bool expect_idle) const;
};
struct BarrierCompletion {
@@ -81,6 +77,7 @@ private:
mutable std::mutex _lock;
std::condition_variable _cond;
ExecutorStats _stats;
+ ExecutorIdleTracker _idleTracker;
Gate _executorCompletion;
ArrayQueue<TaggedTask> _tasks;
ArrayQueue<Worker*> _workers;