diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-02-28 14:36:40 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-02-28 14:36:40 +0000 |
commit | 1908db5022dad66bdf14ea3377a5459eac5534e7 (patch) | |
tree | 52546cb253a5a88edb5bb9f4c6dd0b63c5286d77 /staging_vespalib | |
parent | ee9d06da8b8d25216290fa4f52145d14035b04e7 (diff) |
Use proper memory_ordering
Diffstat (limited to 'staging_vespalib')
-rw-r--r-- | staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp | 25 | ||||
-rw-r--r-- | staging_vespalib/src/vespa/vespalib/util/singleexecutor.h | 4 |
2 files changed, 16 insertions, 13 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index a1766eda00b..bf1c41350ee 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -31,21 +31,24 @@ SingleExecutor::getNumThreads() const { return 1; } +uint64_t +SingleExecutor::addTask(Task::UP task) { + MonitorGuard guard(_producerMonitor); + wait_for_room(guard); + uint64_t wp = _wp.load(std::memory_order_relaxed); + _tasks[index(wp)] = std::move(task); + _wp.store(wp + 1, std::memory_order_release); + return wp; +} + Executor::Task::UP SingleExecutor::execute(Task::UP task) { - uint64_t wp; - { - MonitorGuard produceGuard(_producerMonitor); - wait_for_room(produceGuard); - wp = _wp.load(std::memory_order_relaxed); - _tasks[index(wp)] = std::move(task); - _wp.store(wp + 1, std::memory_order_relaxed); - } + uint64_t wp = addTask(std::move(task)); if (wp == _wakeupConsumerAt.load(std::memory_order_relaxed)) { MonitorGuard guard(_consumerMonitor); guard.signal(); } - return Task::UP(); + return task; } void @@ -76,7 +79,7 @@ SingleExecutor::run() { void SingleExecutor::drain_tasks() { while (numTasks() > 0) { - run_tasks_till(_wp.load(std::memory_order_relaxed)); + run_tasks_till(_wp.load(std::memory_order_acquire)); } } @@ -96,7 +99,7 @@ SingleExecutor::run_tasks_till(uint64_t available) { _rp.store(++consumed, std::memory_order_relaxed); if (wakeupLimit == consumed) { MonitorGuard guard(_producerMonitor); - guard.signal(); + guard.broadcast(); } } } diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index 1f1f3169119..5bf7ef9fe6f 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -25,12 +25,12 @@ public: size_t getNumThreads() const override; uint32_t getTaskLimit() const { return _taskLimit.load(std::memory_order_relaxed); } Stats getStats() override; - private: + uint64_t addTask(Task::UP task); void run() override; void drain_tasks(); void run_tasks_till(uint64_t available); - void wait_for_room(MonitorGuard & producerGuard); + void wait_for_room(MonitorGuard & guard); uint64_t index(uint64_t counter) const { return counter & (_taskLimit.load(std::memory_order_relaxed) - 1); } |