diff options
Diffstat (limited to 'vespalib/src')
4 files changed, 83 insertions, 24 deletions
diff --git a/vespalib/src/tests/executor/threadstackexecutor_test.cpp b/vespalib/src/tests/executor/threadstackexecutor_test.cpp index b55f54f9339..56dbf0d19b5 100644 --- a/vespalib/src/tests/executor/threadstackexecutor_test.cpp +++ b/vespalib/src/tests/executor/threadstackexecutor_test.cpp @@ -74,9 +74,9 @@ struct MyState { ExecutorStats stats = executor.getStats(); EXPECT_EQUAL(expect_running + expect_deleted, MyTask::runCnt); EXPECT_EQUAL(expect_rejected + expect_deleted, MyTask::deleteCnt); - EXPECT_EQUAL(expect_queue + expect_running + expect_deleted, - stats.acceptedTasks); + EXPECT_EQUAL(expect_queue + expect_running + expect_deleted,stats.acceptedTasks); EXPECT_EQUAL(expect_rejected, stats.rejectedTasks); + EXPECT_TRUE(stats.wakeupCount <= (NUM_THREADS + stats.acceptedTasks)); EXPECT_TRUE(!(gate.getCount() == 1) || (expect_deleted == 0)); if (expect_deleted == 0) { EXPECT_EQUAL(expect_queue + expect_running, stats.queueSize.max()); @@ -85,6 +85,7 @@ struct MyState { EXPECT_EQUAL(expect_queue + expect_running, stats.queueSize.max()); EXPECT_EQUAL(0u, stats.acceptedTasks); EXPECT_EQUAL(0u, stats.rejectedTasks); + EXPECT_EQUAL(0u, stats.wakeupCount); return *this; } }; @@ -188,11 +189,16 @@ TEST_F("require that executor thread stack tag can be set", ThreadStackExecutor( } TEST("require that stats can be accumulated") { - ExecutorStats stats(ExecutorStats::QueueSizeT(1) ,2,3); + EXPECT_TRUE(std::atomic<duration>::is_always_lock_free); + ExecutorStats stats(3, ExecutorStats::QueueSizeT(1) ,2,3,7, 0.6); EXPECT_EQUAL(1u, stats.queueSize.max()); EXPECT_EQUAL(2u, stats.acceptedTasks); EXPECT_EQUAL(3u, stats.rejectedTasks); - stats += ExecutorStats(ExecutorStats::QueueSizeT(7),8,9); + EXPECT_EQUAL(7u, stats.wakeupCount); + EXPECT_EQUAL(3u, stats.executorCount); + EXPECT_EQUAL(0.6, stats.absUtil); + EXPECT_EQUAL(0.2, stats.getUtil()); + stats.aggregate(ExecutorStats(7, ExecutorStats::QueueSizeT(7),8,9,11, 1.9)); EXPECT_EQUAL(2u, stats.queueSize.count()); EXPECT_EQUAL(8u, stats.queueSize.total()); EXPECT_EQUAL(8u, stats.queueSize.max()); @@ -200,9 +206,18 @@ TEST("require that stats can be accumulated") { EXPECT_EQUAL(8u, stats.queueSize.max()); EXPECT_EQUAL(4.0, stats.queueSize.average()); + EXPECT_EQUAL(10u, stats.executorCount); EXPECT_EQUAL(10u, stats.acceptedTasks); EXPECT_EQUAL(12u, stats.rejectedTasks); + EXPECT_EQUAL(18u, stats.wakeupCount); + EXPECT_EQUAL(2.5, stats.absUtil); + EXPECT_EQUAL(0.25, stats.getUtil()); +} +TEST("Test that load is computed") { + ThreadStackExecutor executor(1, 128_Ki); + auto stats = executor.getStats(); + EXPECT_EQUAL(0.0, stats.absUtil); } TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/vespa/vespalib/util/executor_stats.h b/vespalib/src/vespa/vespalib/util/executor_stats.h index f1f58685570..3ebf1d8744d 100644 --- a/vespalib/src/vespa/vespalib/util/executor_stats.h +++ b/vespalib/src/vespa/vespalib/util/executor_stats.h @@ -51,25 +51,37 @@ private: /** * Struct representing stats for an executor. + * Note that aggregation requires sample interval to be the same(similar) for all samples. **/ struct ExecutorStats { using QueueSizeT = AggregatedAverage<size_t>; + uint32_t executorCount; QueueSizeT queueSize; - size_t acceptedTasks; - size_t rejectedTasks; - ExecutorStats() : ExecutorStats(QueueSizeT(), 0, 0) {} - ExecutorStats(QueueSizeT queueSize_in, size_t accepted, size_t rejected) - : queueSize(queueSize_in), acceptedTasks(accepted), rejectedTasks(rejected) + size_t acceptedTasks; + size_t rejectedTasks; + size_t wakeupCount; // Number of times a worker was woken up, + double absUtil; + ExecutorStats() : ExecutorStats(1, QueueSizeT(), 0, 0, 0, 1.0) {} + ExecutorStats(uint32_t executorCount_in, QueueSizeT queueSize_in, size_t accepted, size_t rejected, size_t wakeupCount_in, double absUtil_in) + : executorCount(executorCount_in), + queueSize(queueSize_in), + acceptedTasks(accepted), + rejectedTasks(rejected), + wakeupCount(wakeupCount_in), + absUtil(absUtil_in) {} - ExecutorStats & operator += (const ExecutorStats & rhs) { + void aggregate(const ExecutorStats & rhs) { + executorCount += rhs.executorCount; queueSize = QueueSizeT(queueSize.count() + rhs.queueSize.count(), queueSize.total() + rhs.queueSize.total(), queueSize.min() + rhs.queueSize.min(), queueSize.max() + rhs.queueSize.max()); acceptedTasks += rhs.acceptedTasks; rejectedTasks += rhs.rejectedTasks; - return *this; + wakeupCount += rhs.wakeupCount; + absUtil += rhs.absUtil; } + double getUtil() const { return absUtil / executorCount; } }; } diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp index f80a5b4ce32..e5da270e099 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp @@ -24,6 +24,25 @@ ThreadInit::Run(FastOS_ThreadInterface *, void *) { } +ThreadStackExecutorBase::Worker::Worker() + : lock(), + cond(), + idleTracker(), + pre_guard(0xaaaaaaaa), + idle(true), + post_guard(0x55555555), + task() +{} + +void +ThreadStackExecutorBase::Worker::verify(bool expect_idle) const { + (void) expect_idle; + assert(pre_guard == 0xaaaaaaaa); + assert(post_guard == 0x55555555); + assert(idle == expect_idle); + assert(!task.task == expect_idle); +} + void ThreadStackExecutorBase::BlockedThread::wait() const { @@ -103,11 +122,17 @@ ThreadStackExecutorBase::obtainTask(Worker &worker) return false; } _workers.push(&worker); + //TODO Not entirely correct as this counts working days, not wakeUps. + //But it should be the same, and here it is thread safe. + _stats.wakeupCount++; } { unique_lock guard(worker.lock); while (worker.idle) { + worker.idleTracker.set_idle(steady_clock::now()); worker.cond.wait(guard); + //TODO: _idleTracker.was_idle is not thread safe !!!! Must find other solution. Atomic ? + _idleTracker.was_idle(worker.idleTracker.set_active(steady_clock::now())); } } worker.idle = !worker.task.task; @@ -141,6 +166,7 @@ ThreadStackExecutorBase::ThreadStackExecutorBase(uint32_t stackSize, _lock(), _cond(), _stats(), + _idleTracker(steady_clock::now()), _executorCompletion(), _tasks(), _workers(), @@ -157,6 +183,8 @@ void ThreadStackExecutorBase::start(uint32_t threads) { assert(threads > 0); + _stats.executorCount = threads; + _idleTracker.reset(steady_clock::now(), threads); for (uint32_t i = 0; i < threads; ++i) { FastOS_ThreadInterface *thread = _pool->NewThread(_thread_init.get()); assert(thread != nullptr); @@ -164,7 +192,8 @@ ThreadStackExecutorBase::start(uint32_t threads) } } -size_t ThreadStackExecutorBase::getNumThreads() const { +size_t +ThreadStackExecutorBase::getNumThreads() const { return _pool->GetNumStartedThreads(); } @@ -208,6 +237,12 @@ ThreadStackExecutorBase::getStats() { std::unique_lock guard(_lock); ExecutorStats stats = _stats; + stats.executorCount = getNumThreads(); + steady_time now = steady_clock::now(); + for (size_t i(0); i < _workers.size(); i++) { + _idleTracker.was_idle(_workers.access(i)->idleTracker.reset(now)); + } + stats.absUtil = _idleTracker.reset(now, 1); _stats = ExecutorStats(); _stats.queueSize.add(_taskCount); return stats; diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h index 66a34bfde95..c3552cfe579 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h @@ -7,6 +7,7 @@ #include "arrayqueue.hpp" #include "gate.h" #include "runnable.h" +#include "executor_idle_tracking.h" #include <vector> #include <functional> @@ -47,18 +48,13 @@ private: struct Worker { std::mutex lock; std::condition_variable cond; - uint32_t pre_guard; - bool idle; - uint32_t post_guard; - TaggedTask task; - Worker() : lock(), cond(), pre_guard(0xaaaaaaaa), idle(true), post_guard(0x55555555), task() {} - void verify(bool expect_idle) const { - (void) expect_idle; - assert(pre_guard == 0xaaaaaaaa); - assert(post_guard == 0x55555555); - assert(idle == expect_idle); - assert(!task.task == expect_idle); - } + ThreadIdleTracker idleTracker; + uint32_t pre_guard; + bool idle; + uint32_t post_guard; + TaggedTask task; + Worker(); + void verify(bool expect_idle) const; }; struct BarrierCompletion { @@ -81,6 +77,7 @@ private: mutable std::mutex _lock; std::condition_variable _cond; ExecutorStats _stats; + ExecutorIdleTracker _idleTracker; Gate _executorCompletion; ArrayQueue<TaggedTask> _tasks; ArrayQueue<Worker*> _workers; |