aboutsummaryrefslogtreecommitdiffstats
path: root/staging_vespalib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-10-19 05:06:06 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-10-22 07:31:50 +0000
commit86e9a67362a9e8de7a4a267f6450b41cc60be290 (patch)
tree6f0e34f9ed012bdf6fa2eeecdb97fd89ee54e092 /staging_vespalib
parent7ec9aa11869ffdba64a0009a7396a4483a045cf5 (diff)
Add a metric for how many times a worker in a thread pool has woken up.
Also track the idle time a worker has and add metric for the utilization.
Diffstat (limited to 'staging_vespalib')
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp11
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h11
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h2
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp2
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp2
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp15
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.h6
7 files changed, 38 insertions, 11 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
index 869ff0456e1..7681af43afc 100644
--- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp
@@ -21,6 +21,7 @@ AdaptiveSequencedExecutor::Strand::~Strand()
AdaptiveSequencedExecutor::Worker::Worker()
: cond(),
+ idleTracker(),
state(State::RUNNING),
strand(nullptr)
{
@@ -152,8 +153,11 @@ AdaptiveSequencedExecutor::obtain_strand(Worker &worker, std::unique_lock<std::m
worker.state = Worker::State::BLOCKED;
_worker_stack.push(&worker);
while (worker.state == Worker::State::BLOCKED) {
+ worker.idleTracker.set_idle(steady_clock::now());
worker.cond.wait(lock);
+ _idleTracker.was_idle(worker.idleTracker.set_active(steady_clock::now()));
}
+ _stats.wakeupCount++;
}
return (worker.state == Worker::State::RUNNING);
}
@@ -233,6 +237,7 @@ AdaptiveSequencedExecutor::AdaptiveSequencedExecutor(size_t num_strands, size_t
_worker_stack(num_threads),
_self(),
_stats(),
+ _idleTracker(steady_clock::now()),
_cfg(num_threads, max_waiting, max_pending)
{
_stats.queueSize.add(_self.pending_tasks);
@@ -329,6 +334,12 @@ AdaptiveSequencedExecutor::getStats()
{
auto guard = std::lock_guard(_mutex);
ExecutorStats stats = _stats;
+ stats.executorCount = _cfg.num_threads;
+ steady_time now = steady_clock::now();
+ for (size_t i(0); i < _worker_stack.size(); i++) {
+ _idleTracker.was_idle(_worker_stack.access(i)->idleTracker.reset(now));
+ }
+ _stats.absUtil = _idleTracker.reset(now, 1);
_stats = ExecutorStats();
_stats.queueSize.add(_self.pending_tasks);
return stats;
diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
index fdcdf35fbbb..776248384a5 100644
--- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h
@@ -3,6 +3,7 @@
#pragma once
#include "isequencedtaskexecutor.h"
+#include <vespa/vespalib/util/executor_idle_tracking.h>
#include <vespa/vespalib/util/arrayqueue.hpp>
#include <vespa/vespalib/util/gate.h>
#include <vespa/vespalib/util/eventbarrier.hpp>
@@ -70,7 +71,7 @@ private:
struct Strand {
enum class State { IDLE, WAITING, ACTIVE };
State state;
- vespalib::ArrayQueue<TaggedTask> queue;
+ ArrayQueue<TaggedTask> queue;
Strand();
~Strand();
};
@@ -81,6 +82,7 @@ private:
struct Worker {
enum class State { RUNNING, BLOCKED, DONE };
std::condition_variable cond;
+ ThreadIdleTracker idleTracker;
State state;
Strand *strand;
Worker();
@@ -107,7 +109,7 @@ private:
static constexpr size_t STACK_SIZE = (256 * 1024);
AdaptiveSequencedExecutor &parent;
std::unique_ptr<FastOS_ThreadPool> pool;
- vespalib::Gate allow_worker_exit;
+ Gate allow_worker_exit;
ThreadTools(AdaptiveSequencedExecutor &parent_in);
~ThreadTools();
void Run(FastOS_ThreadInterface *, void *) override;
@@ -123,11 +125,12 @@ private:
std::unique_ptr<ThreadTools> _thread_tools;
mutable std::mutex _mutex;
std::vector<Strand> _strands;
- vespalib::ArrayQueue<Strand*> _wait_queue;
- vespalib::ArrayQueue<Worker*> _worker_stack;
+ ArrayQueue<Strand*> _wait_queue;
+ ArrayQueue<Worker*> _worker_stack;
EventBarrier<BarrierCompletion> _barrier;
Self _self;
ExecutorStats _stats;
+ ExecutorIdleTracker _idleTracker;
Config _cfg;
void maybe_block_self(std::unique_lock<std::mutex> &lock);
diff --git a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h
index 3bd5ca3d49a..15da8b90e3c 100644
--- a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h
@@ -23,7 +23,7 @@ public:
}
size_t getNumThreads() const override { return 0; }
ExecutorStats getStats() override {
- return ExecutorStats(ExecutorStats::QueueSizeT(), _accepted.load(std::memory_order_relaxed), 0);
+ return ExecutorStats(1, ExecutorStats::QueueSizeT(), _accepted.load(std::memory_order_relaxed), 0, 1, 1.0);
}
void setTaskLimit(uint32_t taskLimit) override { (void) taskLimit; }
uint32_t getTaskLimit() const override { return std::numeric_limits<uint32_t>::max(); }
diff --git a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp
index 0d132193af1..b65f79e65cb 100644
--- a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp
@@ -36,7 +36,7 @@ void ForegroundTaskExecutor::setTaskLimit(uint32_t) {
}
vespalib::ExecutorStats ForegroundTaskExecutor::getStats() {
- return vespalib::ExecutorStats(vespalib::ExecutorStats::QueueSizeT(0) , _accepted.load(std::memory_order_relaxed), 0);
+ return vespalib::ExecutorStats(1, vespalib::ExecutorStats::QueueSizeT(0) , _accepted.load(std::memory_order_relaxed), 0, 1, 1.0);
}
ISequencedTaskExecutor::ExecutorId
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
index 727894397a7..9e95bdaa3ab 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
@@ -104,7 +104,7 @@ SequencedTaskExecutor::getStats()
{
ExecutorStats accumulatedStats;
for (auto &executor :* _executors) {
- accumulatedStats += executor->getStats();
+ accumulatedStats.aggregate(executor->getStats());
}
return accumulatedStats;
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index 803ec4f3f7c..2bb87350036 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -19,6 +19,9 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t wat
_consumerCondition(),
_producerCondition(),
_thread(*this),
+ _idleTracker(steady_clock::now()),
+ _threadIdleTracker(),
+ _wakeupCount(0),
_lastAccepted(0),
_queueSize(),
_wakeupConsumerAt(0),
@@ -115,7 +118,11 @@ SingleExecutor::run() {
_wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + _watermark, std::memory_order_relaxed);
Lock lock(_mutex);
if (numTasks() <= 0) {
- _consumerCondition.wait_for(lock, _reactionTime);
+ steady_time now = steady_clock::now();
+ _threadIdleTracker.set_idle(now);
+ _consumerCondition.wait_until(lock, now + _reactionTime);
+ _idleTracker.was_idle(_threadIdleTracker.set_active(steady_clock::now()));
+ _wakeupCount++;
}
_wakeupConsumerAt.store(0, std::memory_order_relaxed);
}
@@ -150,7 +157,6 @@ SingleExecutor::wait_for_room(Lock & lock) {
drain(lock);
_tasks = std::make_unique<Task::UP[]>(_wantedTaskLimit);
_taskLimit = _wantedTaskLimit.load();
- taskLimit = _taskLimit;
}
_queueSize.add(numTasks());
while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) {
@@ -162,7 +168,10 @@ ExecutorStats
SingleExecutor::getStats() {
Lock lock(_mutex);
uint64_t accepted = _wp.load(std::memory_order_relaxed);
- ExecutorStats stats(_queueSize, (accepted - _lastAccepted), 0);
+ steady_time now = steady_clock::now();
+ _idleTracker.was_idle(_threadIdleTracker.reset(now));
+ ExecutorStats stats(1, _queueSize, (accepted - _lastAccepted), 0, _wakeupCount, _idleTracker.reset(now, 1));
+ _wakeupCount = 0;
_lastAccepted = accepted;
_queueSize = ExecutorStats::QueueSizeT() ;
return stats;
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
index 8e9c1ae3fa1..7d868322558 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -5,6 +5,7 @@
#include <vespa/vespalib/util/threadexecutor.h>
#include <vespa/vespalib/util/thread.h>
#include <vespa/vespalib/util/time.h>
+#include <vespa/vespalib/util/executor_idle_tracking.h>
#include <thread>
#include <atomic>
@@ -18,7 +19,7 @@ namespace vespalib {
*/
class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable {
public:
- explicit SingleExecutor(init_fun_t func, uint32_t taskLimit);
+ SingleExecutor(init_fun_t func, uint32_t taskLimit);
SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime);
~SingleExecutor() override;
Task::UP execute(Task::UP task) override;
@@ -54,6 +55,9 @@ private:
std::condition_variable _consumerCondition;
std::condition_variable _producerCondition;
vespalib::Thread _thread;
+ ExecutorIdleTracker _idleTracker;
+ ThreadIdleTracker _threadIdleTracker;
+ uint64_t _wakeupCount;
uint64_t _lastAccepted;
ExecutorStats::QueueSizeT _queueSize;
std::atomic<uint64_t> _wakeupConsumerAt;