summaryrefslogtreecommitdiffstats
path: root/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp')
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp16
1 files changed, 12 insertions, 4 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index 90eb18c23ef..a17037799a3 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -6,6 +6,10 @@
namespace vespalib {
SingleExecutor::SingleExecutor(uint32_t taskLimit)
+ : SingleExecutor(taskLimit, taskLimit/10, 5ms)
+{ }
+
+SingleExecutor::SingleExecutor(uint32_t taskLimit, uint32_t watermark, duration reactionTime)
: _taskLimit(vespalib::roundUp2inN(taskLimit)),
_wantedTaskLimit(_taskLimit.load()),
_rp(0),
@@ -19,10 +23,14 @@ SingleExecutor::SingleExecutor(uint32_t taskLimit)
_wakeupConsumerAt(0),
_producerNeedWakeupAt(0),
_wp(0),
+ _watermark(std::min(_taskLimit.load(), watermark)),
+ _reactionTime(reactionTime),
_closed(false)
{
+ assert(taskLimit >= watermark);
_thread.start();
}
+
SingleExecutor::~SingleExecutor() {
shutdown();
sync();
@@ -62,7 +70,7 @@ SingleExecutor::execute(Task::UP task) {
void
SingleExecutor::setTaskLimit(uint32_t taskLimit) {
- _wantedTaskLimit = vespalib::roundUp2inN(taskLimit);
+ _wantedTaskLimit = std::max(vespalib::roundUp2inN(taskLimit), size_t(_watermark));
}
void
@@ -102,10 +110,10 @@ SingleExecutor::run() {
while (!_thread.stopped()) {
drain_tasks();
_producerCondition.notify_all();
- _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) / 4), std::memory_order_relaxed);
+ _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + _watermark, std::memory_order_relaxed);
Lock lock(_mutex);
if (numTasks() <= 0) {
- _consumerCondition.wait_for(lock, 10ms);
+ _consumerCondition.wait_for(lock, _reactionTime);
}
_wakeupConsumerAt.store(0, std::memory_order_relaxed);
}
@@ -144,7 +152,7 @@ SingleExecutor::wait_for_room(Lock & lock) {
}
_queueSize.add(numTasks());
while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) {
- sleepProducer(lock, 10ms, wp - taskLimit/4);
+ sleepProducer(lock, _reactionTime, wp - _watermark);
}
}