diff options
Diffstat (limited to 'staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp')
-rw-r--r-- | staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp | 16 |
1 files changed, 12 insertions, 4 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index 90eb18c23ef..a17037799a3 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -6,6 +6,10 @@ namespace vespalib { SingleExecutor::SingleExecutor(uint32_t taskLimit) + : SingleExecutor(taskLimit, taskLimit/10, 5ms) +{ } + +SingleExecutor::SingleExecutor(uint32_t taskLimit, uint32_t watermark, duration reactionTime) : _taskLimit(vespalib::roundUp2inN(taskLimit)), _wantedTaskLimit(_taskLimit.load()), _rp(0), @@ -19,10 +23,14 @@ SingleExecutor::SingleExecutor(uint32_t taskLimit) _wakeupConsumerAt(0), _producerNeedWakeupAt(0), _wp(0), + _watermark(std::min(_taskLimit.load(), watermark)), + _reactionTime(reactionTime), _closed(false) { + assert(taskLimit >= watermark); _thread.start(); } + SingleExecutor::~SingleExecutor() { shutdown(); sync(); @@ -62,7 +70,7 @@ SingleExecutor::execute(Task::UP task) { void SingleExecutor::setTaskLimit(uint32_t taskLimit) { - _wantedTaskLimit = vespalib::roundUp2inN(taskLimit); + _wantedTaskLimit = std::max(vespalib::roundUp2inN(taskLimit), size_t(_watermark)); } void @@ -102,10 +110,10 @@ SingleExecutor::run() { while (!_thread.stopped()) { drain_tasks(); _producerCondition.notify_all(); - _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) / 4), std::memory_order_relaxed); + _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + _watermark, std::memory_order_relaxed); Lock lock(_mutex); if (numTasks() <= 0) { - _consumerCondition.wait_for(lock, 10ms); + _consumerCondition.wait_for(lock, _reactionTime); } _wakeupConsumerAt.store(0, std::memory_order_relaxed); } @@ -144,7 +152,7 @@ SingleExecutor::wait_for_room(Lock & lock) { } _queueSize.add(numTasks()); while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) { - sleepProducer(lock, 10ms, wp - taskLimit/4); + sleepProducer(lock, _reactionTime, wp - _watermark); } } |