diff options
author | Håvard Pettersen <havardpe@yahooinc.com> | 2023-02-15 13:10:31 +0000 |
---|---|---|
committer | Håvard Pettersen <havardpe@yahooinc.com> | 2023-02-15 15:27:19 +0000 |
commit | 070fc34cee07db023824c76995bba43f2262d6c1 (patch) | |
tree | f30d42ff4c97cafcb3dd64e025c7f687366784c4 /vespalib | |
parent | 5780a48616db40c6eb5ae12293b115fbbc44b080 (diff) |
use std::thread directly
also add very simple ThreadPool class to run multiple threads at once
make an effort to only join once
Diffstat (limited to 'vespalib')
-rw-r--r-- | vespalib/src/tests/thread/thread_test.cpp | 39 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/testkit/test_hook.h | 6 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp | 2 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/simple_thread_bundle.h | 3 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/singleexecutor.cpp | 2 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/singleexecutor.h | 2 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/thread.cpp | 32 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/thread.h | 48 |
8 files changed, 58 insertions, 76 deletions
diff --git a/vespalib/src/tests/thread/thread_test.cpp b/vespalib/src/tests/thread/thread_test.cpp index f7e8753fd86..9533fe1c190 100644 --- a/vespalib/src/tests/thread/thread_test.cpp +++ b/vespalib/src/tests/thread/thread_test.cpp @@ -19,30 +19,33 @@ void my_fun(bool *was_run) { *was_run = true; } +Runnable::init_fun_t wrap(Runnable::init_fun_t init, bool *init_called) { + return [=](Runnable &target) + { + *init_called = true; + return init(target); + }; +} + TEST("run vespalib::Runnable with init function") { Agent agent; - { - auto thread = Thread::start(agent, test_agent_thread); - } + bool init_called = false; + auto thread = thread::start(agent, wrap(test_agent_thread, &init_called)); + thread.join(); + EXPECT_TRUE(init_called); EXPECT_TRUE(agent.was_run); } -TEST("run custom function") { - bool was_run = false; - { - auto thread = Thread::start(my_fun, &was_run); - } - EXPECT_TRUE(was_run); -} - -TEST("join multiple times (including destructor)") { +TEST("use thread pool to run multiple things") { + Agent agent; + bool init_called = false; bool was_run = false; - { - auto thread = Thread::start(my_fun, &was_run); - thread.join(); - thread.join(); - thread.join(); - } + ThreadPool pool; + pool.start(my_fun, &was_run); + pool.start(agent, wrap(test_agent_thread, &init_called)); + pool.join(); + EXPECT_TRUE(init_called); + EXPECT_TRUE(agent.was_run); EXPECT_TRUE(was_run); } diff --git a/vespalib/src/vespa/vespalib/testkit/test_hook.h b/vespalib/src/vespa/vespalib/testkit/test_hook.h index 8a5c6c1e684..0ff5ba11709 100644 --- a/vespalib/src/vespa/vespalib/testkit/test_hook.h +++ b/vespalib/src/vespa/vespalib/testkit/test_hook.h @@ -78,10 +78,9 @@ protected: Barrier barrier(num_threads); std::vector<FixtureUP> fixtures; std::vector<ThreadUP> threads; - std::vector<Thread> thread_handles; + ThreadPool pool; threads.reserve(num_threads); fixtures.reserve(num_threads); - thread_handles.reserve(num_threads - 1); for (size_t i = 0; i < num_threads; ++i) { FixtureUP fixture_up(new T(fixture)); fixture_up->thread_id = i; @@ -90,10 +89,11 @@ protected: fixtures.push_back(std::move(fixture_up)); } for (size_t i = 1; i < num_threads; ++i) { - thread_handles.push_back(Thread::start([&target = *threads[i]](){ target.threadEntry(); })); + pool.start([&target = *threads[i]](){ target.threadEntry(); }); } threads[0]->threadEntry(); latch.await(); + pool.join(); bool result = true; for (size_t i = 0; i < num_threads; ++i) { result = result && threads[i]->getResult(); diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp index becb5d2ab74..958bb58f34a 100644 --- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp +++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp @@ -182,7 +182,7 @@ SimpleThreadBundle::Worker::Worker(Signal &s, Runnable::init_fun_t init_fun, Run signal(s), hook(std::move(h)) { - thread = Thread::start(*this, std::move(init_fun)); + thread = thread::start(*this, std::move(init_fun)); } void diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h index b5f371d2de3..536474d9300 100644 --- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h +++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h @@ -113,7 +113,7 @@ public: private: struct Worker : Runnable { using UP = std::unique_ptr<Worker>; - Thread thread; + std::thread thread; Signal &signal; Runnable::UP hook; Worker(Signal &s, init_fun_t init_fun, Runnable::UP h); @@ -136,4 +136,3 @@ public: }; } // namespace vespalib - diff --git a/vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/vespalib/src/vespa/vespalib/util/singleexecutor.cpp index 298226d8805..fa1aef61167 100644 --- a/vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -38,7 +38,7 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t reservedQueueSize, bool if ( ! isQueueSizeHard) { _overflow = std::make_unique<ArrayQueue<Task::UP>>(); } - _thread = Thread::start(*this, func); + _thread = thread::start(*this, func); } SingleExecutor::~SingleExecutor() { diff --git a/vespalib/src/vespa/vespalib/util/singleexecutor.h b/vespalib/src/vespa/vespalib/util/singleexecutor.h index 051f506e90a..e8b7b0c44ad 100644 --- a/vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -70,7 +70,7 @@ private: std::mutex _mutex; std::condition_variable _consumerCondition; std::condition_variable _producerCondition; - vespalib::Thread _thread; + std::thread _thread; std::atomic<bool> _stopped; ExecutorIdleTracker _idleTracker; ThreadIdleTracker _threadIdleTracker; diff --git a/vespalib/src/vespa/vespalib/util/thread.cpp b/vespalib/src/vespa/vespalib/util/thread.cpp index cad24c5bcda..1c8fef53ba3 100644 --- a/vespalib/src/vespa/vespalib/util/thread.cpp +++ b/vespalib/src/vespa/vespalib/util/thread.cpp @@ -2,33 +2,13 @@ #include "thread.h" -namespace vespalib { +namespace vespalib::thread { -Thread & -Thread::operator=(Thread &&rhs) noexcept -{ - // may call std::terminate - _thread = std::move(rhs._thread); - return *this; +std::thread start(Runnable &runnable, Runnable::init_fun_t init_fun_in) { + return std::thread([&runnable, init_fun = std::move(init_fun_in)]() + { + init_fun(runnable); + }); } -void -Thread::join() -{ - if (_thread.joinable()) { - _thread.join(); - } } - -Thread::~Thread() -{ - join(); -} - -Thread -Thread::start(Runnable &runnable, Runnable::init_fun_t init_fun) -{ - return start([&runnable, init_fun](){ init_fun(runnable); }); -} - -} // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/util/thread.h b/vespalib/src/vespa/vespalib/util/thread.h index 6db1c1d54b4..d919b8f2ab7 100644 --- a/vespalib/src/vespa/vespalib/util/thread.h +++ b/vespalib/src/vespa/vespalib/util/thread.h @@ -8,36 +8,36 @@ namespace vespalib { +namespace thread { +[[nodiscard]] std::thread start(Runnable &runnable, Runnable::init_fun_t init_fun); +} + /** - * Thin thread abstraction that takes some things from std::thread - * (not allowed to assign to a running thread), some things from - * std::jthread (destructor does automatic join) and some things from - * now deprecated thread pools (the join function can be called - * multiple times and will only join the underlying thread if it is - * joinable). Enables starting a thread either by using a runnable and - * an init function or by forwarding directly to the std::thread - * constructor. Note that this class does not handle cancellation. + * Keeps track of multiple running threads. Calling join will join all + * currently running threads. All threads must be joined before + * destructing the pool itself. This class is not thread safe. **/ -class Thread -{ +class ThreadPool { private: - std::thread _thread; - Thread(std::thread &&thread) noexcept : _thread(std::move(thread)) {} + std::vector<std::thread> _threads; public: - Thread() noexcept : _thread() {} - Thread(const Thread &rhs) = delete; - Thread(Thread &&rhs) noexcept : Thread(std::move(rhs._thread)) {} - std::thread::id get_id() const noexcept { return _thread.get_id(); } - Thread &operator=(const Thread &rhs) = delete; - Thread &operator=(Thread &&rhs) noexcept; - void join(); - ~Thread(); - [[nodiscard]] static Thread start(Runnable &runnable, Runnable::init_fun_t init_fun_t); + ThreadPool() noexcept : _threads() {} + void start(Runnable &runnable, Runnable::init_fun_t init_fun) { + _threads.reserve(_threads.size() + 1); + _threads.push_back(thread::start(runnable, std::move(init_fun))); + } template<typename F, typename... Args> requires std::invocable<F,Args...> - [[nodiscard]] static Thread start(F &&f, Args && ... args) { - return Thread(std::thread(std::forward<F>(f), std::forward<Args>(args)...)); + void start(F &&f, Args && ... args) { + _threads.reserve(_threads.size() + 1); + _threads.emplace_back(std::forward<F>(f), std::forward<Args>(args)...); }; + void join() { + for (auto &thread: _threads) { + thread.join(); + } + _threads.clear(); + } }; -} // namespace vespalib +} |