summaryrefslogtreecommitdiffstats
path: root/staging_vespalib
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-02-28 14:36:40 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-02-28 14:36:40 +0000
commit1908db5022dad66bdf14ea3377a5459eac5534e7 (patch)
tree52546cb253a5a88edb5bb9f4c6dd0b63c5286d77 /staging_vespalib
parentee9d06da8b8d25216290fa4f52145d14035b04e7 (diff)
Use proper memory_ordering
Diffstat (limited to 'staging_vespalib')
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp25
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.h4
2 files changed, 16 insertions, 13 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index a1766eda00b..bf1c41350ee 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -31,21 +31,24 @@ SingleExecutor::getNumThreads() const {
return 1;
}
+uint64_t
+SingleExecutor::addTask(Task::UP task) {
+ MonitorGuard guard(_producerMonitor);
+ wait_for_room(guard);
+ uint64_t wp = _wp.load(std::memory_order_relaxed);
+ _tasks[index(wp)] = std::move(task);
+ _wp.store(wp + 1, std::memory_order_release);
+ return wp;
+}
+
Executor::Task::UP
SingleExecutor::execute(Task::UP task) {
- uint64_t wp;
- {
- MonitorGuard produceGuard(_producerMonitor);
- wait_for_room(produceGuard);
- wp = _wp.load(std::memory_order_relaxed);
- _tasks[index(wp)] = std::move(task);
- _wp.store(wp + 1, std::memory_order_relaxed);
- }
+ uint64_t wp = addTask(std::move(task));
if (wp == _wakeupConsumerAt.load(std::memory_order_relaxed)) {
MonitorGuard guard(_consumerMonitor);
guard.signal();
}
- return Task::UP();
+ return task;
}
void
@@ -76,7 +79,7 @@ SingleExecutor::run() {
void
SingleExecutor::drain_tasks() {
while (numTasks() > 0) {
- run_tasks_till(_wp.load(std::memory_order_relaxed));
+ run_tasks_till(_wp.load(std::memory_order_acquire));
}
}
@@ -96,7 +99,7 @@ SingleExecutor::run_tasks_till(uint64_t available) {
_rp.store(++consumed, std::memory_order_relaxed);
if (wakeupLimit == consumed) {
MonitorGuard guard(_producerMonitor);
- guard.signal();
+ guard.broadcast();
}
}
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
index 1f1f3169119..5bf7ef9fe6f 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -25,12 +25,12 @@ public:
size_t getNumThreads() const override;
uint32_t getTaskLimit() const { return _taskLimit.load(std::memory_order_relaxed); }
Stats getStats() override;
-
private:
+ uint64_t addTask(Task::UP task);
void run() override;
void drain_tasks();
void run_tasks_till(uint64_t available);
- void wait_for_room(MonitorGuard & producerGuard);
+ void wait_for_room(MonitorGuard & guard);
uint64_t index(uint64_t counter) const {
return counter & (_taskLimit.load(std::memory_order_relaxed) - 1);
}