summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-03-25 11:40:45 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-03-25 11:40:45 +0000
commit44ec2d74b5e828b7d1f0bc97a315846a3d5051ac (patch)
tree330b9dcf6b741a5609647d6685ffe1e421901dae
parent668412faf939d3d3d185c6e7c81056c4cdb5afe3 (diff)
Unify the metrics for queuesize similar to what we have for the spi queues.
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp7
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h3
-rw-r--r--searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp2
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp12
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.h2
-rw-r--r--vespalib/src/tests/executor/executor_test.cpp26
-rw-r--r--vespalib/src/tests/executor/threadstackexecutor_test.cpp18
-rw-r--r--vespalib/src/vespa/vespalib/util/executor_stats.h58
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp5
9 files changed, 107 insertions, 26 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp
index 710c072aa53..f4e9bff1b4b 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp
+++ b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.cpp
@@ -7,16 +7,19 @@ namespace proton {
void
ExecutorMetrics::update(const vespalib::ThreadStackExecutorBase::Stats &stats)
{
- maxPending.set(stats.maxPendingTasks);
+ maxPending.set(stats.queueSize.max());
accepted.inc(stats.acceptedTasks);
rejected.inc(stats.rejectedTasks);
+ const vespalib::ThreadStackExecutorBase::Stats::QueueSizeT & qSize = stats.queueSize;
+ queueSize.addValueBatch(qSize.average(), qSize.count(), qSize.min(), qSize.max());
}
ExecutorMetrics::ExecutorMetrics(const std::string &name, metrics::MetricSet *parent)
: metrics::MetricSet(name, {}, "Instance specific thread executor metrics", parent),
maxPending("maxpending", {}, "Maximum number of pending (active + queued) tasks", this),
accepted("accepted", {}, "Number of accepted tasks", this),
- rejected("rejected", {}, "Number of rejected tasks", this)
+ rejected("rejected", {}, "Number of rejected tasks", this),
+ queueSize("queuesize", {}, "size of task queue", this)
{
}
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h
index a347edffd4b..6b638391d1e 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h
+++ b/searchcore/src/vespa/searchcore/proton/metrics/executor_metrics.h
@@ -11,9 +11,10 @@ namespace proton {
struct ExecutorMetrics : metrics::MetricSet
{
- metrics::LongValueMetric maxPending;
+ metrics::LongValueMetric maxPending; // TODO Remove on Vespa 8 or sooner if possible.
metrics::LongCountMetric accepted;
metrics::LongCountMetric rejected;
+ metrics::LongAverageMetric queueSize;
void update(const vespalib::ThreadStackExecutorBase::Stats &stats);
ExecutorMetrics(const std::string &name, metrics::MetricSet *parent);
diff --git a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp
index 4c501defeea..a93eb1ff4bc 100644
--- a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp
+++ b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp
@@ -39,7 +39,7 @@ void ForegroundTaskExecutor::setTaskLimit(uint32_t) {
}
vespalib::ExecutorStats ForegroundTaskExecutor::getStats() {
- return vespalib::ExecutorStats(0, _accepted.load(std::memory_order_relaxed), 0);
+ return vespalib::ExecutorStats(vespalib::ExecutorStats::QueueSizeT(0) , _accepted.load(std::memory_order_relaxed), 0);
}
} // namespace search
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index 73612452b09..4b92d9a9687 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -15,7 +15,7 @@ SingleExecutor::SingleExecutor(uint32_t taskLimit)
_producerCondition(),
_thread(*this),
_lastAccepted(0),
- _maxPending(0),
+ _queueSize(),
_wakeupConsumerAt(0),
_producerNeedWakeupAt(0),
_wp(0)
@@ -112,10 +112,6 @@ SingleExecutor::drain_tasks() {
void
SingleExecutor::run_tasks_till(uint64_t available) {
uint64_t consumed = _rp.load(std::memory_order_relaxed);
- uint64_t left = available - consumed;
- if (_maxPending.load(std::memory_order_relaxed) < left) {
- _maxPending.store(left, std::memory_order_relaxed);
- }
uint64_t wakeupLimit = _producerNeedWakeupAt.load(std::memory_order_relaxed);
while (consumed < available) {
Task::UP task = std::move(_tasks[index(consumed)]);
@@ -137,6 +133,7 @@ SingleExecutor::wait_for_room(Lock & lock) {
_taskLimit = _wantedTaskLimit.load();
taskLimit = _taskLimit;
}
+ _queueSize.add(numTasks());
while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) {
sleepProducer(lock, 10ms, wp - taskLimit/4);
}
@@ -144,10 +141,11 @@ SingleExecutor::wait_for_room(Lock & lock) {
ThreadExecutor::Stats
SingleExecutor::getStats() {
+ Lock lock(_mutex);
uint64_t accepted = _wp.load(std::memory_order_relaxed);
- Stats stats(_maxPending, (accepted - _lastAccepted), 0);
+ Stats stats(_queueSize, (accepted - _lastAccepted), 0);
_lastAccepted = accepted;
- _maxPending = 0;
+ _queueSize = Stats::QueueSizeT() ;
return stats;
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
index 9c3ebb4caf7..5beac5c1bec 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -52,7 +52,7 @@ private:
std::condition_variable _producerCondition;
vespalib::Thread _thread;
uint64_t _lastAccepted;
- std::atomic<uint64_t> _maxPending;
+ Stats::QueueSizeT _queueSize;
std::atomic<uint64_t> _wakeupConsumerAt;
std::atomic<uint64_t> _producerNeedWakeupAt;
std::atomic<uint64_t> _wp;
diff --git a/vespalib/src/tests/executor/executor_test.cpp b/vespalib/src/tests/executor/executor_test.cpp
index 9015391beaa..942b425be72 100644
--- a/vespalib/src/tests/executor/executor_test.cpp
+++ b/vespalib/src/tests/executor/executor_test.cpp
@@ -3,6 +3,7 @@
#include <vespa/vespalib/testkit/testapp.h>
#include <vespa/vespalib/util/closuretask.h>
#include <vespa/vespalib/util/lambdatask.h>
+#include <vespa/vespalib/util/executor_stats.h>
using namespace vespalib;
@@ -24,4 +25,29 @@ TEST("require that lambdas can be wrapped as tasks") {
EXPECT_TRUE(called);
}
+template<typename T>
+void verify(const AggregatedAverage<T> & avg, size_t expCount, T expTotal, T expMin, T expMax, double expAvg) {
+ EXPECT_EQUAL(expCount, avg.count());
+ EXPECT_EQUAL(expTotal, avg.total());
+ EXPECT_EQUAL(expMin, avg.min());
+ EXPECT_EQUAL(expMax, avg.max());
+ EXPECT_EQUAL(expAvg, avg.average());
+}
+
+TEST("test that aggregated averages") {
+ TEST_DO(verify(AggregatedAverage<size_t>(), 0ul, 0ul, std::numeric_limits<size_t>::max(), std::numeric_limits<size_t>::min(), 0.0));
+ AggregatedAverage<size_t> avg;
+ avg.add(9);
+ TEST_DO(verify(avg, 1ul, 9ul, 9ul, 9ul, 9.0));
+ avg.add(8);
+ TEST_DO(verify(avg, 2ul, 17ul, 8ul, 9ul, 8.5));
+ avg.add(3, 17, 4,17);
+ TEST_DO(verify(avg, 5ul, 34ul, 4ul, 17ul, 6.8));
+ AggregatedAverage<size_t> avg2;
+ avg2.add(avg);
+ TEST_DO(verify(avg2, 5ul, 34ul, 4ul, 17ul, 6.8));
+ avg2 += avg;
+ TEST_DO(verify(avg2, 10ul, 68ul, 4ul, 17ul, 6.8));
+}
+
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/vespalib/src/tests/executor/threadstackexecutor_test.cpp b/vespalib/src/tests/executor/threadstackexecutor_test.cpp
index 987b10526a3..9d69adcd96a 100644
--- a/vespalib/src/tests/executor/threadstackexecutor_test.cpp
+++ b/vespalib/src/tests/executor/threadstackexecutor_test.cpp
@@ -78,10 +78,10 @@ struct MyState {
EXPECT_EQUAL(expect_rejected, stats.rejectedTasks);
EXPECT_TRUE(!(gate.getCount() == 1) || (expect_deleted == 0));
if (expect_deleted == 0) {
- EXPECT_EQUAL(expect_queue + expect_running, stats.maxPendingTasks);
+ EXPECT_EQUAL(expect_queue + expect_running, stats.queueSize.max());
}
stats = executor.getStats();
- EXPECT_EQUAL(expect_queue + expect_running, stats.maxPendingTasks);
+ EXPECT_EQUAL(expect_queue + expect_running, stats.queueSize.max());
EXPECT_EQUAL(0u, stats.acceptedTasks);
EXPECT_EQUAL(0u, stats.rejectedTasks);
return *this;
@@ -187,12 +187,18 @@ TEST_F("require that executor thread stack tag can be set", ThreadStackExecutor(
}
TEST("require that stats can be accumulated") {
- ThreadStackExecutor::Stats stats(1,2,3);
- EXPECT_EQUAL(1u, stats.maxPendingTasks);
+ ThreadStackExecutor::Stats stats(ThreadExecutor::Stats::QueueSizeT(1) ,2,3);
+ EXPECT_EQUAL(1u, stats.queueSize.max());
EXPECT_EQUAL(2u, stats.acceptedTasks);
EXPECT_EQUAL(3u, stats.rejectedTasks);
- stats += ThreadStackExecutor::Stats(7,8,9);
- EXPECT_EQUAL(8u, stats.maxPendingTasks);
+ stats += ThreadStackExecutor::Stats(ThreadExecutor::Stats::QueueSizeT(7),8,9);
+ EXPECT_EQUAL(2u, stats.queueSize.count());
+ EXPECT_EQUAL(8u, stats.queueSize.total());
+ EXPECT_EQUAL(8u, stats.queueSize.max());
+ EXPECT_EQUAL(8u, stats.queueSize.min());
+ EXPECT_EQUAL(8u, stats.queueSize.max());
+ EXPECT_EQUAL(4.0, stats.queueSize.average());
+
EXPECT_EQUAL(10u, stats.acceptedTasks);
EXPECT_EQUAL(12u, stats.rejectedTasks);
diff --git a/vespalib/src/vespa/vespalib/util/executor_stats.h b/vespalib/src/vespa/vespalib/util/executor_stats.h
index 771435fbabf..9b941095c27 100644
--- a/vespalib/src/vespa/vespalib/util/executor_stats.h
+++ b/vespalib/src/vespa/vespalib/util/executor_stats.h
@@ -2,21 +2,69 @@
#pragma once
+#include <limits>
+
namespace vespalib {
/**
+ * Used for aggregating values, preserving min, max, sum and count.
+ */
+template <typename T>
+class AggregatedAverage {
+public:
+ AggregatedAverage() : AggregatedAverage(0ul, T(0), std::numeric_limits<T>::max(), std::numeric_limits<T>::min()) { }
+ explicit AggregatedAverage(T value) : AggregatedAverage(1, value, value, value) { }
+ AggregatedAverage(size_t count_in, T total_in, T min_in, T max_in)
+ : _count(count_in),
+ _total(total_in),
+ _min(min_in),
+ _max(max_in)
+ { }
+ AggregatedAverage & operator += (const AggregatedAverage & rhs) {
+ add(rhs);
+ return *this;
+ }
+ void add(const AggregatedAverage & rhs) {
+ add(rhs._count, rhs._total, rhs._min, rhs._max);
+ }
+ void add(T value) {
+ add(1, value, value, value);
+ }
+ void add(size_t count_in, T total_in, T min_in, T max_in) {
+ _count += count_in;
+ _total += total_in;
+ if (min_in < _min) _min = min_in;
+ if (max_in > _max) _max = max_in;
+ }
+ size_t count() const { return _count; }
+ T total() const { return _total; }
+ T min() const { return _min; }
+ T max() const { return _max; }
+ double average() const { return (_count > 0) ? (double(_total) / _count) : 0; }
+private:
+ size_t _count;
+ T _total;
+ T _min;
+ T _max;
+};
+
+/**
* Struct representing stats for an executor.
**/
struct ExecutorStats {
- size_t maxPendingTasks;
+ using QueueSizeT = AggregatedAverage<size_t>;
+ QueueSizeT queueSize;
size_t acceptedTasks;
size_t rejectedTasks;
- ExecutorStats() : ExecutorStats(0, 0, 0) {}
- ExecutorStats(size_t maxPending, size_t accepted, size_t rejected)
- : maxPendingTasks(maxPending), acceptedTasks(accepted), rejectedTasks(rejected)
+ ExecutorStats() : ExecutorStats(QueueSizeT(), 0, 0) {}
+ ExecutorStats(QueueSizeT queueSize_in, size_t accepted, size_t rejected)
+ : queueSize(queueSize_in), acceptedTasks(accepted), rejectedTasks(rejected)
{}
ExecutorStats & operator += (const ExecutorStats & rhs) {
- maxPendingTasks += rhs.maxPendingTasks;
+ queueSize = QueueSizeT(queueSize.count() + rhs.queueSize.count(),
+ queueSize.total() + rhs.queueSize.total(),
+ queueSize.min() + rhs.queueSize.min(),
+ queueSize.max() + rhs.queueSize.max());
acceptedTasks += rhs.acceptedTasks;
rejectedTasks += rhs.rejectedTasks;
return *this;
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
index 9ab465841d6..efb1dbf4054 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp
@@ -196,7 +196,7 @@ ThreadStackExecutorBase::getStats()
LockGuard lock(_monitor);
Stats stats = _stats;
_stats = Stats();
- _stats.maxPendingTasks = _taskCount;
+ _stats.queueSize.add(_taskCount);
return stats;
}
@@ -208,8 +208,7 @@ ThreadStackExecutorBase::execute(Task::UP task)
TaggedTask taggedTask(std::move(task), _barrier.startEvent());
++_taskCount;
++_stats.acceptedTasks;
- _stats.maxPendingTasks = (_taskCount > _stats.maxPendingTasks)
- ?_taskCount : _stats.maxPendingTasks;
+ _stats.queueSize.add(_taskCount);
if (!_workers.empty()) {
Worker *worker = _workers.back();
_workers.popBack();