aboutsummaryrefslogtreecommitdiffstats
path: root/vespalib
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@yahooinc.com>2023-02-15 13:10:31 +0000
committerHåvard Pettersen <havardpe@yahooinc.com>2023-02-15 15:27:19 +0000
commit070fc34cee07db023824c76995bba43f2262d6c1 (patch)
treef30d42ff4c97cafcb3dd64e025c7f687366784c4 /vespalib
parent5780a48616db40c6eb5ae12293b115fbbc44b080 (diff)
use std::thread directly
also add very simple ThreadPool class to run multiple threads at once make an effort to only join once
Diffstat (limited to 'vespalib')
-rw-r--r--vespalib/src/tests/thread/thread_test.cpp39
-rw-r--r--vespalib/src/vespa/vespalib/testkit/test_hook.h6
-rw-r--r--vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp2
-rw-r--r--vespalib/src/vespa/vespalib/util/simple_thread_bundle.h3
-rw-r--r--vespalib/src/vespa/vespalib/util/singleexecutor.cpp2
-rw-r--r--vespalib/src/vespa/vespalib/util/singleexecutor.h2
-rw-r--r--vespalib/src/vespa/vespalib/util/thread.cpp32
-rw-r--r--vespalib/src/vespa/vespalib/util/thread.h48
8 files changed, 58 insertions, 76 deletions
diff --git a/vespalib/src/tests/thread/thread_test.cpp b/vespalib/src/tests/thread/thread_test.cpp
index f7e8753fd86..9533fe1c190 100644
--- a/vespalib/src/tests/thread/thread_test.cpp
+++ b/vespalib/src/tests/thread/thread_test.cpp
@@ -19,30 +19,33 @@ void my_fun(bool *was_run) {
*was_run = true;
}
+Runnable::init_fun_t wrap(Runnable::init_fun_t init, bool *init_called) {
+ return [=](Runnable &target)
+ {
+ *init_called = true;
+ return init(target);
+ };
+}
+
TEST("run vespalib::Runnable with init function") {
Agent agent;
- {
- auto thread = Thread::start(agent, test_agent_thread);
- }
+ bool init_called = false;
+ auto thread = thread::start(agent, wrap(test_agent_thread, &init_called));
+ thread.join();
+ EXPECT_TRUE(init_called);
EXPECT_TRUE(agent.was_run);
}
-TEST("run custom function") {
- bool was_run = false;
- {
- auto thread = Thread::start(my_fun, &was_run);
- }
- EXPECT_TRUE(was_run);
-}
-
-TEST("join multiple times (including destructor)") {
+TEST("use thread pool to run multiple things") {
+ Agent agent;
+ bool init_called = false;
bool was_run = false;
- {
- auto thread = Thread::start(my_fun, &was_run);
- thread.join();
- thread.join();
- thread.join();
- }
+ ThreadPool pool;
+ pool.start(my_fun, &was_run);
+ pool.start(agent, wrap(test_agent_thread, &init_called));
+ pool.join();
+ EXPECT_TRUE(init_called);
+ EXPECT_TRUE(agent.was_run);
EXPECT_TRUE(was_run);
}
diff --git a/vespalib/src/vespa/vespalib/testkit/test_hook.h b/vespalib/src/vespa/vespalib/testkit/test_hook.h
index 8a5c6c1e684..0ff5ba11709 100644
--- a/vespalib/src/vespa/vespalib/testkit/test_hook.h
+++ b/vespalib/src/vespa/vespalib/testkit/test_hook.h
@@ -78,10 +78,9 @@ protected:
Barrier barrier(num_threads);
std::vector<FixtureUP> fixtures;
std::vector<ThreadUP> threads;
- std::vector<Thread> thread_handles;
+ ThreadPool pool;
threads.reserve(num_threads);
fixtures.reserve(num_threads);
- thread_handles.reserve(num_threads - 1);
for (size_t i = 0; i < num_threads; ++i) {
FixtureUP fixture_up(new T(fixture));
fixture_up->thread_id = i;
@@ -90,10 +89,11 @@ protected:
fixtures.push_back(std::move(fixture_up));
}
for (size_t i = 1; i < num_threads; ++i) {
- thread_handles.push_back(Thread::start([&target = *threads[i]](){ target.threadEntry(); }));
+ pool.start([&target = *threads[i]](){ target.threadEntry(); });
}
threads[0]->threadEntry();
latch.await();
+ pool.join();
bool result = true;
for (size_t i = 0; i < num_threads; ++i) {
result = result && threads[i]->getResult();
diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp
index becb5d2ab74..958bb58f34a 100644
--- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp
+++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp
@@ -182,7 +182,7 @@ SimpleThreadBundle::Worker::Worker(Signal &s, Runnable::init_fun_t init_fun, Run
signal(s),
hook(std::move(h))
{
- thread = Thread::start(*this, std::move(init_fun));
+ thread = thread::start(*this, std::move(init_fun));
}
void
diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h
index b5f371d2de3..536474d9300 100644
--- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h
+++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h
@@ -113,7 +113,7 @@ public:
private:
struct Worker : Runnable {
using UP = std::unique_ptr<Worker>;
- Thread thread;
+ std::thread thread;
Signal &signal;
Runnable::UP hook;
Worker(Signal &s, init_fun_t init_fun, Runnable::UP h);
@@ -136,4 +136,3 @@ public:
};
} // namespace vespalib
-
diff --git a/vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index 298226d8805..fa1aef61167 100644
--- a/vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -38,7 +38,7 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t reservedQueueSize, bool
if ( ! isQueueSizeHard) {
_overflow = std::make_unique<ArrayQueue<Task::UP>>();
}
- _thread = Thread::start(*this, func);
+ _thread = thread::start(*this, func);
}
SingleExecutor::~SingleExecutor() {
diff --git a/vespalib/src/vespa/vespalib/util/singleexecutor.h b/vespalib/src/vespa/vespalib/util/singleexecutor.h
index 051f506e90a..e8b7b0c44ad 100644
--- a/vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -70,7 +70,7 @@ private:
std::mutex _mutex;
std::condition_variable _consumerCondition;
std::condition_variable _producerCondition;
- vespalib::Thread _thread;
+ std::thread _thread;
std::atomic<bool> _stopped;
ExecutorIdleTracker _idleTracker;
ThreadIdleTracker _threadIdleTracker;
diff --git a/vespalib/src/vespa/vespalib/util/thread.cpp b/vespalib/src/vespa/vespalib/util/thread.cpp
index cad24c5bcda..1c8fef53ba3 100644
--- a/vespalib/src/vespa/vespalib/util/thread.cpp
+++ b/vespalib/src/vespa/vespalib/util/thread.cpp
@@ -2,33 +2,13 @@
#include "thread.h"
-namespace vespalib {
+namespace vespalib::thread {
-Thread &
-Thread::operator=(Thread &&rhs) noexcept
-{
- // may call std::terminate
- _thread = std::move(rhs._thread);
- return *this;
+std::thread start(Runnable &runnable, Runnable::init_fun_t init_fun_in) {
+ return std::thread([&runnable, init_fun = std::move(init_fun_in)]()
+ {
+ init_fun(runnable);
+ });
}
-void
-Thread::join()
-{
- if (_thread.joinable()) {
- _thread.join();
- }
}
-
-Thread::~Thread()
-{
- join();
-}
-
-Thread
-Thread::start(Runnable &runnable, Runnable::init_fun_t init_fun)
-{
- return start([&runnable, init_fun](){ init_fun(runnable); });
-}
-
-} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/util/thread.h b/vespalib/src/vespa/vespalib/util/thread.h
index 6db1c1d54b4..d919b8f2ab7 100644
--- a/vespalib/src/vespa/vespalib/util/thread.h
+++ b/vespalib/src/vespa/vespalib/util/thread.h
@@ -8,36 +8,36 @@
namespace vespalib {
+namespace thread {
+[[nodiscard]] std::thread start(Runnable &runnable, Runnable::init_fun_t init_fun);
+}
+
/**
- * Thin thread abstraction that takes some things from std::thread
- * (not allowed to assign to a running thread), some things from
- * std::jthread (destructor does automatic join) and some things from
- * now deprecated thread pools (the join function can be called
- * multiple times and will only join the underlying thread if it is
- * joinable). Enables starting a thread either by using a runnable and
- * an init function or by forwarding directly to the std::thread
- * constructor. Note that this class does not handle cancellation.
+ * Keeps track of multiple running threads. Calling join will join all
+ * currently running threads. All threads must be joined before
+ * destructing the pool itself. This class is not thread safe.
**/
-class Thread
-{
+class ThreadPool {
private:
- std::thread _thread;
- Thread(std::thread &&thread) noexcept : _thread(std::move(thread)) {}
+ std::vector<std::thread> _threads;
public:
- Thread() noexcept : _thread() {}
- Thread(const Thread &rhs) = delete;
- Thread(Thread &&rhs) noexcept : Thread(std::move(rhs._thread)) {}
- std::thread::id get_id() const noexcept { return _thread.get_id(); }
- Thread &operator=(const Thread &rhs) = delete;
- Thread &operator=(Thread &&rhs) noexcept;
- void join();
- ~Thread();
- [[nodiscard]] static Thread start(Runnable &runnable, Runnable::init_fun_t init_fun_t);
+ ThreadPool() noexcept : _threads() {}
+ void start(Runnable &runnable, Runnable::init_fun_t init_fun) {
+ _threads.reserve(_threads.size() + 1);
+ _threads.push_back(thread::start(runnable, std::move(init_fun)));
+ }
template<typename F, typename... Args>
requires std::invocable<F,Args...>
- [[nodiscard]] static Thread start(F &&f, Args && ... args) {
- return Thread(std::thread(std::forward<F>(f), std::forward<Args>(args)...));
+ void start(F &&f, Args && ... args) {
+ _threads.reserve(_threads.size() + 1);
+ _threads.emplace_back(std::forward<F>(f), std::forward<Args>(args)...);
};
+ void join() {
+ for (auto &thread: _threads) {
+ thread.join();
+ }
+ _threads.clear();
+ }
};
-} // namespace vespalib
+}