diff options
Diffstat (limited to 'staging_vespalib')
7 files changed, 38 insertions, 11 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp index 869ff0456e1..7681af43afc 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp @@ -21,6 +21,7 @@ AdaptiveSequencedExecutor::Strand::~Strand() AdaptiveSequencedExecutor::Worker::Worker() : cond(), + idleTracker(), state(State::RUNNING), strand(nullptr) { @@ -152,8 +153,11 @@ AdaptiveSequencedExecutor::obtain_strand(Worker &worker, std::unique_lock<std::m worker.state = Worker::State::BLOCKED; _worker_stack.push(&worker); while (worker.state == Worker::State::BLOCKED) { + worker.idleTracker.set_idle(steady_clock::now()); worker.cond.wait(lock); + _idleTracker.was_idle(worker.idleTracker.set_active(steady_clock::now())); } + _stats.wakeupCount++; } return (worker.state == Worker::State::RUNNING); } @@ -233,6 +237,7 @@ AdaptiveSequencedExecutor::AdaptiveSequencedExecutor(size_t num_strands, size_t _worker_stack(num_threads), _self(), _stats(), + _idleTracker(steady_clock::now()), _cfg(num_threads, max_waiting, max_pending) { _stats.queueSize.add(_self.pending_tasks); @@ -329,6 +334,12 @@ AdaptiveSequencedExecutor::getStats() { auto guard = std::lock_guard(_mutex); ExecutorStats stats = _stats; + stats.executorCount = _cfg.num_threads; + steady_time now = steady_clock::now(); + for (size_t i(0); i < _worker_stack.size(); i++) { + _idleTracker.was_idle(_worker_stack.access(i)->idleTracker.reset(now)); + } + _stats.absUtil = _idleTracker.reset(now, 1); _stats = ExecutorStats(); _stats.queueSize.add(_self.pending_tasks); return stats; diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h index fdcdf35fbbb..776248384a5 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h @@ -3,6 +3,7 @@ #pragma once #include "isequencedtaskexecutor.h" +#include <vespa/vespalib/util/executor_idle_tracking.h> #include <vespa/vespalib/util/arrayqueue.hpp> #include <vespa/vespalib/util/gate.h> #include <vespa/vespalib/util/eventbarrier.hpp> @@ -70,7 +71,7 @@ private: struct Strand { enum class State { IDLE, WAITING, ACTIVE }; State state; - vespalib::ArrayQueue<TaggedTask> queue; + ArrayQueue<TaggedTask> queue; Strand(); ~Strand(); }; @@ -81,6 +82,7 @@ private: struct Worker { enum class State { RUNNING, BLOCKED, DONE }; std::condition_variable cond; + ThreadIdleTracker idleTracker; State state; Strand *strand; Worker(); @@ -107,7 +109,7 @@ private: static constexpr size_t STACK_SIZE = (256 * 1024); AdaptiveSequencedExecutor &parent; std::unique_ptr<FastOS_ThreadPool> pool; - vespalib::Gate allow_worker_exit; + Gate allow_worker_exit; ThreadTools(AdaptiveSequencedExecutor &parent_in); ~ThreadTools(); void Run(FastOS_ThreadInterface *, void *) override; @@ -123,11 +125,12 @@ private: std::unique_ptr<ThreadTools> _thread_tools; mutable std::mutex _mutex; std::vector<Strand> _strands; - vespalib::ArrayQueue<Strand*> _wait_queue; - vespalib::ArrayQueue<Worker*> _worker_stack; + ArrayQueue<Strand*> _wait_queue; + ArrayQueue<Worker*> _worker_stack; EventBarrier<BarrierCompletion> _barrier; Self _self; ExecutorStats _stats; + ExecutorIdleTracker _idleTracker; Config _cfg; void maybe_block_self(std::unique_lock<std::mutex> &lock); diff --git a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h index 3bd5ca3d49a..15da8b90e3c 100644 --- a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h +++ b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h @@ -23,7 +23,7 @@ public: } size_t getNumThreads() const override { return 0; } ExecutorStats getStats() override { - return ExecutorStats(ExecutorStats::QueueSizeT(), _accepted.load(std::memory_order_relaxed), 0); + return ExecutorStats(1, ExecutorStats::QueueSizeT(), _accepted.load(std::memory_order_relaxed), 0, 1, 1.0); } void setTaskLimit(uint32_t taskLimit) override { (void) taskLimit; } uint32_t getTaskLimit() const override { return std::numeric_limits<uint32_t>::max(); } diff --git a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp index 0d132193af1..b65f79e65cb 100644 --- a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp @@ -36,7 +36,7 @@ void ForegroundTaskExecutor::setTaskLimit(uint32_t) { } vespalib::ExecutorStats ForegroundTaskExecutor::getStats() { - return vespalib::ExecutorStats(vespalib::ExecutorStats::QueueSizeT(0) , _accepted.load(std::memory_order_relaxed), 0); + return vespalib::ExecutorStats(1, vespalib::ExecutorStats::QueueSizeT(0) , _accepted.load(std::memory_order_relaxed), 0, 1, 1.0); } ISequencedTaskExecutor::ExecutorId diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp index 727894397a7..9e95bdaa3ab 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp @@ -104,7 +104,7 @@ SequencedTaskExecutor::getStats() { ExecutorStats accumulatedStats; for (auto &executor :* _executors) { - accumulatedStats += executor->getStats(); + accumulatedStats.aggregate(executor->getStats()); } return accumulatedStats; } diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index 803ec4f3f7c..2bb87350036 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -19,6 +19,9 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t wat _consumerCondition(), _producerCondition(), _thread(*this), + _idleTracker(steady_clock::now()), + _threadIdleTracker(), + _wakeupCount(0), _lastAccepted(0), _queueSize(), _wakeupConsumerAt(0), @@ -115,7 +118,11 @@ SingleExecutor::run() { _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + _watermark, std::memory_order_relaxed); Lock lock(_mutex); if (numTasks() <= 0) { - _consumerCondition.wait_for(lock, _reactionTime); + steady_time now = steady_clock::now(); + _threadIdleTracker.set_idle(now); + _consumerCondition.wait_until(lock, now + _reactionTime); + _idleTracker.was_idle(_threadIdleTracker.set_active(steady_clock::now())); + _wakeupCount++; } _wakeupConsumerAt.store(0, std::memory_order_relaxed); } @@ -150,7 +157,6 @@ SingleExecutor::wait_for_room(Lock & lock) { drain(lock); _tasks = std::make_unique<Task::UP[]>(_wantedTaskLimit); _taskLimit = _wantedTaskLimit.load(); - taskLimit = _taskLimit; } _queueSize.add(numTasks()); while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) { @@ -162,7 +168,10 @@ ExecutorStats SingleExecutor::getStats() { Lock lock(_mutex); uint64_t accepted = _wp.load(std::memory_order_relaxed); - ExecutorStats stats(_queueSize, (accepted - _lastAccepted), 0); + steady_time now = steady_clock::now(); + _idleTracker.was_idle(_threadIdleTracker.reset(now)); + ExecutorStats stats(1, _queueSize, (accepted - _lastAccepted), 0, _wakeupCount, _idleTracker.reset(now, 1)); + _wakeupCount = 0; _lastAccepted = accepted; _queueSize = ExecutorStats::QueueSizeT() ; return stats; diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index 8e9c1ae3fa1..7d868322558 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -5,6 +5,7 @@ #include <vespa/vespalib/util/threadexecutor.h> #include <vespa/vespalib/util/thread.h> #include <vespa/vespalib/util/time.h> +#include <vespa/vespalib/util/executor_idle_tracking.h> #include <thread> #include <atomic> @@ -18,7 +19,7 @@ namespace vespalib { */ class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable { public: - explicit SingleExecutor(init_fun_t func, uint32_t taskLimit); + SingleExecutor(init_fun_t func, uint32_t taskLimit); SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime); ~SingleExecutor() override; Task::UP execute(Task::UP task) override; @@ -54,6 +55,9 @@ private: std::condition_variable _consumerCondition; std::condition_variable _producerCondition; vespalib::Thread _thread; + ExecutorIdleTracker _idleTracker; + ThreadIdleTracker _threadIdleTracker; + uint64_t _wakeupCount; uint64_t _lastAccepted; ExecutorStats::QueueSizeT _queueSize; std::atomic<uint64_t> _wakeupConsumerAt; |