diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-03-23 16:01:22 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-03-23 16:13:09 +0000 |
commit | 36b3be76ae151682295faa04de17989f3600c29b (patch) | |
tree | fbe5156d396bfa694f9acd932b466088c20d2e63 /staging_vespalib | |
parent | 23dcf1db150a66ec66cf746d4234982fdbb0e6e2 (diff) |
Add shutdown to thread interface.
Let the optimize config control index and summary executor too.
Diffstat (limited to 'staging_vespalib')
-rw-r--r-- | staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp | 33 | ||||
-rw-r--r-- | staging_vespalib/src/vespa/vespalib/util/singleexecutor.h | 3 |
2 files changed, 23 insertions, 13 deletions
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index 73612452b09..2d0ffc721e6 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -18,11 +18,13 @@ SingleExecutor::SingleExecutor(uint32_t taskLimit) _maxPending(0), _wakeupConsumerAt(0), _producerNeedWakeupAt(0), - _wp(0) + _wp(0), + _closed(false) { _thread.start(); } SingleExecutor::~SingleExecutor() { + shutdown(); sync(); _thread.stop().join(); } @@ -32,16 +34,6 @@ SingleExecutor::getNumThreads() const { return 1; } -uint64_t -SingleExecutor::addTask(Task::UP task) { - Lock guard(_mutex); - wait_for_room(guard); - uint64_t wp = _wp.load(std::memory_order_relaxed); - _tasks[index(wp)] = std::move(task); - _wp.store(wp + 1, std::memory_order_release); - return wp; -} - void SingleExecutor::sleepProducer(Lock & lock, duration maxWaitTime, uint64_t wakeupAt) { _producerNeedWakeupAt.store(wakeupAt, std::memory_order_relaxed); @@ -51,7 +43,17 @@ SingleExecutor::sleepProducer(Lock & lock, duration maxWaitTime, uint64_t wakeup Executor::Task::UP SingleExecutor::execute(Task::UP task) { - uint64_t wp = addTask(std::move(task)); + uint64_t wp; + { + Lock guard(_mutex); + if (_closed) { + return task; + } + wait_for_room(guard); + wp = _wp.load(std::memory_order_relaxed); + _tasks[index(wp)] = std::move(task); + _wp.store(wp + 1, std::memory_order_release); + } if (wp == _wakeupConsumerAt.load(std::memory_order_relaxed)) { _consumerCondition.notify_one(); } @@ -88,6 +90,13 @@ SingleExecutor::sync() { return *this; } +SingleExecutor & +SingleExecutor::shutdown() { + Lock lock(_mutex); + _closed = true; + return *this; +} + void SingleExecutor::run() { while (!_thread.stopped()) { diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index 9c3ebb4caf7..1b9b4f3afc4 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -27,9 +27,9 @@ public: size_t getNumThreads() const override; uint32_t getTaskLimit() const { return _taskLimit.load(std::memory_order_relaxed); } Stats getStats() override; + SingleExecutor & shutdown() override; private: using Lock = std::unique_lock<std::mutex>; - uint64_t addTask(Task::UP task); void drain(Lock & lock); void run() override; void drain_tasks(); @@ -56,6 +56,7 @@ private: std::atomic<uint64_t> _wakeupConsumerAt; std::atomic<uint64_t> _producerNeedWakeupAt; std::atomic<uint64_t> _wp; + bool _closed; }; } |