From 070fc34cee07db023824c76995bba43f2262d6c1 Mon Sep 17 00:00:00 2001 From: HÃ¥vard Pettersen Date: Wed, 15 Feb 2023 13:10:31 +0000 Subject: use std::thread directly also add very simple ThreadPool class to run multiple threads at once make an effort to only join once --- config/src/vespa/config/helper/configfetcher.cpp | 7 ++-- config/src/vespa/config/helper/configfetcher.h | 5 +-- .../vespa/config/retriever/simpleconfigurer.cpp | 10 +++-- .../src/vespa/config/retriever/simpleconfigurer.h | 2 +- slobrok/src/vespa/slobrok/server/slobrokserver.cpp | 4 +- slobrok/src/vespa/slobrok/server/slobrokserver.h | 2 +- .../filestorage/operationabortingtest.cpp | 2 +- vbench/src/apps/vbench/vbench.cpp | 3 +- vbench/src/tests/dispatcher/dispatcher_test.cpp | 4 +- vbench/src/vbench/core/handler_thread.h | 2 +- vbench/src/vbench/core/handler_thread.hpp | 6 ++- vbench/src/vbench/vbench/request_scheduler.cpp | 2 +- vbench/src/vbench/vbench/request_scheduler.h | 2 +- vbench/src/vbench/vbench/vbench.cpp | 2 +- vbench/src/vbench/vbench/vbench.h | 2 +- vbench/src/vbench/vbench/worker.cpp | 2 +- vbench/src/vbench/vbench/worker.h | 2 +- vespalib/src/tests/thread/thread_test.cpp | 39 ++++++++++-------- vespalib/src/vespa/vespalib/testkit/test_hook.h | 6 +-- .../vespa/vespalib/util/simple_thread_bundle.cpp | 2 +- .../src/vespa/vespalib/util/simple_thread_bundle.h | 3 +- .../src/vespa/vespalib/util/singleexecutor.cpp | 2 +- vespalib/src/vespa/vespalib/util/singleexecutor.h | 2 +- vespalib/src/vespa/vespalib/util/thread.cpp | 32 +++------------ vespalib/src/vespa/vespalib/util/thread.h | 48 +++++++++++----------- 25 files changed, 90 insertions(+), 103 deletions(-) diff --git a/config/src/vespa/config/helper/configfetcher.cpp b/config/src/vespa/config/helper/configfetcher.cpp index ec4ea42d0bb..db0a64b248c 100644 --- a/config/src/vespa/config/helper/configfetcher.cpp +++ b/config/src/vespa/config/helper/configfetcher.cpp @@ -15,7 +15,7 @@ VESPA_THREAD_STACK_TAG(config_fetcher_thread); ConfigFetcher::ConfigFetcher(std::shared_ptr context) : _poller(std::make_unique(std::move(context))), - _thread(std::make_unique()), + _thread(), _closed(false), _started(false) { @@ -36,7 +36,7 @@ ConfigFetcher::start() throw ConfigTimeoutException("ConfigFetcher::start timed out getting initial config"); } LOG(debug, "Starting fetcher thread..."); - *_thread = vespalib::Thread::start(*_poller, config_fetcher_thread); + _thread = vespalib::thread::start(*_poller, config_fetcher_thread); _started = true; LOG(debug, "Fetcher thread started"); } @@ -58,7 +58,8 @@ ConfigFetcher::close() if (!_closed) { _poller->close(); if (_started) - _thread->join(); + _thread.join(); + _closed = true; } } diff --git a/config/src/vespa/config/helper/configfetcher.h b/config/src/vespa/config/helper/configfetcher.h index 2ed236eda68..67e8f5d134f 100644 --- a/config/src/vespa/config/helper/configfetcher.h +++ b/config/src/vespa/config/helper/configfetcher.h @@ -5,8 +5,7 @@ #include #include #include - -namespace vespalib { class Thread; } +#include namespace config { @@ -31,7 +30,7 @@ public: int64_t getGeneration() const; private: std::unique_ptr _poller; - std::unique_ptr _thread; + std::thread _thread; std::atomic _closed; std::atomic _started; }; diff --git a/config/src/vespa/config/retriever/simpleconfigurer.cpp b/config/src/vespa/config/retriever/simpleconfigurer.cpp index 1e89f51ec03..83e56737f8d 100644 --- a/config/src/vespa/config/retriever/simpleconfigurer.cpp +++ b/config/src/vespa/config/retriever/simpleconfigurer.cpp @@ -25,7 +25,7 @@ SimpleConfigurer::start() if (!_retriever->isClosed()) { LOG(debug, "Polling for config"); runConfigure(); - _thread = vespalib::Thread::start(*this, simple_configurer_thread); + _thread = vespalib::thread::start(*this, simple_configurer_thread); _started = true; } } @@ -38,9 +38,11 @@ SimpleConfigurer::~SimpleConfigurer() void SimpleConfigurer::close() { - _retriever->close(); - if (_started) - _thread.join(); + if (!_retriever->isClosed()) { + _retriever->close(); + if (_started) + _thread.join(); + } } void diff --git a/config/src/vespa/config/retriever/simpleconfigurer.h b/config/src/vespa/config/retriever/simpleconfigurer.h index aa6508c75a2..95fa12610cb 100644 --- a/config/src/vespa/config/retriever/simpleconfigurer.h +++ b/config/src/vespa/config/retriever/simpleconfigurer.h @@ -46,7 +46,7 @@ private: SimpleConfigRetriever::UP _retriever; SimpleConfigurable * const _configurable; - vespalib::Thread _thread; + std::thread _thread; std::atomic _started; }; diff --git a/slobrok/src/vespa/slobrok/server/slobrokserver.cpp b/slobrok/src/vespa/slobrok/server/slobrokserver.cpp index 4a986d9ba01..4e58cbeaaeb 100644 --- a/slobrok/src/vespa/slobrok/server/slobrokserver.cpp +++ b/slobrok/src/vespa/slobrok/server/slobrokserver.cpp @@ -10,14 +10,14 @@ SlobrokServer::SlobrokServer(ConfigShim &shim) : _env(shim), _thread() { - _thread = vespalib::Thread::start(*this, slobrok_server_thread); + _thread = vespalib::thread::start(*this, slobrok_server_thread); } SlobrokServer::SlobrokServer(uint32_t port) : _env(ConfigShim(port)), _thread() { - _thread = vespalib::Thread::start(*this, slobrok_server_thread); + _thread = vespalib::thread::start(*this, slobrok_server_thread); } diff --git a/slobrok/src/vespa/slobrok/server/slobrokserver.h b/slobrok/src/vespa/slobrok/server/slobrokserver.h index 1aeba9b6be3..60a78c43d21 100644 --- a/slobrok/src/vespa/slobrok/server/slobrokserver.h +++ b/slobrok/src/vespa/slobrok/server/slobrokserver.h @@ -12,7 +12,7 @@ class SlobrokServer : public vespalib::Runnable { private: SBEnv _env; - vespalib::Thread _thread; + std::thread _thread; public: SlobrokServer(ConfigShim &shim); diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp index d7ecb1f30f1..85816c3ad3e 100644 --- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp +++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp @@ -296,7 +296,7 @@ TEST_F(OperationAbortingTest, wait_for_current_operation_completion_for_aborted_ auto abortCmd = makeAbortCmd(abortSet); SendTask sendTask(abortCmd, *_queueBarrier, c.top); - auto thread = vespalib::Thread::start(sendTask, test_thread); + auto thread = vespalib::thread::start(sendTask, test_thread); LOG(debug, "waiting for threads to reach barriers"); _queueBarrier->await(); diff --git a/vbench/src/apps/vbench/vbench.cpp b/vbench/src/apps/vbench/vbench.cpp index b5c2897207f..c73f38adaf2 100644 --- a/vbench/src/apps/vbench/vbench.cpp +++ b/vbench/src/apps/vbench/vbench.cpp @@ -44,12 +44,13 @@ int run(const std::string &cfg_name) { VBench vbench(cfg); NotifyDone notify(done); vespalib::RunnablePair runBoth(vbench, notify); - auto thread = vespalib::Thread::start(runBoth, vbench_thread); + auto thread = vespalib::thread::start(runBoth, vbench_thread); while (!SIG::INT.check() && !SIG::TERM.check() && !done.await(1s)) {} if (!done.await(vespalib::duration::zero())) { vbench.abort(); done.await(); } + thread.join(); if (vbench.tainted()) { fprintf(stderr, "vbench failed: %s\n", vbench.tainted().reason().c_str()); diff --git a/vbench/src/tests/dispatcher/dispatcher_test.cpp b/vbench/src/tests/dispatcher/dispatcher_test.cpp index 6a3fb8d0c7c..49a41508c7b 100644 --- a/vbench/src/tests/dispatcher/dispatcher_test.cpp +++ b/vbench/src/tests/dispatcher/dispatcher_test.cpp @@ -30,9 +30,9 @@ TEST("dispatcher") { Dispatcher dispatcher(dropped); Fetcher fetcher1(dispatcher, handler1); Fetcher fetcher2(dispatcher, handler2); - auto thread1 = vespalib::Thread::start(fetcher1, fetcher1_thread); + auto thread1 = vespalib::thread::start(fetcher1, fetcher1_thread); EXPECT_TRUE(dispatcher.waitForThreads(1, 512)); - auto thread2 = vespalib::Thread::start(fetcher2, fetcher2_thread); + auto thread2 = vespalib::thread::start(fetcher2, fetcher2_thread); EXPECT_TRUE(dispatcher.waitForThreads(2, 512)); EXPECT_EQUAL(-1, dropped.value); EXPECT_EQUAL(-1, handler1.value); diff --git a/vbench/src/vbench/core/handler_thread.h b/vbench/src/vbench/core/handler_thread.h index 8ece1389dfc..81a0a720720 100644 --- a/vbench/src/vbench/core/handler_thread.h +++ b/vbench/src/vbench/core/handler_thread.h @@ -26,7 +26,7 @@ private: std::condition_variable _cond; vespalib::ArrayQueue > _queue; Handler &_next; - vespalib::Thread _thread; + std::thread _thread; bool _done; void run() override; diff --git a/vbench/src/vbench/core/handler_thread.hpp b/vbench/src/vbench/core/handler_thread.hpp index 1a99861ea81..3373e196ab7 100644 --- a/vbench/src/vbench/core/handler_thread.hpp +++ b/vbench/src/vbench/core/handler_thread.hpp @@ -31,13 +31,15 @@ HandlerThread::HandlerThread(Handler &next, init_fun_t init_fun) _thread(), _done(false) { - _thread = vespalib::Thread::start(*this, init_fun); + _thread = vespalib::thread::start(*this, init_fun); } template HandlerThread::~HandlerThread() { - join(); + if (!_done) { + join(); + } assert(_queue.empty()); } diff --git a/vbench/src/vbench/vbench/request_scheduler.cpp b/vbench/src/vbench/vbench/request_scheduler.cpp index cde31ec07b8..136f2bed98b 100644 --- a/vbench/src/vbench/vbench/request_scheduler.cpp +++ b/vbench/src/vbench/vbench/request_scheduler.cpp @@ -71,7 +71,7 @@ void RequestScheduler::start() { _timer.reset(); - _thread = vespalib::Thread::start(*this, vbench_request_scheduler_thread); + _thread = vespalib::thread::start(*this, vbench_request_scheduler_thread); } RequestScheduler & diff --git a/vbench/src/vbench/vbench/request_scheduler.h b/vbench/src/vbench/vbench/request_scheduler.h index b1d525eb691..1483a86a458 100644 --- a/vbench/src/vbench/vbench/request_scheduler.h +++ b/vbench/src/vbench/vbench/request_scheduler.h @@ -26,7 +26,7 @@ private: TimeQueue _queue; DroppedTagger _droppedTagger; Dispatcher _dispatcher; - vespalib::Thread _thread; + std::thread _thread; HttpConnectionPool _connectionPool; std::vector _workers; std::mutex _lock; diff --git a/vbench/src/vbench/vbench/vbench.cpp b/vbench/src/vbench/vbench/vbench.cpp index 8a8bceccefb..58d0f9e0345 100644 --- a/vbench/src/vbench/vbench/vbench.cpp +++ b/vbench/src/vbench/vbench/vbench.cpp @@ -100,7 +100,7 @@ VBench::run() { _scheduler->start(); for (size_t i = 0; i < _inputs.size(); ++i) { - _inputs[i]->thread = vespalib::Thread::start(*_inputs[i]->generator, vbench_inputchain_generator); + _inputs[i]->thread = vespalib::thread::start(*_inputs[i]->generator, vbench_inputchain_generator); } for (size_t i = 0; i < _inputs.size(); ++i) { _inputs[i]->thread.join(); diff --git a/vbench/src/vbench/vbench/vbench.h b/vbench/src/vbench/vbench/vbench.h index 2b7fcf0cd88..f355beddce5 100644 --- a/vbench/src/vbench/vbench/vbench.h +++ b/vbench/src/vbench/vbench/vbench.h @@ -26,7 +26,7 @@ private: using UP = std::unique_ptr; std::vector taggers; Generator::UP generator; - vespalib::Thread thread; + std::thread thread; }; NativeFactory _factory; std::vector _analyzers; diff --git a/vbench/src/vbench/vbench/worker.cpp b/vbench/src/vbench/vbench/worker.cpp index eabd17ae73f..d9ba039eb53 100644 --- a/vbench/src/vbench/vbench/worker.cpp +++ b/vbench/src/vbench/vbench/worker.cpp @@ -30,7 +30,7 @@ Worker::Worker(Provider &provider, Handler &next, _pool(pool), _timer(timer) { - _thread = vespalib::Thread::start(*this, vbench_worker_thread); + _thread = vespalib::thread::start(*this, vbench_worker_thread); } } // namespace vbench diff --git a/vbench/src/vbench/vbench/worker.h b/vbench/src/vbench/vbench/worker.h index d2bcfae637b..9cd7b04fdfe 100644 --- a/vbench/src/vbench/vbench/worker.h +++ b/vbench/src/vbench/vbench/worker.h @@ -20,7 +20,7 @@ namespace vbench { class Worker : public vespalib::Runnable { private: - vespalib::Thread _thread; + std::thread _thread; Provider &_provider; Handler &_next; HttpConnectionPool &_pool; diff --git a/vespalib/src/tests/thread/thread_test.cpp b/vespalib/src/tests/thread/thread_test.cpp index f7e8753fd86..9533fe1c190 100644 --- a/vespalib/src/tests/thread/thread_test.cpp +++ b/vespalib/src/tests/thread/thread_test.cpp @@ -19,30 +19,33 @@ void my_fun(bool *was_run) { *was_run = true; } +Runnable::init_fun_t wrap(Runnable::init_fun_t init, bool *init_called) { + return [=](Runnable &target) + { + *init_called = true; + return init(target); + }; +} + TEST("run vespalib::Runnable with init function") { Agent agent; - { - auto thread = Thread::start(agent, test_agent_thread); - } + bool init_called = false; + auto thread = thread::start(agent, wrap(test_agent_thread, &init_called)); + thread.join(); + EXPECT_TRUE(init_called); EXPECT_TRUE(agent.was_run); } -TEST("run custom function") { - bool was_run = false; - { - auto thread = Thread::start(my_fun, &was_run); - } - EXPECT_TRUE(was_run); -} - -TEST("join multiple times (including destructor)") { +TEST("use thread pool to run multiple things") { + Agent agent; + bool init_called = false; bool was_run = false; - { - auto thread = Thread::start(my_fun, &was_run); - thread.join(); - thread.join(); - thread.join(); - } + ThreadPool pool; + pool.start(my_fun, &was_run); + pool.start(agent, wrap(test_agent_thread, &init_called)); + pool.join(); + EXPECT_TRUE(init_called); + EXPECT_TRUE(agent.was_run); EXPECT_TRUE(was_run); } diff --git a/vespalib/src/vespa/vespalib/testkit/test_hook.h b/vespalib/src/vespa/vespalib/testkit/test_hook.h index 8a5c6c1e684..0ff5ba11709 100644 --- a/vespalib/src/vespa/vespalib/testkit/test_hook.h +++ b/vespalib/src/vespa/vespalib/testkit/test_hook.h @@ -78,10 +78,9 @@ protected: Barrier barrier(num_threads); std::vector fixtures; std::vector threads; - std::vector thread_handles; + ThreadPool pool; threads.reserve(num_threads); fixtures.reserve(num_threads); - thread_handles.reserve(num_threads - 1); for (size_t i = 0; i < num_threads; ++i) { FixtureUP fixture_up(new T(fixture)); fixture_up->thread_id = i; @@ -90,10 +89,11 @@ protected: fixtures.push_back(std::move(fixture_up)); } for (size_t i = 1; i < num_threads; ++i) { - thread_handles.push_back(Thread::start([&target = *threads[i]](){ target.threadEntry(); })); + pool.start([&target = *threads[i]](){ target.threadEntry(); }); } threads[0]->threadEntry(); latch.await(); + pool.join(); bool result = true; for (size_t i = 0; i < num_threads; ++i) { result = result && threads[i]->getResult(); diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp index becb5d2ab74..958bb58f34a 100644 --- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp +++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp @@ -182,7 +182,7 @@ SimpleThreadBundle::Worker::Worker(Signal &s, Runnable::init_fun_t init_fun, Run signal(s), hook(std::move(h)) { - thread = Thread::start(*this, std::move(init_fun)); + thread = thread::start(*this, std::move(init_fun)); } void diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h index b5f371d2de3..536474d9300 100644 --- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h +++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.h @@ -113,7 +113,7 @@ public: private: struct Worker : Runnable { using UP = std::unique_ptr; - Thread thread; + std::thread thread; Signal &signal; Runnable::UP hook; Worker(Signal &s, init_fun_t init_fun, Runnable::UP h); @@ -136,4 +136,3 @@ public: }; } // namespace vespalib - diff --git a/vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/vespalib/src/vespa/vespalib/util/singleexecutor.cpp index 298226d8805..fa1aef61167 100644 --- a/vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -38,7 +38,7 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t reservedQueueSize, bool if ( ! isQueueSizeHard) { _overflow = std::make_unique>(); } - _thread = Thread::start(*this, func); + _thread = thread::start(*this, func); } SingleExecutor::~SingleExecutor() { diff --git a/vespalib/src/vespa/vespalib/util/singleexecutor.h b/vespalib/src/vespa/vespalib/util/singleexecutor.h index 051f506e90a..e8b7b0c44ad 100644 --- a/vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -70,7 +70,7 @@ private: std::mutex _mutex; std::condition_variable _consumerCondition; std::condition_variable _producerCondition; - vespalib::Thread _thread; + std::thread _thread; std::atomic _stopped; ExecutorIdleTracker _idleTracker; ThreadIdleTracker _threadIdleTracker; diff --git a/vespalib/src/vespa/vespalib/util/thread.cpp b/vespalib/src/vespa/vespalib/util/thread.cpp index cad24c5bcda..1c8fef53ba3 100644 --- a/vespalib/src/vespa/vespalib/util/thread.cpp +++ b/vespalib/src/vespa/vespalib/util/thread.cpp @@ -2,33 +2,13 @@ #include "thread.h" -namespace vespalib { +namespace vespalib::thread { -Thread & -Thread::operator=(Thread &&rhs) noexcept -{ - // may call std::terminate - _thread = std::move(rhs._thread); - return *this; +std::thread start(Runnable &runnable, Runnable::init_fun_t init_fun_in) { + return std::thread([&runnable, init_fun = std::move(init_fun_in)]() + { + init_fun(runnable); + }); } -void -Thread::join() -{ - if (_thread.joinable()) { - _thread.join(); - } } - -Thread::~Thread() -{ - join(); -} - -Thread -Thread::start(Runnable &runnable, Runnable::init_fun_t init_fun) -{ - return start([&runnable, init_fun](){ init_fun(runnable); }); -} - -} // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/util/thread.h b/vespalib/src/vespa/vespalib/util/thread.h index 6db1c1d54b4..d919b8f2ab7 100644 --- a/vespalib/src/vespa/vespalib/util/thread.h +++ b/vespalib/src/vespa/vespalib/util/thread.h @@ -8,36 +8,36 @@ namespace vespalib { +namespace thread { +[[nodiscard]] std::thread start(Runnable &runnable, Runnable::init_fun_t init_fun); +} + /** - * Thin thread abstraction that takes some things from std::thread - * (not allowed to assign to a running thread), some things from - * std::jthread (destructor does automatic join) and some things from - * now deprecated thread pools (the join function can be called - * multiple times and will only join the underlying thread if it is - * joinable). Enables starting a thread either by using a runnable and - * an init function or by forwarding directly to the std::thread - * constructor. Note that this class does not handle cancellation. + * Keeps track of multiple running threads. Calling join will join all + * currently running threads. All threads must be joined before + * destructing the pool itself. This class is not thread safe. **/ -class Thread -{ +class ThreadPool { private: - std::thread _thread; - Thread(std::thread &&thread) noexcept : _thread(std::move(thread)) {} + std::vector _threads; public: - Thread() noexcept : _thread() {} - Thread(const Thread &rhs) = delete; - Thread(Thread &&rhs) noexcept : Thread(std::move(rhs._thread)) {} - std::thread::id get_id() const noexcept { return _thread.get_id(); } - Thread &operator=(const Thread &rhs) = delete; - Thread &operator=(Thread &&rhs) noexcept; - void join(); - ~Thread(); - [[nodiscard]] static Thread start(Runnable &runnable, Runnable::init_fun_t init_fun_t); + ThreadPool() noexcept : _threads() {} + void start(Runnable &runnable, Runnable::init_fun_t init_fun) { + _threads.reserve(_threads.size() + 1); + _threads.push_back(thread::start(runnable, std::move(init_fun))); + } template requires std::invocable - [[nodiscard]] static Thread start(F &&f, Args && ... args) { - return Thread(std::thread(std::forward(f), std::forward(args)...)); + void start(F &&f, Args && ... args) { + _threads.reserve(_threads.size() + 1); + _threads.emplace_back(std::forward(f), std::forward(args)...); }; + void join() { + for (auto &thread: _threads) { + thread.join(); + } + _threads.clear(); + } }; -} // namespace vespalib +} -- cgit v1.2.3