diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-03-25 11:40:45 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-03-25 11:40:45 +0000 |
commit | 44ec2d74b5e828b7d1f0bc97a315846a3d5051ac (patch) | |
tree | 330b9dcf6b741a5609647d6685ffe1e421901dae | |
parent | 668412faf939d3d3d185c6e7c81056c4cdb5afe3 (diff) |
Unify the metrics for queuesize similar to what we have for the spi queues.
9 files changed, 107 insertions, 26 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp index 710c072aa53..f4e9bff1b4b 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp +++ b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp @@ -7,16 +7,19 @@ namespace proton { void ExecutorMetrics::update(const vespalib::ThreadStackExecutorBase::Stats &stats) { - maxPending.set(stats.maxPendingTasks); + maxPending.set(stats.queueSize.max()); accepted.inc(stats.acceptedTasks); rejected.inc(stats.rejectedTasks); + const vespalib::ThreadStackExecutorBase::Stats::QueueSizeT & qSize = stats.queueSize; + queueSize.addValueBatch(qSize.average(), qSize.count(), qSize.min(), qSize.max()); } ExecutorMetrics::ExecutorMetrics(const std::string &name, metrics::MetricSet *parent) : metrics::MetricSet(name, {}, "Instance specific thread executor metrics", parent), maxPending("maxpending", {}, "Maximum number of pending (active + queued) tasks", this), accepted("accepted", {}, "Number of accepted tasks", this), - rejected("rejected", {}, "Number of rejected tasks", this) + rejected("rejected", {}, "Number of rejected tasks", this), + queueSize("queuesize", {}, "size of task queue", this) { } diff --git a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h index a347edffd4b..6b638391d1e 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h +++ b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h @@ -11,9 +11,10 @@ namespace proton { struct ExecutorMetrics : metrics::MetricSet { - metrics::LongValueMetric maxPending; + metrics::LongValueMetric maxPending; // TODO Remove on Vespa 8 or sooner if possible. metrics::LongCountMetric accepted; metrics::LongCountMetric rejected; + metrics::LongAverageMetric queueSize; void update(const vespalib::ThreadStackExecutorBase::Stats &stats); ExecutorMetrics(const std::string &name, metrics::MetricSet *parent); diff --git a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp index 4c501defeea..a93eb1ff4bc 100644 --- a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp +++ b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp @@ -39,7 +39,7 @@ void ForegroundTaskExecutor::setTaskLimit(uint32_t) { } vespalib::ExecutorStats ForegroundTaskExecutor::getStats() { - return vespalib::ExecutorStats(0, _accepted.load(std::memory_order_relaxed), 0); + return vespalib::ExecutorStats(vespalib::ExecutorStats::QueueSizeT(0) , _accepted.load(std::memory_order_relaxed), 0); } } // namespace search diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index 73612452b09..4b92d9a9687 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -15,7 +15,7 @@ SingleExecutor::SingleExecutor(uint32_t taskLimit) _producerCondition(), _thread(*this), _lastAccepted(0), - _maxPending(0), + _queueSize(), _wakeupConsumerAt(0), _producerNeedWakeupAt(0), _wp(0) @@ -112,10 +112,6 @@ SingleExecutor::drain_tasks() { void SingleExecutor::run_tasks_till(uint64_t available) { uint64_t consumed = _rp.load(std::memory_order_relaxed); - uint64_t left = available - consumed; - if (_maxPending.load(std::memory_order_relaxed) < left) { - _maxPending.store(left, std::memory_order_relaxed); - } uint64_t wakeupLimit = _producerNeedWakeupAt.load(std::memory_order_relaxed); while (consumed < available) { Task::UP task = std::move(_tasks[index(consumed)]); @@ -137,6 +133,7 @@ SingleExecutor::wait_for_room(Lock & lock) { _taskLimit = _wantedTaskLimit.load(); taskLimit = _taskLimit; } + _queueSize.add(numTasks()); while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) { sleepProducer(lock, 10ms, wp - taskLimit/4); } @@ -144,10 +141,11 @@ SingleExecutor::wait_for_room(Lock & lock) { ThreadExecutor::Stats SingleExecutor::getStats() { + Lock lock(_mutex); uint64_t accepted = _wp.load(std::memory_order_relaxed); - Stats stats(_maxPending, (accepted - _lastAccepted), 0); + Stats stats(_queueSize, (accepted - _lastAccepted), 0); _lastAccepted = accepted; - _maxPending = 0; + _queueSize = Stats::QueueSizeT() ; return stats; } diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index 9c3ebb4caf7..5beac5c1bec 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -52,7 +52,7 @@ private: std::condition_variable _producerCondition; vespalib::Thread _thread; uint64_t _lastAccepted; - std::atomic<uint64_t> _maxPending; + Stats::QueueSizeT _queueSize; std::atomic<uint64_t> _wakeupConsumerAt; std::atomic<uint64_t> _producerNeedWakeupAt; std::atomic<uint64_t> _wp; diff --git a/vespalib/src/tests/executor/executor_test.cpp b/vespalib/src/tests/executor/executor_test.cpp index 9015391beaa..942b425be72 100644 --- a/vespalib/src/tests/executor/executor_test.cpp +++ b/vespalib/src/tests/executor/executor_test.cpp @@ -3,6 +3,7 @@ #include <vespa/vespalib/testkit/testapp.h> #include <vespa/vespalib/util/closuretask.h> #include <vespa/vespalib/util/lambdatask.h> +#include <vespa/vespalib/util/executor_stats.h> using namespace vespalib; @@ -24,4 +25,29 @@ TEST("require that lambdas can be wrapped as tasks") { EXPECT_TRUE(called); } +template<typename T> +void verify(const AggregatedAverage<T> & avg, size_t expCount, T expTotal, T expMin, T expMax, double expAvg) { + EXPECT_EQUAL(expCount, avg.count()); + EXPECT_EQUAL(expTotal, avg.total()); + EXPECT_EQUAL(expMin, avg.min()); + EXPECT_EQUAL(expMax, avg.max()); + EXPECT_EQUAL(expAvg, avg.average()); +} + +TEST("test that aggregated averages") { + TEST_DO(verify(AggregatedAverage<size_t>(), 0ul, 0ul, std::numeric_limits<size_t>::max(), std::numeric_limits<size_t>::min(), 0.0)); + AggregatedAverage<size_t> avg; + avg.add(9); + TEST_DO(verify(avg, 1ul, 9ul, 9ul, 9ul, 9.0)); + avg.add(8); + TEST_DO(verify(avg, 2ul, 17ul, 8ul, 9ul, 8.5)); + avg.add(3, 17, 4,17); + TEST_DO(verify(avg, 5ul, 34ul, 4ul, 17ul, 6.8)); + AggregatedAverage<size_t> avg2; + avg2.add(avg); + TEST_DO(verify(avg2, 5ul, 34ul, 4ul, 17ul, 6.8)); + avg2 += avg; + TEST_DO(verify(avg2, 10ul, 68ul, 4ul, 17ul, 6.8)); +} + TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/tests/executor/threadstackexecutor_test.cpp b/vespalib/src/tests/executor/threadstackexecutor_test.cpp index 987b10526a3..9d69adcd96a 100644 --- a/vespalib/src/tests/executor/threadstackexecutor_test.cpp +++ b/vespalib/src/tests/executor/threadstackexecutor_test.cpp @@ -78,10 +78,10 @@ struct MyState { EXPECT_EQUAL(expect_rejected, stats.rejectedTasks); EXPECT_TRUE(!(gate.getCount() == 1) || (expect_deleted == 0)); if (expect_deleted == 0) { - EXPECT_EQUAL(expect_queue + expect_running, stats.maxPendingTasks); + EXPECT_EQUAL(expect_queue + expect_running, stats.queueSize.max()); } stats = executor.getStats(); - EXPECT_EQUAL(expect_queue + expect_running, stats.maxPendingTasks); + EXPECT_EQUAL(expect_queue + expect_running, stats.queueSize.max()); EXPECT_EQUAL(0u, stats.acceptedTasks); EXPECT_EQUAL(0u, stats.rejectedTasks); return *this; @@ -187,12 +187,18 @@ TEST_F("require that executor thread stack tag can be set", ThreadStackExecutor( } TEST("require that stats can be accumulated") { - ThreadStackExecutor::Stats stats(1,2,3); - EXPECT_EQUAL(1u, stats.maxPendingTasks); + ThreadStackExecutor::Stats stats(ThreadExecutor::Stats::QueueSizeT(1) ,2,3); + EXPECT_EQUAL(1u, stats.queueSize.max()); EXPECT_EQUAL(2u, stats.acceptedTasks); EXPECT_EQUAL(3u, stats.rejectedTasks); - stats += ThreadStackExecutor::Stats(7,8,9); - EXPECT_EQUAL(8u, stats.maxPendingTasks); + stats += ThreadStackExecutor::Stats(ThreadExecutor::Stats::QueueSizeT(7),8,9); + EXPECT_EQUAL(2u, stats.queueSize.count()); + EXPECT_EQUAL(8u, stats.queueSize.total()); + EXPECT_EQUAL(8u, stats.queueSize.max()); + EXPECT_EQUAL(8u, stats.queueSize.min()); + EXPECT_EQUAL(8u, stats.queueSize.max()); + EXPECT_EQUAL(4.0, stats.queueSize.average()); + EXPECT_EQUAL(10u, stats.acceptedTasks); EXPECT_EQUAL(12u, stats.rejectedTasks); diff --git a/vespalib/src/vespa/vespalib/util/executor_stats.h b/vespalib/src/vespa/vespalib/util/executor_stats.h index 771435fbabf..9b941095c27 100644 --- a/vespalib/src/vespa/vespalib/util/executor_stats.h +++ b/vespalib/src/vespa/vespalib/util/executor_stats.h @@ -2,21 +2,69 @@ #pragma once +#include <limits> + namespace vespalib { /** + * Used for aggregating values, preserving min, max, sum and count. + */ +template <typename T> +class AggregatedAverage { +public: + AggregatedAverage() : AggregatedAverage(0ul, T(0), std::numeric_limits<T>::max(), std::numeric_limits<T>::min()) { } + explicit AggregatedAverage(T value) : AggregatedAverage(1, value, value, value) { } + AggregatedAverage(size_t count_in, T total_in, T min_in, T max_in) + : _count(count_in), + _total(total_in), + _min(min_in), + _max(max_in) + { } + AggregatedAverage & operator += (const AggregatedAverage & rhs) { + add(rhs); + return *this; + } + void add(const AggregatedAverage & rhs) { + add(rhs._count, rhs._total, rhs._min, rhs._max); + } + void add(T value) { + add(1, value, value, value); + } + void add(size_t count_in, T total_in, T min_in, T max_in) { + _count += count_in; + _total += total_in; + if (min_in < _min) _min = min_in; + if (max_in > _max) _max = max_in; + } + size_t count() const { return _count; } + T total() const { return _total; } + T min() const { return _min; } + T max() const { return _max; } + double average() const { return (_count > 0) ? (double(_total) / _count) : 0; } +private: + size_t _count; + T _total; + T _min; + T _max; +}; + +/** * Struct representing stats for an executor. **/ struct ExecutorStats { - size_t maxPendingTasks; + using QueueSizeT = AggregatedAverage<size_t>; + QueueSizeT queueSize; size_t acceptedTasks; size_t rejectedTasks; - ExecutorStats() : ExecutorStats(0, 0, 0) {} - ExecutorStats(size_t maxPending, size_t accepted, size_t rejected) - : maxPendingTasks(maxPending), acceptedTasks(accepted), rejectedTasks(rejected) + ExecutorStats() : ExecutorStats(QueueSizeT(), 0, 0) {} + ExecutorStats(QueueSizeT queueSize_in, size_t accepted, size_t rejected) + : queueSize(queueSize_in), acceptedTasks(accepted), rejectedTasks(rejected) {} ExecutorStats & operator += (const ExecutorStats & rhs) { - maxPendingTasks += rhs.maxPendingTasks; + queueSize = QueueSizeT(queueSize.count() + rhs.queueSize.count(), + queueSize.total() + rhs.queueSize.total(), + queueSize.min() + rhs.queueSize.min(), + queueSize.max() + rhs.queueSize.max()); acceptedTasks += rhs.acceptedTasks; rejectedTasks += rhs.rejectedTasks; return *this; diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp index 9ab465841d6..efb1dbf4054 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp @@ -196,7 +196,7 @@ ThreadStackExecutorBase::getStats() LockGuard lock(_monitor); Stats stats = _stats; _stats = Stats(); - _stats.maxPendingTasks = _taskCount; + _stats.queueSize.add(_taskCount); return stats; } @@ -208,8 +208,7 @@ ThreadStackExecutorBase::execute(Task::UP task) TaggedTask taggedTask(std::move(task), _barrier.startEvent()); ++_taskCount; ++_stats.acceptedTasks; - _stats.maxPendingTasks = (_taskCount > _stats.maxPendingTasks) - ?_taskCount : _stats.maxPendingTasks; + _stats.queueSize.add(_taskCount); if (!_workers.empty()) { Worker *worker = _workers.back(); _workers.popBack(); |