diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-10-19 05:06:06 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-10-22 07:31:50 +0000 |
commit | 86e9a67362a9e8de7a4a267f6450b41cc60be290 (patch) | |
tree | 6f0e34f9ed012bdf6fa2eeecdb97fd89ee54e092 | |
parent | 7ec9aa11869ffdba64a0009a7396a4483a045cf5 (diff) |
Add a metric for how many times a worker in a thread pool has woken up.
Also track the idle time a worker has and add metric for the utilization.
13 files changed, 130 insertions, 38 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp index 74e0971178c..789e97138e0 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp +++ b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp @@ -10,6 +10,8 @@ ExecutorMetrics::update(const vespalib::ExecutorStats &stats) maxPending.set(stats.queueSize.max()); accepted.inc(stats.acceptedTasks); rejected.inc(stats.rejectedTasks); + wakeupCount.inc(stats.wakeupCount); + util.set(stats.getUtil()); const auto & qSize = stats.queueSize; queueSize.addValueBatch(qSize.average(), qSize.count(), qSize.min(), qSize.max()); } @@ -19,6 +21,8 @@ ExecutorMetrics::ExecutorMetrics(const std::string &name, metrics::MetricSet *pa maxPending("maxpending", {}, "Maximum number of pending (active + queued) tasks", this), accepted("accepted", {}, "Number of accepted tasks", this), rejected("rejected", {}, "Number of rejected tasks", this), + wakeupCount("wakeupCount", {}, "Number of times a worker thread has been woken up", this), + util("utilization", {}, "ratio of time the worker threads has been active", this), queueSize("queuesize", {}, "Size of task queue", this) { } diff --git a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h index 273c4ed8979..31d959a399f 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h +++ b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h @@ -11,9 +11,11 @@ namespace proton { struct ExecutorMetrics : metrics::MetricSet { - metrics::LongValueMetric maxPending; // TODO Remove on Vespa 8 or sooner if possible. - metrics::LongCountMetric accepted; - metrics::LongCountMetric rejected; + metrics::LongValueMetric maxPending; // TODO Remove on Vespa 8 or sooner if possible. + metrics::LongCountMetric accepted; + metrics::LongCountMetric rejected; + metrics::LongCountMetric wakeupCount; + metrics::DoubleValueMetric util; metrics::LongAverageMetric queueSize; void update(const vespalib::ExecutorStats &stats); diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp index 869ff0456e1..7681af43afc 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp @@ -21,6 +21,7 @@ AdaptiveSequencedExecutor::Strand::~Strand() AdaptiveSequencedExecutor::Worker::Worker() : cond(), + idleTracker(), state(State::RUNNING), strand(nullptr) { @@ -152,8 +153,11 @@ AdaptiveSequencedExecutor::obtain_strand(Worker &worker, std::unique_lock<std::m worker.state = Worker::State::BLOCKED; _worker_stack.push(&worker); while (worker.state == Worker::State::BLOCKED) { + worker.idleTracker.set_idle(steady_clock::now()); worker.cond.wait(lock); + _idleTracker.was_idle(worker.idleTracker.set_active(steady_clock::now())); } + _stats.wakeupCount++; } return (worker.state == Worker::State::RUNNING); } @@ -233,6 +237,7 @@ AdaptiveSequencedExecutor::AdaptiveSequencedExecutor(size_t num_strands, size_t _worker_stack(num_threads), _self(), _stats(), + _idleTracker(steady_clock::now()), _cfg(num_threads, max_waiting, max_pending) { _stats.queueSize.add(_self.pending_tasks); @@ -329,6 +334,12 @@ AdaptiveSequencedExecutor::getStats() { auto guard = std::lock_guard(_mutex); ExecutorStats stats = _stats; + stats.executorCount = _cfg.num_threads; + steady_time now = steady_clock::now(); + for (size_t i(0); i < _worker_stack.size(); i++) { + _idleTracker.was_idle(_worker_stack.access(i)->idleTracker.reset(now)); + } + _stats.absUtil = _idleTracker.reset(now, 1); _stats = ExecutorStats(); _stats.queueSize.add(_self.pending_tasks); return stats; diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h index fdcdf35fbbb..776248384a5 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h @@ -3,6 +3,7 @@ #pragma once #include "isequencedtaskexecutor.h" +#include <vespa/vespalib/util/executor_idle_tracking.h> #include <vespa/vespalib/util/arrayqueue.hpp> #include <vespa/vespalib/util/gate.h> #include <vespa/vespalib/util/eventbarrier.hpp> @@ -70,7 +71,7 @@ private: struct Strand { enum class State { IDLE, WAITING, ACTIVE }; State state; - vespalib::ArrayQueue<TaggedTask> queue; + ArrayQueue<TaggedTask> queue; Strand(); ~Strand(); }; @@ -81,6 +82,7 @@ private: struct Worker { enum class State { RUNNING, BLOCKED, DONE }; std::condition_variable cond; + ThreadIdleTracker idleTracker; State state; Strand *strand; Worker(); @@ -107,7 +109,7 @@ private: static constexpr size_t STACK_SIZE = (256 * 1024); AdaptiveSequencedExecutor &parent; std::unique_ptr<FastOS_ThreadPool> pool; - vespalib::Gate allow_worker_exit; + Gate allow_worker_exit; ThreadTools(AdaptiveSequencedExecutor &parent_in); ~ThreadTools(); void Run(FastOS_ThreadInterface *, void *) override; @@ -123,11 +125,12 @@ private: std::unique_ptr<ThreadTools> _thread_tools; mutable std::mutex _mutex; std::vector<Strand> _strands; - vespalib::ArrayQueue<Strand*> _wait_queue; - vespalib::ArrayQueue<Worker*> _worker_stack; + ArrayQueue<Strand*> _wait_queue; + ArrayQueue<Worker*> _worker_stack; EventBarrier<BarrierCompletion> _barrier; Self _self; ExecutorStats _stats; + ExecutorIdleTracker _idleTracker; Config _cfg; void maybe_block_self(std::unique_lock<std::mutex> &lock); diff --git a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h index 3bd5ca3d49a..15da8b90e3c 100644 --- a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h +++ b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h @@ -23,7 +23,7 @@ public: } size_t getNumThreads() const override { return 0; } ExecutorStats getStats() override { - return ExecutorStats(ExecutorStats::QueueSizeT(), _accepted.load(std::memory_order_relaxed), 0); + return ExecutorStats(1, ExecutorStats::QueueSizeT(), _accepted.load(std::memory_order_relaxed), 0, 1, 1.0); } void setTaskLimit(uint32_t taskLimit) override { (void) taskLimit; } uint32_t getTaskLimit() const override { return std::numeric_limits<uint32_t>::max(); } diff --git a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp index 0d132193af1..b65f79e65cb 100644 --- a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp @@ -36,7 +36,7 @@ void ForegroundTaskExecutor::setTaskLimit(uint32_t) { } vespalib::ExecutorStats ForegroundTaskExecutor::getStats() { - return vespalib::ExecutorStats(vespalib::ExecutorStats::QueueSizeT(0) , _accepted.load(std::memory_order_relaxed), 0); + return vespalib::ExecutorStats(1, vespalib::ExecutorStats::QueueSizeT(0) , _accepted.load(std::memory_order_relaxed), 0, 1, 1.0); } ISequencedTaskExecutor::ExecutorId diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp index 727894397a7..9e95bdaa3ab 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp @@ -104,7 +104,7 @@ SequencedTaskExecutor::getStats() { ExecutorStats accumulatedStats; for (auto &executor :* _executors) { - accumulatedStats += executor->getStats(); + accumulatedStats.aggregate(executor->getStats()); } return accumulatedStats; } diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index 803ec4f3f7c..2bb87350036 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -19,6 +19,9 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t wat _consumerCondition(), _producerCondition(), _thread(*this), + _idleTracker(steady_clock::now()), + _threadIdleTracker(), + _wakeupCount(0), _lastAccepted(0), _queueSize(), _wakeupConsumerAt(0), @@ -115,7 +118,11 @@ SingleExecutor::run() { _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + _watermark, std::memory_order_relaxed); Lock lock(_mutex); if (numTasks() <= 0) { - _consumerCondition.wait_for(lock, _reactionTime); + steady_time now = steady_clock::now(); + _threadIdleTracker.set_idle(now); + _consumerCondition.wait_until(lock, now + _reactionTime); + _idleTracker.was_idle(_threadIdleTracker.set_active(steady_clock::now())); + _wakeupCount++; } _wakeupConsumerAt.store(0, std::memory_order_relaxed); } @@ -150,7 +157,6 @@ SingleExecutor::wait_for_room(Lock & lock) { drain(lock); _tasks = std::make_unique<Task::UP[]>(_wantedTaskLimit); _taskLimit = _wantedTaskLimit.load(); - taskLimit = _taskLimit; } _queueSize.add(numTasks()); while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) { @@ -162,7 +168,10 @@ ExecutorStats SingleExecutor::getStats() { Lock lock(_mutex); uint64_t accepted = _wp.load(std::memory_order_relaxed); - ExecutorStats stats(_queueSize, (accepted - _lastAccepted), 0); + steady_time now = steady_clock::now(); + _idleTracker.was_idle(_threadIdleTracker.reset(now)); + ExecutorStats stats(1, _queueSize, (accepted - _lastAccepted), 0, _wakeupCount, _idleTracker.reset(now, 1)); + _wakeupCount = 0; _lastAccepted = accepted; _queueSize = ExecutorStats::QueueSizeT() ; return stats; diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index 8e9c1ae3fa1..7d868322558 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -5,6 +5,7 @@ #include <vespa/vespalib/util/threadexecutor.h> #include <vespa/vespalib/util/thread.h> #include <vespa/vespalib/util/time.h> +#include <vespa/vespalib/util/executor_idle_tracking.h> #include <thread> #include <atomic> @@ -18,7 +19,7 @@ namespace vespalib { */ class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable { public: - explicit SingleExecutor(init_fun_t func, uint32_t taskLimit); + SingleExecutor(init_fun_t func, uint32_t taskLimit); SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime); ~SingleExecutor() override; Task::UP execute(Task::UP task) override; @@ -54,6 +55,9 @@ private: std::condition_variable _consumerCondition; std::condition_variable _producerCondition; vespalib::Thread _thread; + ExecutorIdleTracker _idleTracker; + ThreadIdleTracker _threadIdleTracker; + uint64_t _wakeupCount; uint64_t _lastAccepted; ExecutorStats::QueueSizeT _queueSize; std::atomic<uint64_t> _wakeupConsumerAt; diff --git a/vespalib/src/tests/executor/threadstackexecutor_test.cpp b/vespalib/src/tests/executor/threadstackexecutor_test.cpp index b55f54f9339..56dbf0d19b5 100644 --- a/vespalib/src/tests/executor/threadstackexecutor_test.cpp +++ b/vespalib/src/tests/executor/threadstackexecutor_test.cpp @@ -74,9 +74,9 @@ struct MyState { ExecutorStats stats = executor.getStats(); EXPECT_EQUAL(expect_running + expect_deleted, MyTask::runCnt); EXPECT_EQUAL(expect_rejected + expect_deleted, MyTask::deleteCnt); - EXPECT_EQUAL(expect_queue + expect_running + expect_deleted, - stats.acceptedTasks); + EXPECT_EQUAL(expect_queue + expect_running + expect_deleted,stats.acceptedTasks); EXPECT_EQUAL(expect_rejected, stats.rejectedTasks); + EXPECT_TRUE(stats.wakeupCount <= (NUM_THREADS + stats.acceptedTasks)); EXPECT_TRUE(!(gate.getCount() == 1) || (expect_deleted == 0)); if (expect_deleted == 0) { EXPECT_EQUAL(expect_queue + expect_running, stats.queueSize.max()); @@ -85,6 +85,7 @@ struct MyState { EXPECT_EQUAL(expect_queue + expect_running, stats.queueSize.max()); EXPECT_EQUAL(0u, stats.acceptedTasks); EXPECT_EQUAL(0u, stats.rejectedTasks); + EXPECT_EQUAL(0u, stats.wakeupCount); return *this; } }; @@ -188,11 +189,16 @@ TEST_F("require that executor thread stack tag can be set", ThreadStackExecutor( } TEST("require that stats can be accumulated") { - ExecutorStats stats(ExecutorStats::QueueSizeT(1) ,2,3); + EXPECT_TRUE(std::atomic<duration>::is_always_lock_free); + ExecutorStats stats(3, ExecutorStats::QueueSizeT(1) ,2,3,7, 0.6); EXPECT_EQUAL(1u, stats.queueSize.max()); EXPECT_EQUAL(2u, stats.acceptedTasks); EXPECT_EQUAL(3u, stats.rejectedTasks); - stats += ExecutorStats(ExecutorStats::QueueSizeT(7),8,9); + EXPECT_EQUAL(7u, stats.wakeupCount); + EXPECT_EQUAL(3u, stats.executorCount); + EXPECT_EQUAL(0.6, stats.absUtil); + EXPECT_EQUAL(0.2, stats.getUtil()); + stats.aggregate(ExecutorStats(7, ExecutorStats::QueueSizeT(7),8,9,11, 1.9)); EXPECT_EQUAL(2u, stats.queueSize.count()); EXPECT_EQUAL(8u, stats.queueSize.total()); EXPECT_EQUAL(8u, stats.queueSize.max()); @@ -200,9 +206,18 @@ TEST("require that stats can be accumulated") { EXPECT_EQUAL(8u, stats.queueSize.max()); EXPECT_EQUAL(4.0, stats.queueSize.average()); + EXPECT_EQUAL(10u, stats.executorCount); EXPECT_EQUAL(10u, stats.acceptedTasks); EXPECT_EQUAL(12u, stats.rejectedTasks); + EXPECT_EQUAL(18u, stats.wakeupCount); + EXPECT_EQUAL(2.5, stats.absUtil); + EXPECT_EQUAL(0.25, stats.getUtil()); +} +TEST("Test that load is computed") { + ThreadStackExecutor executor(1, 128_Ki); + auto stats = executor.getStats(); + EXPECT_EQUAL(0.0, stats.absUtil); } TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/vespa/vespalib/util/executor_stats.h b/vespalib/src/vespa/vespalib/util/executor_stats.h index f1f58685570..3ebf1d8744d 100644 --- a/vespalib/src/vespa/vespalib/util/executor_stats.h +++ b/vespalib/src/vespa/vespalib/util/executor_stats.h @@ -51,25 +51,37 @@ private: /** * Struct representing stats for an executor. + * Note that aggregation requires sample interval to be the same(similar) for all samples. **/ struct ExecutorStats { using QueueSizeT = AggregatedAverage<size_t>; + uint32_t executorCount; QueueSizeT queueSize; - size_t acceptedTasks; - size_t rejectedTasks; - ExecutorStats() : ExecutorStats(QueueSizeT(), 0, 0) {} - ExecutorStats(QueueSizeT queueSize_in, size_t accepted, size_t rejected) - : queueSize(queueSize_in), acceptedTasks(accepted), rejectedTasks(rejected) + size_t acceptedTasks; + size_t rejectedTasks; + size_t wakeupCount; // Number of times a worker was woken up, + double absUtil; + ExecutorStats() : ExecutorStats(1, QueueSizeT(), 0, 0, 0, 1.0) {} + ExecutorStats(uint32_t executorCount_in, QueueSizeT queueSize_in, size_t accepted, size_t rejected, size_t wakeupCount_in, double absUtil_in) + : executorCount(executorCount_in), + queueSize(queueSize_in), + acceptedTasks(accepted), + rejectedTasks(rejected), + wakeupCount(wakeupCount_in), + absUtil(absUtil_in) {} - ExecutorStats & operator += (const ExecutorStats & rhs) { + void aggregate(const ExecutorStats & rhs) { + executorCount += rhs.executorCount; queueSize = QueueSizeT(queueSize.count() + rhs.queueSize.count(), queueSize.total() + rhs.queueSize.total(), queueSize.min() + rhs.queueSize.min(), queueSize.max() + rhs.queueSize.max()); acceptedTasks += rhs.acceptedTasks; rejectedTasks += rhs.rejectedTasks; - return *this; + wakeupCount += rhs.wakeupCount; + absUtil += rhs.absUtil; } + double getUtil() const { return absUtil / executorCount; } }; } diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp index f80a5b4ce32..e5da270e099 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp @@ -24,6 +24,25 @@ ThreadInit::Run(FastOS_ThreadInterface *, void *) { } +ThreadStackExecutorBase::Worker::Worker() + : lock(), + cond(), + idleTracker(), + pre_guard(0xaaaaaaaa), + idle(true), + post_guard(0x55555555), + task() +{} + +void +ThreadStackExecutorBase::Worker::verify(bool expect_idle) const { + (void) expect_idle; + assert(pre_guard == 0xaaaaaaaa); + assert(post_guard == 0x55555555); + assert(idle == expect_idle); + assert(!task.task == expect_idle); +} + void ThreadStackExecutorBase::BlockedThread::wait() const { @@ -103,11 +122,17 @@ ThreadStackExecutorBase::obtainTask(Worker &worker) return false; } _workers.push(&worker); + //TODO Not entirely correct as this counts working days, not wakeUps. + //But it should be the same, and here it is thread safe. + _stats.wakeupCount++; } { unique_lock guard(worker.lock); while (worker.idle) { + worker.idleTracker.set_idle(steady_clock::now()); worker.cond.wait(guard); + //TODO: _idleTracker.was_idle is not thread safe !!!! Must find other solution. Atomic ? + _idleTracker.was_idle(worker.idleTracker.set_active(steady_clock::now())); } } worker.idle = !worker.task.task; @@ -141,6 +166,7 @@ ThreadStackExecutorBase::ThreadStackExecutorBase(uint32_t stackSize, _lock(), _cond(), _stats(), + _idleTracker(steady_clock::now()), _executorCompletion(), _tasks(), _workers(), @@ -157,6 +183,8 @@ void ThreadStackExecutorBase::start(uint32_t threads) { assert(threads > 0); + _stats.executorCount = threads; + _idleTracker.reset(steady_clock::now(), threads); for (uint32_t i = 0; i < threads; ++i) { FastOS_ThreadInterface *thread = _pool->NewThread(_thread_init.get()); assert(thread != nullptr); @@ -164,7 +192,8 @@ ThreadStackExecutorBase::start(uint32_t threads) } } -size_t ThreadStackExecutorBase::getNumThreads() const { +size_t +ThreadStackExecutorBase::getNumThreads() const { return _pool->GetNumStartedThreads(); } @@ -208,6 +237,12 @@ ThreadStackExecutorBase::getStats() { std::unique_lock guard(_lock); ExecutorStats stats = _stats; + stats.executorCount = getNumThreads(); + steady_time now = steady_clock::now(); + for (size_t i(0); i < _workers.size(); i++) { + _idleTracker.was_idle(_workers.access(i)->idleTracker.reset(now)); + } + stats.absUtil = _idleTracker.reset(now, 1); _stats = ExecutorStats(); _stats.queueSize.add(_taskCount); return stats; diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h index 66a34bfde95..c3552cfe579 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h @@ -7,6 +7,7 @@ #include "arrayqueue.hpp" #include "gate.h" #include "runnable.h" +#include "executor_idle_tracking.h" #include <vector> #include <functional> @@ -47,18 +48,13 @@ private: struct Worker { std::mutex lock; std::condition_variable cond; - uint32_t pre_guard; - bool idle; - uint32_t post_guard; - TaggedTask task; - Worker() : lock(), cond(), pre_guard(0xaaaaaaaa), idle(true), post_guard(0x55555555), task() {} - void verify(bool expect_idle) const { - (void) expect_idle; - assert(pre_guard == 0xaaaaaaaa); - assert(post_guard == 0x55555555); - assert(idle == expect_idle); - assert(!task.task == expect_idle); - } + ThreadIdleTracker idleTracker; + uint32_t pre_guard; + bool idle; + uint32_t post_guard; + TaggedTask task; + Worker(); + void verify(bool expect_idle) const; }; struct BarrierCompletion { @@ -81,6 +77,7 @@ private: mutable std::mutex _lock; std::condition_variable _cond; ExecutorStats _stats; + ExecutorIdleTracker _idleTracker; Gate _executorCompletion; ArrayQueue<TaggedTask> _tasks; ArrayQueue<Worker*> _workers; |