diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-10-22 11:14:35 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-10-22 11:14:35 +0000 |
commit | de33bea303b8c207a647551d6e7d575925a2b803 (patch) | |
tree | 098a89a599e8332745ae24939614f7168a92e519 | |
parent | 4060207695d6b8ebb22fee8274a83cfb78ec84fd (diff) |
properly set utilization
8 files changed, 42 insertions, 34 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp index a0f8c6382ea..099a958a00b 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp @@ -334,12 +334,11 @@ AdaptiveSequencedExecutor::getStats() { auto guard = std::lock_guard(_mutex); ExecutorStats stats = _stats; - stats.threadCount = _cfg.num_threads; steady_time now = steady_clock::now(); for (size_t i(0); i < _worker_stack.size(); i++) { _idleTracker.was_idle(_worker_stack.access(i)->idleTracker.reset(now)); } - _stats.absUtil = _idleTracker.reset(now, 1); + stats.setUtil(_cfg.num_threads, _idleTracker.reset(now, _cfg.num_threads)); _stats = ExecutorStats(); _stats.queueSize.add(_self.pending_tasks); return stats; diff --git a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h index 15da8b90e3c..99a4ddffa22 100644 --- a/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h +++ b/staging_vespalib/src/vespa/vespalib/util/foreground_thread_executor.h @@ -23,7 +23,7 @@ public: } size_t getNumThreads() const override { return 0; } ExecutorStats getStats() override { - return ExecutorStats(1, ExecutorStats::QueueSizeT(), _accepted.load(std::memory_order_relaxed), 0, 1, 1.0); + return ExecutorStats(ExecutorStats::QueueSizeT(), _accepted.load(std::memory_order_relaxed), 0, 1); } void setTaskLimit(uint32_t taskLimit) override { (void) taskLimit; } uint32_t getTaskLimit() const override { return std::numeric_limits<uint32_t>::max(); } diff --git a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp index b65f79e65cb..e756b608a3d 100644 --- a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp @@ -19,7 +19,7 @@ ForegroundTaskExecutor::ForegroundTaskExecutor(uint32_t threads) ForegroundTaskExecutor::~ForegroundTaskExecutor() = default; void -ForegroundTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP task) +ForegroundTaskExecutor::executeTask(ExecutorId id, Executor::Task::UP task) { assert(id.getId() < getNumExecutors()); task->run(); @@ -35,8 +35,8 @@ void ForegroundTaskExecutor::setTaskLimit(uint32_t) { } -vespalib::ExecutorStats ForegroundTaskExecutor::getStats() { - return vespalib::ExecutorStats(1, vespalib::ExecutorStats::QueueSizeT(0) , _accepted.load(std::memory_order_relaxed), 0, 1, 1.0); +ExecutorStats ForegroundTaskExecutor::getStats() { + return ExecutorStats(ExecutorStats::QueueSizeT(0) , _accepted.load(std::memory_order_relaxed), 0, 1); } ISequencedTaskExecutor::ExecutorId @@ -44,4 +44,4 @@ ForegroundTaskExecutor::getExecutorId(uint64_t componentId) const { return ExecutorId(componentId%getNumExecutors()); } -} // namespace search +} diff --git a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h index 7359de0ba66..9d351aca653 100644 --- a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h @@ -22,10 +22,10 @@ public: ~ForegroundTaskExecutor() override; ExecutorId getExecutorId(uint64_t componentId) const override; - void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override; + void executeTask(ExecutorId id, Executor::Task::UP task) override; void sync() override; void setTaskLimit(uint32_t taskLimit) override; - vespalib::ExecutorStats getStats() override; + ExecutorStats getStats() override; private: std::atomic<uint64_t> _accepted; }; diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index 2bb87350036..af95918ccab 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -170,7 +170,8 @@ SingleExecutor::getStats() { uint64_t accepted = _wp.load(std::memory_order_relaxed); steady_time now = steady_clock::now(); _idleTracker.was_idle(_threadIdleTracker.reset(now)); - ExecutorStats stats(1, _queueSize, (accepted - _lastAccepted), 0, _wakeupCount, _idleTracker.reset(now, 1)); + ExecutorStats stats(_queueSize, (accepted - _lastAccepted), 0, _wakeupCount); + stats.setUtil(1, _idleTracker.reset(now, 1)); _wakeupCount = 0; _lastAccepted = accepted; _queueSize = ExecutorStats::QueueSizeT() ; diff --git a/vespalib/src/tests/executor/threadstackexecutor_test.cpp b/vespalib/src/tests/executor/threadstackexecutor_test.cpp index 00e23db244b..7fec2b76c06 100644 --- a/vespalib/src/tests/executor/threadstackexecutor_test.cpp +++ b/vespalib/src/tests/executor/threadstackexecutor_test.cpp @@ -4,7 +4,7 @@ #include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/vespalib/util/backtrace.h> #include <vespa/vespalib/util/size_literals.h> -#include <atomic> +#include <thread> using namespace vespalib; @@ -190,15 +190,15 @@ TEST_F("require that executor thread stack tag can be set", ThreadStackExecutor( TEST("require that stats can be accumulated") { EXPECT_TRUE(std::atomic<duration>::is_always_lock_free); - ExecutorStats stats(3, ExecutorStats::QueueSizeT(1) ,2,3,7, 0.6); + ExecutorStats stats(ExecutorStats::QueueSizeT(1) ,2,3,7); + stats.setUtil(3, 0.8); EXPECT_EQUAL(1u, stats.queueSize.max()); EXPECT_EQUAL(2u, stats.acceptedTasks); EXPECT_EQUAL(3u, stats.rejectedTasks); EXPECT_EQUAL(7u, stats.wakeupCount); - EXPECT_EQUAL(3u, stats.threadCount); - EXPECT_EQUAL(0.6, stats.absUtil); + EXPECT_EQUAL(3u, stats.getThreadCount()); EXPECT_EQUAL(0.2, stats.getUtil()); - stats.aggregate(ExecutorStats(7, ExecutorStats::QueueSizeT(7),8,9,11, 1.9)); + stats.aggregate(ExecutorStats(ExecutorStats::QueueSizeT(7),8,9,11).setUtil(7,0.5)); EXPECT_EQUAL(2u, stats.queueSize.count()); EXPECT_EQUAL(8u, stats.queueSize.total()); EXPECT_EQUAL(8u, stats.queueSize.max()); @@ -206,18 +206,18 @@ TEST("require that stats can be accumulated") { EXPECT_EQUAL(8u, stats.queueSize.max()); EXPECT_EQUAL(4.0, stats.queueSize.average()); - EXPECT_EQUAL(10u, stats.threadCount); + EXPECT_EQUAL(10u, stats.getThreadCount()); EXPECT_EQUAL(10u, stats.acceptedTasks); EXPECT_EQUAL(12u, stats.rejectedTasks); EXPECT_EQUAL(18u, stats.wakeupCount); - EXPECT_EQUAL(2.5, stats.absUtil); - EXPECT_EQUAL(0.25, stats.getUtil()); + EXPECT_EQUAL(0.41, stats.getUtil()); } -TEST("Test that load is computed") { +TEST("Test that utilization is computed") { ThreadStackExecutor executor(1, 128_Ki); + std::this_thread::sleep_for(1s); auto stats = executor.getStats(); - EXPECT_EQUAL(0.0, stats.absUtil); + EXPECT_GREATER(0.01, stats.getUtil()); } TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/vespa/vespalib/util/executor_stats.h b/vespalib/src/vespa/vespalib/util/executor_stats.h index 4b4354dc977..577ae933ec2 100644 --- a/vespalib/src/vespa/vespalib/util/executor_stats.h +++ b/vespalib/src/vespa/vespalib/util/executor_stats.h @@ -53,25 +53,28 @@ private: * Struct representing stats for an executor. * Note that aggregation requires sample interval to be the same(similar) for all samples. **/ -struct ExecutorStats { +class ExecutorStats { +private: + size_t _threadCount; + double _absUtil; +public: using QueueSizeT = AggregatedAverage<size_t>; - uint32_t threadCount; QueueSizeT queueSize; size_t acceptedTasks; size_t rejectedTasks; size_t wakeupCount; // Number of times a worker was woken up, - double absUtil; - ExecutorStats() : ExecutorStats(1, QueueSizeT(), 0, 0, 0, 1.0) {} - ExecutorStats(uint32_t executorCount_in, QueueSizeT queueSize_in, size_t accepted, size_t rejected, size_t wakeupCount_in, double absUtil_in) - : threadCount(executorCount_in), + + ExecutorStats() : ExecutorStats(QueueSizeT(), 0, 0, 0) {} + ExecutorStats(QueueSizeT queueSize_in, size_t accepted, size_t rejected, size_t wakeupCount_in) + : _threadCount(1), + _absUtil(1.0), queueSize(queueSize_in), acceptedTasks(accepted), rejectedTasks(rejected), - wakeupCount(wakeupCount_in), - absUtil(absUtil_in) + wakeupCount(wakeupCount_in) {} void aggregate(const ExecutorStats & rhs) { - threadCount += rhs.threadCount; + _threadCount += rhs._threadCount; queueSize = QueueSizeT(queueSize.count() + rhs.queueSize.count(), queueSize.total() + rhs.queueSize.total(), queueSize.min() + rhs.queueSize.min(), @@ -79,9 +82,15 @@ struct ExecutorStats { acceptedTasks += rhs.acceptedTasks; rejectedTasks += rhs.rejectedTasks; wakeupCount += rhs.wakeupCount; - absUtil += rhs.absUtil; + _absUtil += rhs._absUtil; + } + ExecutorStats & setUtil(uint32_t threadCount, double idle) { + _threadCount = threadCount; + _absUtil = (1.0 - idle) * threadCount; + return *this; } - double getUtil() const { return absUtil / threadCount; } + double getUtil() const { return _absUtil / _threadCount; } + size_t getThreadCount() const { return _threadCount; } }; } diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp index 53777ee6e6b..9e6456d2f56 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp @@ -178,7 +178,6 @@ void ThreadStackExecutorBase::start(uint32_t threads) { assert(threads > 0); - _stats.threadCount = threads; _idleTracker.reset(steady_clock::now(), threads); for (uint32_t i = 0; i < threads; ++i) { FastOS_ThreadInterface *thread = _pool->NewThread(_thread_init.get()); @@ -232,12 +231,12 @@ ThreadStackExecutorBase::getStats() { std::unique_lock guard(_lock); ExecutorStats stats = _stats; - stats.threadCount = getNumThreads(); steady_time now = steady_clock::now(); for (size_t i(0); i < _workers.size(); i++) { _idleTracker.was_idle(_workers.access(i)->idleTracker.reset(now)); } - stats.absUtil = _idleTracker.reset(now, 1); + size_t numThreads = getNumThreads(); + stats.setUtil(numThreads, _idleTracker.reset(now, numThreads)); _stats = ExecutorStats(); _stats.queueSize.add(_taskCount); return stats; |