diff options
Diffstat (limited to 'vespalib/src')
-rw-r--r-- | vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp | 1 | ||||
-rw-r--r-- | vespalib/src/tests/singleexecutor/singleexecutor_test.cpp | 1 | ||||
-rw-r--r-- | vespalib/src/tests/thread/thread_test.cpp | 48 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/testkit/test_hook.cpp | 25 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/testkit/test_hook.h | 13 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/CMakeLists.txt | 2 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/active.cpp | 7 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/active.h | 34 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/joinable.cpp | 7 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/joinable.h | 23 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp | 4 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/singleexecutor.cpp | 9 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/singleexecutor.h | 6 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/thread.cpp | 75 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/thread.h | 56 |
15 files changed, 73 insertions, 238 deletions
diff --git a/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp b/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp index 59aeddfe8ca..e451f1e033d 100644 --- a/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp +++ b/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp @@ -4,6 +4,7 @@ #include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/util/box.h> #include <vespa/vespalib/util/small_vector.h> +#include <vespa/vespalib/util/gate.h> #include <thread> #include <forward_list> diff --git a/vespalib/src/tests/singleexecutor/singleexecutor_test.cpp b/vespalib/src/tests/singleexecutor/singleexecutor_test.cpp index 56352ff3c0d..3b1d244eb13 100644 --- a/vespalib/src/tests/singleexecutor/singleexecutor_test.cpp +++ b/vespalib/src/tests/singleexecutor/singleexecutor_test.cpp @@ -5,6 +5,7 @@ #include <vespa/vespalib/util/singleexecutor.h> #include <vespa/vespalib/util/lambdatask.h> #include <vespa/vespalib/util/alloc.h> +#include <vespa/vespalib/util/gate.h> #include <atomic> using namespace vespalib; diff --git a/vespalib/src/tests/thread/thread_test.cpp b/vespalib/src/tests/thread/thread_test.cpp index af1fb626462..f7e8753fd86 100644 --- a/vespalib/src/tests/thread/thread_test.cpp +++ b/vespalib/src/tests/thread/thread_test.cpp @@ -2,56 +2,48 @@ #include <vespa/vespalib/testkit/test_kit.h> #include <vespa/vespalib/util/thread.h> -#include <thread> using namespace vespalib; VESPA_THREAD_STACK_TAG(test_agent_thread); struct Agent : public Runnable { - bool started; - int loopCnt; - Agent() : started(false), loopCnt(0) {} + bool was_run; + Agent() : was_run(false) {} void run() override { - started = true; - Thread &thread = Thread::currentThread(); - while (thread.slumber(60.0)) { - ++loopCnt; - } + was_run = true; } }; -TEST("thread never started") { +void my_fun(bool *was_run) { + *was_run = true; +} + +TEST("run vespalib::Runnable with init function") { Agent agent; { - Thread thread(agent, test_agent_thread); + auto thread = Thread::start(agent, test_agent_thread); } - EXPECT_TRUE(!agent.started); - EXPECT_EQUAL(0, agent.loopCnt); + EXPECT_TRUE(agent.was_run); } -TEST("normal operation") { - Agent agent; +TEST("run custom function") { + bool was_run = false; { - Thread thread(agent, test_agent_thread); - thread.start(); - std::this_thread::sleep_for(20ms); - thread.stop().join(); + auto thread = Thread::start(my_fun, &was_run); } - EXPECT_TRUE(agent.started); - EXPECT_EQUAL(0, agent.loopCnt); + EXPECT_TRUE(was_run); } -TEST("stop before start") { - Agent agent; +TEST("join multiple times (including destructor)") { + bool was_run = false; { - Thread thread(agent, test_agent_thread); - thread.stop(); - thread.start(); + auto thread = Thread::start(my_fun, &was_run); + thread.join(); + thread.join(); thread.join(); } - EXPECT_TRUE(!agent.started); - EXPECT_EQUAL(0, agent.loopCnt); + EXPECT_TRUE(was_run); } TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/vespa/vespalib/testkit/test_hook.cpp b/vespalib/src/vespa/vespalib/testkit/test_hook.cpp index 4e33897d869..5b618beb01b 100644 --- a/vespalib/src/vespa/vespalib/testkit/test_hook.cpp +++ b/vespalib/src/vespa/vespalib/testkit/test_hook.cpp @@ -4,31 +4,9 @@ #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/util/size_literals.h> #include <regex> -#include <vespa/fastos/thread.h> namespace vespalib { -namespace { - -struct FastOSTestThreadRunner : FastOS_Runnable { - TestThreadEntry &entry; - FastOSTestThreadRunner(TestThreadEntry &entry_in) : entry(entry_in) {} - bool DeleteOnCompletion() const override { return true; } - void Run(FastOS_ThreadInterface *, void *) override { entry.threadEntry(); } -}; - -struct FastOSTestThreadFactory : TestThreadFactory { - FastOS_ThreadPool threadPool; - FastOSTestThreadFactory() : threadPool() {} - void createThread(TestThreadEntry &entry) override { - threadPool.NewThread(new FastOSTestThreadRunner(entry), 0); - } -}; - -} // namespace vespalib::<unnamed> - -__thread TestThreadFactory *TestThreadFactory::factory = 0; - void TestThreadWrapper::threadEntry() { @@ -96,8 +74,6 @@ const char *lookup_subset_pattern(const std::string &name) { void TestHook::runAll() { - FastOSTestThreadFactory threadFactory; - TestThreadFactory::factory = &threadFactory; std::string name = TestMaster::master.getName(); std::regex pattern(lookup_subset_pattern(name)); size_t testsPassed = 0; @@ -134,7 +110,6 @@ TestHook::runAll() fprintf(stderr, "%s: Warn: test summary --- %zu test(s) ignored\n", name.c_str(), testsIgnored); } - TestThreadFactory::factory = 0; } } // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/testkit/test_hook.h b/vespalib/src/vespa/vespalib/testkit/test_hook.h index d0e04ede2e9..8a5c6c1e684 100644 --- a/vespalib/src/vespa/vespalib/testkit/test_hook.h +++ b/vespalib/src/vespa/vespalib/testkit/test_hook.h @@ -4,6 +4,8 @@ #include <vespa/vespalib/util/count_down_latch.h> #include <vespa/vespalib/util/barrier.h> +#include <vespa/vespalib/util/thread.h> +#include <thread> #include <string> #include <vector> #include <cassert> @@ -16,12 +18,6 @@ struct TestThreadEntry { virtual ~TestThreadEntry() {} }; -struct TestThreadFactory { - static __thread TestThreadFactory *factory; - virtual void createThread(TestThreadEntry &entry) = 0; - virtual ~TestThreadFactory() {} -}; - struct TestFixtureWrapper { size_t thread_id; size_t num_threads; @@ -82,8 +78,10 @@ protected: Barrier barrier(num_threads); std::vector<FixtureUP> fixtures; std::vector<ThreadUP> threads; + std::vector<Thread> thread_handles; threads.reserve(num_threads); fixtures.reserve(num_threads); + thread_handles.reserve(num_threads - 1); for (size_t i = 0; i < num_threads; ++i) { FixtureUP fixture_up(new T(fixture)); fixture_up->thread_id = i; @@ -92,8 +90,7 @@ protected: fixtures.push_back(std::move(fixture_up)); } for (size_t i = 1; i < num_threads; ++i) { - assert(TestThreadFactory::factory != 0); - TestThreadFactory::factory->createThread(*threads[i]); + thread_handles.push_back(Thread::start([&target = *threads[i]](){ target.threadEntry(); })); } threads[0]->threadEntry(); latch.await(); diff --git a/vespalib/src/vespa/vespalib/util/CMakeLists.txt b/vespalib/src/vespa/vespalib/util/CMakeLists.txt index ad2db89288c..73e8b93a2ff 100644 --- a/vespalib/src/vespa/vespalib/util/CMakeLists.txt +++ b/vespalib/src/vespa/vespalib/util/CMakeLists.txt @@ -1,7 +1,6 @@ # Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. vespa_add_library(vespalib_vespalib_util OBJECT SOURCES - active.cpp adaptive_sequenced_executor.cpp address_space.cpp alloc.cpp @@ -45,7 +44,6 @@ vespa_add_library(vespalib_vespalib_util OBJECT invokeserviceimpl.cpp isequencedtaskexecutor.cpp issue.cpp - joinable.cpp jsonexception.cpp jsonstream.cpp jsonwriter.cpp diff --git a/vespalib/src/vespa/vespalib/util/active.cpp b/vespalib/src/vespa/vespalib/util/active.cpp deleted file mode 100644 index 48785c74b79..00000000000 --- a/vespalib/src/vespa/vespalib/util/active.cpp +++ /dev/null @@ -1,7 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "active.h" - -namespace vespalib { - -} // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/util/active.h b/vespalib/src/vespa/vespalib/util/active.h deleted file mode 100644 index 1fbff9514d7..00000000000 --- a/vespalib/src/vespa/vespalib/util/active.h +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include "joinable.h" - -namespace vespalib { - -/** - * Interface used to abstract entities that are the source of - * activity. - **/ -struct Active : Joinable { - /** - * Start activity. - **/ - virtual void start() = 0; - - /** - * Request that activity stops. The returned object can be used to - * wait for the actual conclusion of the activity. - * - * @return object that can be used to wait for activity completion - **/ - virtual Joinable &stop() = 0; - - /** - * Empty virtual destructor to enable subclassing. - **/ - virtual ~Active() {} -}; - -} // namespace vespalib - diff --git a/vespalib/src/vespa/vespalib/util/joinable.cpp b/vespalib/src/vespa/vespalib/util/joinable.cpp deleted file mode 100644 index 58112660389..00000000000 --- a/vespalib/src/vespa/vespalib/util/joinable.cpp +++ /dev/null @@ -1,7 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "joinable.h" - -namespace vespalib { - -} // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/util/joinable.h b/vespalib/src/vespa/vespalib/util/joinable.h deleted file mode 100644 index 275ae740d26..00000000000 --- a/vespalib/src/vespa/vespalib/util/joinable.h +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -namespace vespalib { - -/** - * Concurrent activity that we can wait for a conclusion of. - **/ -struct Joinable { - /** - * Wait for the conclusion of this concurrent activity - **/ - virtual void join() = 0; - - /** - * Empty virtual destructor to enable subclassing. - **/ - virtual ~Joinable() {} -}; - -} // namespace vespalib - diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp index 8a66a4f6898..becb5d2ab74 100644 --- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp +++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp @@ -178,11 +178,11 @@ SimpleThreadBundle::run(Runnable* const* targets, size_t cnt) } SimpleThreadBundle::Worker::Worker(Signal &s, Runnable::init_fun_t init_fun, Runnable::UP h) - : thread(*this, std::move(init_fun)), + : thread(), signal(s), hook(std::move(h)) { - thread.start(); + thread = Thread::start(*this, std::move(init_fun)); } void diff --git a/vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/vespalib/src/vespa/vespalib/util/singleexecutor.cpp index c2f83bbcf09..298226d8805 100644 --- a/vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -19,7 +19,8 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t reservedQueueSize, bool _mutex(), _consumerCondition(), _producerCondition(), - _thread(*this, func), + _thread(), + _stopped(false), _idleTracker(steady_clock::now()), _threadIdleTracker(), _wakeupCount(0), @@ -37,13 +38,13 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t reservedQueueSize, bool if ( ! isQueueSizeHard) { _overflow = std::make_unique<ArrayQueue<Task::UP>>(); } - _thread.start(); + _thread = Thread::start(*this, func); } SingleExecutor::~SingleExecutor() { shutdown(); sync(); - _thread.stop(); + stop(); _consumerCondition.notify_one(); _thread.join(); } @@ -140,7 +141,7 @@ SingleExecutor::shutdown() { void SingleExecutor::run() { - while (!_thread.stopped()) { + while (!stopped()) { drain_tasks(); _producerCondition.notify_all(); _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + get_watermark(), std::memory_order_relaxed); diff --git a/vespalib/src/vespa/vespalib/util/singleexecutor.h b/vespalib/src/vespa/vespalib/util/singleexecutor.h index dd755a76302..051f506e90a 100644 --- a/vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -9,6 +9,8 @@ #include <vespa/vespalib/util/executor_idle_tracking.h> #include <thread> #include <atomic> +#include <mutex> +#include <condition_variable> namespace vespalib { @@ -38,6 +40,8 @@ private: using Lock = std::unique_lock<std::mutex>; void drain(Lock & lock); void run() override; + void stop() { _stopped = true; } + bool stopped() const { return _stopped.load(std::memory_order_relaxed); } void drain_tasks(); void sleepProducer(Lock & guard, duration maxWaitTime, uint64_t wakeupAt); void run_tasks_till(uint64_t available); @@ -48,7 +52,6 @@ private: uint64_t index(uint64_t counter) const { return counter & (_taskLimit.load(std::memory_order_relaxed) - 1); } - uint64_t numTasks(); uint64_t numTasks(Lock & guard) const { return num_tasks_in_main_q() + num_tasks_in_overflow_q(guard); @@ -68,6 +71,7 @@ private: std::condition_variable _consumerCondition; std::condition_variable _producerCondition; vespalib::Thread _thread; + std::atomic<bool> _stopped; ExecutorIdleTracker _idleTracker; ThreadIdleTracker _threadIdleTracker; uint64_t _wakeupCount; diff --git a/vespalib/src/vespa/vespalib/util/thread.cpp b/vespalib/src/vespa/vespalib/util/thread.cpp index b6a491ee83d..cad24c5bcda 100644 --- a/vespalib/src/vespa/vespalib/util/thread.cpp +++ b/vespalib/src/vespa/vespalib/util/thread.cpp @@ -1,56 +1,14 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "thread.h" -#include "time.h" -#include <thread> -#include <cassert> namespace vespalib { -__thread Thread *Thread::_currentThread = nullptr; - -void -Thread::run() -{ - assert(_currentThread == nullptr); - _currentThread = this; - _start.await(); - if (!stopped()) { - _init_fun(_runnable); - } - assert(_currentThread == this); - _currentThread = nullptr; -} - -Thread::Thread(Runnable &runnable, init_fun_t init_fun_in) - : _runnable(runnable), - _init_fun(std::move(init_fun_in)), - _start(), - _lock(), - _cond(), - _stopped(false), - _woken(false), - _thread(&Thread::run, this) -{ -} - -Thread::~Thread() -{ - stop().start(); -} - -void -Thread::start() -{ - _start.countDown(); -} - Thread & -Thread::stop() +Thread::operator=(Thread &&rhs) noexcept { - std::unique_lock guard(_lock); - _stopped.store(true, std::memory_order_relaxed); - _cond.notify_all(); + // may call std::terminate + _thread = std::move(rhs._thread); return *this; } @@ -62,32 +20,15 @@ Thread::join() } } -bool -Thread::slumber(double s) -{ - std::unique_lock guard(_lock); - if (!stopped() || _woken) { - if (_cond.wait_for(guard, from_s(s)) == std::cv_status::no_timeout) { - _woken = stopped(); - } - } else { - _woken = true; - } - return !stopped(); -} - -Thread & -Thread::currentThread() +Thread::~Thread() { - Thread *thread = _currentThread; - assert(thread != nullptr); - return *thread; + join(); } -void -Thread::sleep(size_t ms) +Thread +Thread::start(Runnable &runnable, Runnable::init_fun_t init_fun) { - std::this_thread::sleep_for(std::chrono::milliseconds(ms)); + return start([&runnable, init_fun](){ init_fun(runnable); }); } } // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/util/thread.h b/vespalib/src/vespa/vespalib/util/thread.h index c03f1c6e65c..6db1c1d54b4 100644 --- a/vespalib/src/vespa/vespalib/util/thread.h +++ b/vespalib/src/vespa/vespalib/util/thread.h @@ -2,46 +2,42 @@ #pragma once -#include "gate.h" #include "runnable.h" -#include "active.h" -#include <atomic> #include <thread> +#include <concepts> namespace vespalib { /** - * Abstraction of the concept of running a single thread. + * Thin thread abstraction that takes some things from std::thread + * (not allowed to assign to a running thread), some things from + * std::jthread (destructor does automatic join) and some things from + * now deprecated thread pools (the join function can be called + * multiple times and will only join the underlying thread if it is + * joinable). Enables starting a thread either by using a runnable and + * an init function or by forwarding directly to the std::thread + * constructor. Note that this class does not handle cancellation. **/ -class Thread : public Active +class Thread { private: - using init_fun_t = Runnable::init_fun_t; - static __thread Thread *_currentThread; - - Runnable &_runnable; - init_fun_t _init_fun; - vespalib::Gate _start; - std::mutex _lock; - std::condition_variable _cond; - std::atomic<bool> _stopped; - bool _woken; - std::jthread _thread; - - void run(); - + std::thread _thread; + Thread(std::thread &&thread) noexcept : _thread(std::move(thread)) {} public: - Thread(Runnable &runnable, init_fun_t init_fun_in); - ~Thread() override; - void start() override; - Thread &stop() override; - void join() override; - [[nodiscard]] bool stopped() const noexcept { - return _stopped.load(std::memory_order_relaxed); - } - bool slumber(double s); - static Thread ¤tThread(); - static void sleep(size_t ms); + Thread() noexcept : _thread() {} + Thread(const Thread &rhs) = delete; + Thread(Thread &&rhs) noexcept : Thread(std::move(rhs._thread)) {} + std::thread::id get_id() const noexcept { return _thread.get_id(); } + Thread &operator=(const Thread &rhs) = delete; + Thread &operator=(Thread &&rhs) noexcept; + void join(); + ~Thread(); + [[nodiscard]] static Thread start(Runnable &runnable, Runnable::init_fun_t init_fun_t); + template<typename F, typename... Args> + requires std::invocable<F,Args...> + [[nodiscard]] static Thread start(F &&f, Args && ... args) { + return Thread(std::thread(std::forward<F>(f), std::forward<Args>(args)...)); + }; }; } // namespace vespalib |