aboutsummaryrefslogtreecommitdiffstats
path: root/staging_vespalib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-03-25 11:40:45 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-03-25 11:40:45 +0000
commit44ec2d74b5e828b7d1f0bc97a315846a3d5051ac (patch)
tree330b9dcf6b741a5609647d6685ffe1e421901dae /staging_vespalib
parent668412faf939d3d3d185c6e7c81056c4cdb5afe3 (diff)
Unify the metrics for queuesize similar to what we have for the spi queues.
Diffstat (limited to 'staging_vespalib')
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp12
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.h2
2 files changed, 6 insertions, 8 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index 73612452b09..4b92d9a9687 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -15,7 +15,7 @@ SingleExecutor::SingleExecutor(uint32_t taskLimit)
_producerCondition(),
_thread(*this),
_lastAccepted(0),
- _maxPending(0),
+ _queueSize(),
_wakeupConsumerAt(0),
_producerNeedWakeupAt(0),
_wp(0)
@@ -112,10 +112,6 @@ SingleExecutor::drain_tasks() {
void
SingleExecutor::run_tasks_till(uint64_t available) {
uint64_t consumed = _rp.load(std::memory_order_relaxed);
- uint64_t left = available - consumed;
- if (_maxPending.load(std::memory_order_relaxed) < left) {
- _maxPending.store(left, std::memory_order_relaxed);
- }
uint64_t wakeupLimit = _producerNeedWakeupAt.load(std::memory_order_relaxed);
while (consumed < available) {
Task::UP task = std::move(_tasks[index(consumed)]);
@@ -137,6 +133,7 @@ SingleExecutor::wait_for_room(Lock & lock) {
_taskLimit = _wantedTaskLimit.load();
taskLimit = _taskLimit;
}
+ _queueSize.add(numTasks());
while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) {
sleepProducer(lock, 10ms, wp - taskLimit/4);
}
@@ -144,10 +141,11 @@ SingleExecutor::wait_for_room(Lock & lock) {
ThreadExecutor::Stats
SingleExecutor::getStats() {
+ Lock lock(_mutex);
uint64_t accepted = _wp.load(std::memory_order_relaxed);
- Stats stats(_maxPending, (accepted - _lastAccepted), 0);
+ Stats stats(_queueSize, (accepted - _lastAccepted), 0);
_lastAccepted = accepted;
- _maxPending = 0;
+ _queueSize = Stats::QueueSizeT() ;
return stats;
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
index 9c3ebb4caf7..5beac5c1bec 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -52,7 +52,7 @@ private:
std::condition_variable _producerCondition;
vespalib::Thread _thread;
uint64_t _lastAccepted;
- std::atomic<uint64_t> _maxPending;
+ Stats::QueueSizeT _queueSize;
std::atomic<uint64_t> _wakeupConsumerAt;
std::atomic<uint64_t> _producerNeedWakeupAt;
std::atomic<uint64_t> _wp;