summaryrefslogtreecommitdiffstats
path: root/vespalib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-10-19 05:06:06 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-10-22 07:31:50 +0000
commit86e9a67362a9e8de7a4a267f6450b41cc60be290 (patch)
tree6f0e34f9ed012bdf6fa2eeecdb97fd89ee54e092 /vespalib
parent7ec9aa11869ffdba64a0009a7396a4483a045cf5 (diff)
Add a metric for how many times a worker in a thread pool has woken up.
Also track the idle time a worker has and add metric for the utilization.
Diffstat (limited to 'vespalib')
-rw-r--r--vespalib/src/tests/executor/threadstackexecutor_test.cpp23
-rw-r--r--vespalib/src/vespa/vespalib/util/executor_stats.h26
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp37
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h21
4 files changed, 83 insertions, 24 deletions
diff --git a/vespalib/src/tests/executor/threadstackexecutor_test.cpp b/vespalib/src/tests/executor/threadstackexecutor_test.cpp
index b55f54f9339..56dbf0d19b5 100644
--- a/vespalib/src/tests/executor/threadstackexecutor_test.cpp
+++ b/vespalib/src/tests/executor/threadstackexecutor_test.cpp
@@ -74,9 +74,9 @@ struct MyState {
ExecutorStats stats = executor.getStats();
EXPECT_EQUAL(expect_running + expect_deleted, MyTask::runCnt);
EXPECT_EQUAL(expect_rejected + expect_deleted, MyTask::deleteCnt);
- EXPECT_EQUAL(expect_queue + expect_running + expect_deleted,
- stats.acceptedTasks);
+ EXPECT_EQUAL(expect_queue + expect_running + expect_deleted,stats.acceptedTasks);
EXPECT_EQUAL(expect_rejected, stats.rejectedTasks);
+ EXPECT_TRUE(stats.wakeupCount <= (NUM_THREADS + stats.acceptedTasks));
EXPECT_TRUE(!(gate.getCount() == 1) || (expect_deleted == 0));
if (expect_deleted == 0) {
EXPECT_EQUAL(expect_queue + expect_running, stats.queueSize.max());
@@ -85,6 +85,7 @@ struct MyState {
EXPECT_EQUAL(expect_queue + expect_running, stats.queueSize.max());
EXPECT_EQUAL(0u, stats.acceptedTasks);
EXPECT_EQUAL(0u, stats.rejectedTasks);
+ EXPECT_EQUAL(0u, stats.wakeupCount);
return *this;
}
};
@@ -188,11 +189,16 @@ TEST_F("require that executor thread stack tag can be set", ThreadStackExecutor(
}
TEST("require that stats can be accumulated") {
- ExecutorStats stats(ExecutorStats::QueueSizeT(1) ,2,3);
+ EXPECT_TRUE(std::atomic<duration>::is_always_lock_free);
+ ExecutorStats stats(3, ExecutorStats::QueueSizeT(1) ,2,3,7, 0.6);
EXPECT_EQUAL(1u, stats.queueSize.max());
EXPECT_EQUAL(2u, stats.acceptedTasks);
EXPECT_EQUAL(3u, stats.rejectedTasks);
- stats += ExecutorStats(ExecutorStats::QueueSizeT(7),8,9);
+ EXPECT_EQUAL(7u, stats.wakeupCount);
+ EXPECT_EQUAL(3u, stats.executorCount);
+ EXPECT_EQUAL(0.6, stats.absUtil);
+ EXPECT_EQUAL(0.2, stats.getUtil());
+ stats.aggregate(ExecutorStats(7, ExecutorStats::QueueSizeT(7),8,9,11, 1.9));
EXPECT_EQUAL(2u, stats.queueSize.count());
EXPECT_EQUAL(8u, stats.queueSize.total());
EXPECT_EQUAL(8u, stats.queueSize.max());
@@ -200,9 +206,18 @@ TEST("require that stats can be accumulated") {
EXPECT_EQUAL(8u, stats.queueSize.max());
EXPECT_EQUAL(4.0, stats.queueSize.average());
+ EXPECT_EQUAL(10u, stats.executorCount);
EXPECT_EQUAL(10u, stats.acceptedTasks);
EXPECT_EQUAL(12u, stats.rejectedTasks);
+ EXPECT_EQUAL(18u, stats.wakeupCount);
+ EXPECT_EQUAL(2.5, stats.absUtil);
+ EXPECT_EQUAL(0.25, stats.getUtil());
+}
+TEST("Test that load is computed") {
+ ThreadStackExecutor executor(1, 128_Ki);
+ auto stats = executor.getStats();
+ EXPECT_EQUAL(0.0, stats.absUtil);
}
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/vespalib/src/vespa/vespalib/util/executor_stats.h b/vespalib/src/vespa/vespalib/util/executor_stats.h
index f1f58685570..3ebf1d8744d 100644
--- a/vespalib/src/vespa/vespalib/util/executor_stats.h
+++ b/vespalib/src/vespa/vespalib/util/executor_stats.h
@@ -51,25 +51,37 @@ private:
/**
* Struct representing stats for an executor.
+ * Note that aggregation requires sample interval to be the same(similar) for all samples.
**/
struct ExecutorStats {
using QueueSizeT = AggregatedAverage<size_t>;
+ uint32_t executorCount;
QueueSizeT queueSize;
- size_t acceptedTasks;
- size_t rejectedTasks;
- ExecutorStats() : ExecutorStats(QueueSizeT(), 0, 0) {}
- ExecutorStats(QueueSizeT queueSize_in, size_t accepted, size_t rejected)
- : queueSize(queueSize_in), acceptedTasks(accepted), rejectedTasks(rejected)
+ size_t acceptedTasks;
+ size_t rejectedTasks;
+ size_t wakeupCount; // Number of times a worker was woken up,
+ double absUtil;
+ ExecutorStats() : ExecutorStats(1, QueueSizeT(), 0, 0, 0, 1.0) {}
+ ExecutorStats(uint32_t executorCount_in, QueueSizeT queueSize_in, size_t accepted, size_t rejected, size_t wakeupCount_in, double absUtil_in)
+ : executorCount(executorCount_in),
+ queueSize(queueSize_in),
+ acceptedTasks(accepted),
+ rejectedTasks(rejected),
+ wakeupCount(wakeupCount_in),
+ absUtil(absUtil_in)
{}
- ExecutorStats & operator += (const ExecutorStats & rhs) {
+ void aggregate(const ExecutorStats & rhs) {
+ executorCount += rhs.executorCount;
queueSize = QueueSizeT(queueSize.count() + rhs.queueSize.count(),
queueSize.total() + rhs.queueSize.total(),
queueSize.min() + rhs.queueSize.min(),
queueSize.max() + rhs.queueSize.max());
acceptedTasks += rhs.acceptedTasks;
rejectedTasks += rhs.rejectedTasks;
- return *this;
+ wakeupCount += rhs.wakeupCount;
+ absUtil += rhs.absUtil;
}
+ double getUtil() const { return absUtil / executorCount; }
};
}
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
index f80a5b4ce32..e5da270e099 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
@@ -24,6 +24,25 @@ ThreadInit::Run(FastOS_ThreadInterface *, void *) {
}
+ThreadStackExecutorBase::Worker::Worker()
+ : lock(),
+ cond(),
+ idleTracker(),
+ pre_guard(0xaaaaaaaa),
+ idle(true),
+ post_guard(0x55555555),
+ task()
+{}
+
+void
+ThreadStackExecutorBase::Worker::verify(bool expect_idle) const {
+ (void) expect_idle;
+ assert(pre_guard == 0xaaaaaaaa);
+ assert(post_guard == 0x55555555);
+ assert(idle == expect_idle);
+ assert(!task.task == expect_idle);
+}
+
void
ThreadStackExecutorBase::BlockedThread::wait() const
{
@@ -103,11 +122,17 @@ ThreadStackExecutorBase::obtainTask(Worker &worker)
return false;
}
_workers.push(&worker);
+ //TODO Not entirely correct as this counts working days, not wakeUps.
+ //But it should be the same, and here it is thread safe.
+ _stats.wakeupCount++;
}
{
unique_lock guard(worker.lock);
while (worker.idle) {
+ worker.idleTracker.set_idle(steady_clock::now());
worker.cond.wait(guard);
+ //TODO: _idleTracker.was_idle is not thread safe !!!! Must find other solution. Atomic ?
+ _idleTracker.was_idle(worker.idleTracker.set_active(steady_clock::now()));
}
}
worker.idle = !worker.task.task;
@@ -141,6 +166,7 @@ ThreadStackExecutorBase::ThreadStackExecutorBase(uint32_t stackSize,
_lock(),
_cond(),
_stats(),
+ _idleTracker(steady_clock::now()),
_executorCompletion(),
_tasks(),
_workers(),
@@ -157,6 +183,8 @@ void
ThreadStackExecutorBase::start(uint32_t threads)
{
assert(threads > 0);
+ _stats.executorCount = threads;
+ _idleTracker.reset(steady_clock::now(), threads);
for (uint32_t i = 0; i < threads; ++i) {
FastOS_ThreadInterface *thread = _pool->NewThread(_thread_init.get());
assert(thread != nullptr);
@@ -164,7 +192,8 @@ ThreadStackExecutorBase::start(uint32_t threads)
}
}
-size_t ThreadStackExecutorBase::getNumThreads() const {
+size_t
+ThreadStackExecutorBase::getNumThreads() const {
return _pool->GetNumStartedThreads();
}
@@ -208,6 +237,12 @@ ThreadStackExecutorBase::getStats()
{
std::unique_lock guard(_lock);
ExecutorStats stats = _stats;
+ stats.executorCount = getNumThreads();
+ steady_time now = steady_clock::now();
+ for (size_t i(0); i < _workers.size(); i++) {
+ _idleTracker.was_idle(_workers.access(i)->idleTracker.reset(now));
+ }
+ stats.absUtil = _idleTracker.reset(now, 1);
_stats = ExecutorStats();
_stats.queueSize.add(_taskCount);
return stats;
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
index 66a34bfde95..c3552cfe579 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
@@ -7,6 +7,7 @@
#include "arrayqueue.hpp"
#include "gate.h"
#include "runnable.h"
+#include "executor_idle_tracking.h"
#include <vector>
#include <functional>
@@ -47,18 +48,13 @@ private:
struct Worker {
std::mutex lock;
std::condition_variable cond;
- uint32_t pre_guard;
- bool idle;
- uint32_t post_guard;
- TaggedTask task;
- Worker() : lock(), cond(), pre_guard(0xaaaaaaaa), idle(true), post_guard(0x55555555), task() {}
- void verify(bool expect_idle) const {
- (void) expect_idle;
- assert(pre_guard == 0xaaaaaaaa);
- assert(post_guard == 0x55555555);
- assert(idle == expect_idle);
- assert(!task.task == expect_idle);
- }
+ ThreadIdleTracker idleTracker;
+ uint32_t pre_guard;
+ bool idle;
+ uint32_t post_guard;
+ TaggedTask task;
+ Worker();
+ void verify(bool expect_idle) const;
};
struct BarrierCompletion {
@@ -81,6 +77,7 @@ private:
mutable std::mutex _lock;
std::condition_variable _cond;
ExecutorStats _stats;
+ ExecutorIdleTracker _idleTracker;
Gate _executorCompletion;
ArrayQueue<TaggedTask> _tasks;
ArrayQueue<Worker*> _workers;