diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-03-26 21:46:34 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-03-26 21:46:34 +0100 |
commit | 8f0f88ff1b16d13371984c30ffc79d1887788113 (patch) | |
tree | 66caadf8364c121236dcc63ee87cfaf141b34d10 /staging_vespalib | |
parent | fa9c389e779851c563ccef77a2a37a7277c24e1c (diff) | |
parent | 36b3be76ae151682295faa04de17989f3600c29b (diff) |
Merge pull request #12727 from vespa-engine/balder/let-optimize-flag-control-index-and-summary-executor-too
Add shutdown to thread interface.
Diffstat (limited to 'staging_vespalib')
-rw-r--r-- | staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp | 33 | ||||
-rw-r--r-- | staging_vespalib/src/vespa/vespalib/util/singleexecutor.h | 3 |
2 files changed, 23 insertions, 13 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index 4b92d9a9687..90eb18c23ef 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -18,11 +18,13 @@ SingleExecutor::SingleExecutor(uint32_t taskLimit) _queueSize(), _wakeupConsumerAt(0), _producerNeedWakeupAt(0), - _wp(0) + _wp(0), + _closed(false) { _thread.start(); } SingleExecutor::~SingleExecutor() { + shutdown(); sync(); _thread.stop().join(); } @@ -32,16 +34,6 @@ SingleExecutor::getNumThreads() const { return 1; } -uint64_t -SingleExecutor::addTask(Task::UP task) { - Lock guard(_mutex); - wait_for_room(guard); - uint64_t wp = _wp.load(std::memory_order_relaxed); - _tasks[index(wp)] = std::move(task); - _wp.store(wp + 1, std::memory_order_release); - return wp; -} - void SingleExecutor::sleepProducer(Lock & lock, duration maxWaitTime, uint64_t wakeupAt) { _producerNeedWakeupAt.store(wakeupAt, std::memory_order_relaxed); @@ -51,7 +43,17 @@ SingleExecutor::sleepProducer(Lock & lock, duration maxWaitTime, uint64_t wakeup Executor::Task::UP SingleExecutor::execute(Task::UP task) { - uint64_t wp = addTask(std::move(task)); + uint64_t wp; + { + Lock guard(_mutex); + if (_closed) { + return task; + } + wait_for_room(guard); + wp = _wp.load(std::memory_order_relaxed); + _tasks[index(wp)] = std::move(task); + _wp.store(wp + 1, std::memory_order_release); + } if (wp == _wakeupConsumerAt.load(std::memory_order_relaxed)) { _consumerCondition.notify_one(); } @@ -88,6 +90,13 @@ SingleExecutor::sync() { return *this; } +SingleExecutor & +SingleExecutor::shutdown() { + Lock lock(_mutex); + _closed = true; + return *this; +} + void SingleExecutor::run() { while (!_thread.stopped()) { diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index 5beac5c1bec..3d759769ea3 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -27,9 +27,9 @@ public: size_t getNumThreads() const override; uint32_t getTaskLimit() const { return _taskLimit.load(std::memory_order_relaxed); } Stats getStats() override; + SingleExecutor & shutdown() override; private: using Lock = std::unique_lock<std::mutex>; - uint64_t addTask(Task::UP task); void drain(Lock & lock); void run() override; void drain_tasks(); @@ -56,6 +56,7 @@ private: std::atomic<uint64_t> _wakeupConsumerAt; std::atomic<uint64_t> _producerNeedWakeupAt; std::atomic<uint64_t> _wp; + bool _closed; }; } |