summaryrefslogtreecommitdiffstats
path: root/staging_vespalib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-02-28 11:27:50 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-02-28 11:27:50 +0000
commitee9d06da8b8d25216290fa4f52145d14035b04e7 (patch)
tree6935fb7249df704c90bcd0b22a0e63cf1bd91fb6 /staging_vespalib
parent821cbc57fcd370ae067344c329c09d1deb1700f0 (diff)
Thread safe on put.
Diffstat (limited to 'staging_vespalib')
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp28
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.h10
2 files changed, 21 insertions, 17 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index 59cc9a39957..a1766eda00b 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -10,7 +10,8 @@ SingleExecutor::SingleExecutor(uint32_t taskLimit)
_wantedTaskLimit(_taskLimit.load()),
_rp(0),
_tasks(std::make_unique<Task::UP[]>(_taskLimit)),
- _monitor(),
+ _consumerMonitor(),
+ _producerMonitor(),
_thread(*this),
_lastAccepted(0),
_maxPending(0),
@@ -32,12 +33,16 @@ SingleExecutor::getNumThreads() const {
Executor::Task::UP
SingleExecutor::execute(Task::UP task) {
- wait_for_room();
- uint64_t wp = _wp.load(std::memory_order_relaxed);
- _tasks[index(wp)] = std::move(task);
- _wp.store(wp + 1, std::memory_order_relaxed);
+ uint64_t wp;
+ {
+ MonitorGuard produceGuard(_producerMonitor);
+ wait_for_room(produceGuard);
+ wp = _wp.load(std::memory_order_relaxed);
+ _tasks[index(wp)] = std::move(task);
+ _wp.store(wp + 1, std::memory_order_relaxed);
+ }
if (wp == _wakeupConsumerAt.load(std::memory_order_relaxed)) {
- MonitorGuard guard(_monitor);
+ MonitorGuard guard(_consumerMonitor);
guard.signal();
}
return Task::UP();
@@ -62,8 +67,8 @@ SingleExecutor::run() {
while (!_thread.stopped()) {
drain_tasks();
_wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + (_taskLimit.load(std::memory_order_relaxed) >> 2), std::memory_order_relaxed);
- MonitorGuard guard(_monitor);
- guard.wait(1ms);
+ MonitorGuard guard(_consumerMonitor);
+ guard.wait(10ms);
_wakeupConsumerAt.store(0, std::memory_order_relaxed);
}
}
@@ -90,14 +95,14 @@ SingleExecutor::run_tasks_till(uint64_t available) {
task->run();
_rp.store(++consumed, std::memory_order_relaxed);
if (wakeupLimit == consumed) {
- MonitorGuard guard(_monitor);
+ MonitorGuard guard(_producerMonitor);
guard.signal();
}
}
}
void
-SingleExecutor::wait_for_room() {
+SingleExecutor::wait_for_room(MonitorGuard & producerGuard) {
if (_taskLimit.load(std::memory_order_relaxed) != _wantedTaskLimit.load(std::memory_order_relaxed)) {
sync();
_tasks = std::make_unique<Task::UP[]>(_wantedTaskLimit);
@@ -105,8 +110,7 @@ SingleExecutor::wait_for_room() {
}
while (numTasks() >= _taskLimit.load(std::memory_order_relaxed)) {
_producerNeedWakeup.store(true, std::memory_order_relaxed);
- MonitorGuard guard(_monitor);
- guard.wait(1ms);
+ producerGuard.wait(10ms);
_producerNeedWakeup.store(false, std::memory_order_relaxed);
}
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
index 13520efa3d8..1f1f3169119 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -12,9 +12,8 @@ namespace vespalib {
/**
* Has a single thread consuming tasks from a fixed size ringbuffer.
* Made for throughput where the producer has no interaction with the consumer and
- * it is hence very cheap to produce a task. The consumer wakes up once every ms to see if there are
- * anything to consumer. It uses a lock free ringbuffer, but requires only a single producer.
- * That must be enforced on the outside.
+ * it is hence very cheap to produce a task. High and low watermark at 25%/75% is used
+ * to reduce ping-pong.
*/
class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable {
public:
@@ -31,7 +30,7 @@ private:
void run() override;
void drain_tasks();
void run_tasks_till(uint64_t available);
- void wait_for_room();
+ void wait_for_room(MonitorGuard & producerGuard);
uint64_t index(uint64_t counter) const {
return counter & (_taskLimit.load(std::memory_order_relaxed) - 1);
}
@@ -43,7 +42,8 @@ private:
std::atomic<uint32_t> _wantedTaskLimit;
std::atomic<uint64_t> _rp;
std::unique_ptr<Task::UP[]> _tasks;
- vespalib::Monitor _monitor;
+ vespalib::Monitor _consumerMonitor;
+ vespalib::Monitor _producerMonitor;
vespalib::Thread _thread;
uint64_t _lastAccepted;
std::atomic<uint64_t> _maxPending;