diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-03-25 11:40:45 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-03-25 11:40:45 +0000 |
commit | 44ec2d74b5e828b7d1f0bc97a315846a3d5051ac (patch) | |
tree | 330b9dcf6b741a5609647d6685ffe1e421901dae /staging_vespalib | |
parent | 668412faf939d3d3d185c6e7c81056c4cdb5afe3 (diff) |
Unify the metrics for queuesize similar to what we have for the spi queues.
Diffstat (limited to 'staging_vespalib')
-rw-r--r-- | staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp | 12 | ||||
-rw-r--r-- | staging_vespalib/src/vespa/vespalib/util/singleexecutor.h | 2 |
2 files changed, 6 insertions, 8 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index 73612452b09..4b92d9a9687 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -15,7 +15,7 @@ SingleExecutor::SingleExecutor(uint32_t taskLimit) _producerCondition(), _thread(*this), _lastAccepted(0), - _maxPending(0), + _queueSize(), _wakeupConsumerAt(0), _producerNeedWakeupAt(0), _wp(0) @@ -112,10 +112,6 @@ SingleExecutor::drain_tasks() { void SingleExecutor::run_tasks_till(uint64_t available) { uint64_t consumed = _rp.load(std::memory_order_relaxed); - uint64_t left = available - consumed; - if (_maxPending.load(std::memory_order_relaxed) < left) { - _maxPending.store(left, std::memory_order_relaxed); - } uint64_t wakeupLimit = _producerNeedWakeupAt.load(std::memory_order_relaxed); while (consumed < available) { Task::UP task = std::move(_tasks[index(consumed)]); @@ -137,6 +133,7 @@ SingleExecutor::wait_for_room(Lock & lock) { _taskLimit = _wantedTaskLimit.load(); taskLimit = _taskLimit; } + _queueSize.add(numTasks()); while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) { sleepProducer(lock, 10ms, wp - taskLimit/4); } @@ -144,10 +141,11 @@ SingleExecutor::wait_for_room(Lock & lock) { ThreadExecutor::Stats SingleExecutor::getStats() { + Lock lock(_mutex); uint64_t accepted = _wp.load(std::memory_order_relaxed); - Stats stats(_maxPending, (accepted - _lastAccepted), 0); + Stats stats(_queueSize, (accepted - _lastAccepted), 0); _lastAccepted = accepted; - _maxPending = 0; + _queueSize = Stats::QueueSizeT() ; return stats; } diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index 9c3ebb4caf7..5beac5c1bec 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -52,7 +52,7 @@ private: std::condition_variable _producerCondition; vespalib::Thread _thread; uint64_t _lastAccepted; - std::atomic<uint64_t> _maxPending; + Stats::QueueSizeT _queueSize; std::atomic<uint64_t> _wakeupConsumerAt; std::atomic<uint64_t> _producerNeedWakeupAt; std::atomic<uint64_t> _wp; |