aboutsummaryrefslogtreecommitdiffstats
path: root/vespalib/src
diff options
context:
space:
mode:
Diffstat (limited to 'vespalib/src')
-rw-r--r--vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp1
-rw-r--r--vespalib/src/tests/singleexecutor/singleexecutor_test.cpp1
-rw-r--r--vespalib/src/tests/thread/thread_test.cpp48
-rw-r--r--vespalib/src/vespa/vespalib/testkit/test_hook.cpp25
-rw-r--r--vespalib/src/vespa/vespalib/testkit/test_hook.h13
-rw-r--r--vespalib/src/vespa/vespalib/util/CMakeLists.txt2
-rw-r--r--vespalib/src/vespa/vespalib/util/active.cpp7
-rw-r--r--vespalib/src/vespa/vespalib/util/active.h34
-rw-r--r--vespalib/src/vespa/vespalib/util/joinable.cpp7
-rw-r--r--vespalib/src/vespa/vespalib/util/joinable.h23
-rw-r--r--vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp4
-rw-r--r--vespalib/src/vespa/vespalib/util/singleexecutor.cpp9
-rw-r--r--vespalib/src/vespa/vespalib/util/singleexecutor.h6
-rw-r--r--vespalib/src/vespa/vespalib/util/thread.cpp75
-rw-r--r--vespalib/src/vespa/vespalib/util/thread.h56
15 files changed, 73 insertions, 238 deletions
diff --git a/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp b/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp
index 59aeddfe8ca..e451f1e033d 100644
--- a/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp
+++ b/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp
@@ -4,6 +4,7 @@
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/box.h>
#include <vespa/vespalib/util/small_vector.h>
+#include <vespa/vespalib/util/gate.h>
#include <thread>
#include <forward_list>
diff --git a/vespalib/src/tests/singleexecutor/singleexecutor_test.cpp b/vespalib/src/tests/singleexecutor/singleexecutor_test.cpp
index 56352ff3c0d..3b1d244eb13 100644
--- a/vespalib/src/tests/singleexecutor/singleexecutor_test.cpp
+++ b/vespalib/src/tests/singleexecutor/singleexecutor_test.cpp
@@ -5,6 +5,7 @@
#include <vespa/vespalib/util/singleexecutor.h>
#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/vespalib/util/alloc.h>
+#include <vespa/vespalib/util/gate.h>
#include <atomic>
using namespace vespalib;
diff --git a/vespalib/src/tests/thread/thread_test.cpp b/vespalib/src/tests/thread/thread_test.cpp
index af1fb626462..f7e8753fd86 100644
--- a/vespalib/src/tests/thread/thread_test.cpp
+++ b/vespalib/src/tests/thread/thread_test.cpp
@@ -2,56 +2,48 @@
#include <vespa/vespalib/testkit/test_kit.h>
#include <vespa/vespalib/util/thread.h>
-#include <thread>
using namespace vespalib;
VESPA_THREAD_STACK_TAG(test_agent_thread);
struct Agent : public Runnable {
- bool started;
- int loopCnt;
- Agent() : started(false), loopCnt(0) {}
+ bool was_run;
+ Agent() : was_run(false) {}
void run() override {
- started = true;
- Thread &thread = Thread::currentThread();
- while (thread.slumber(60.0)) {
- ++loopCnt;
- }
+ was_run = true;
}
};
-TEST("thread never started") {
+void my_fun(bool *was_run) {
+ *was_run = true;
+}
+
+TEST("run vespalib::Runnable with init function") {
Agent agent;
{
- Thread thread(agent, test_agent_thread);
+ auto thread = Thread::start(agent, test_agent_thread);
}
- EXPECT_TRUE(!agent.started);
- EXPECT_EQUAL(0, agent.loopCnt);
+ EXPECT_TRUE(agent.was_run);
}
-TEST("normal operation") {
- Agent agent;
+TEST("run custom function") {
+ bool was_run = false;
{
- Thread thread(agent, test_agent_thread);
- thread.start();
- std::this_thread::sleep_for(20ms);
- thread.stop().join();
+ auto thread = Thread::start(my_fun, &was_run);
}
- EXPECT_TRUE(agent.started);
- EXPECT_EQUAL(0, agent.loopCnt);
+ EXPECT_TRUE(was_run);
}
-TEST("stop before start") {
- Agent agent;
+TEST("join multiple times (including destructor)") {
+ bool was_run = false;
{
- Thread thread(agent, test_agent_thread);
- thread.stop();
- thread.start();
+ auto thread = Thread::start(my_fun, &was_run);
+ thread.join();
+ thread.join();
thread.join();
}
- EXPECT_TRUE(!agent.started);
- EXPECT_EQUAL(0, agent.loopCnt);
+ EXPECT_TRUE(was_run);
}
TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/vespalib/src/vespa/vespalib/testkit/test_hook.cpp b/vespalib/src/vespa/vespalib/testkit/test_hook.cpp
index 4e33897d869..5b618beb01b 100644
--- a/vespalib/src/vespa/vespalib/testkit/test_hook.cpp
+++ b/vespalib/src/vespa/vespalib/testkit/test_hook.cpp
@@ -4,31 +4,9 @@
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/util/size_literals.h>
#include <regex>
-#include <vespa/fastos/thread.h>
namespace vespalib {
-namespace {
-
-struct FastOSTestThreadRunner : FastOS_Runnable {
- TestThreadEntry &entry;
- FastOSTestThreadRunner(TestThreadEntry &entry_in) : entry(entry_in) {}
- bool DeleteOnCompletion() const override { return true; }
- void Run(FastOS_ThreadInterface *, void *) override { entry.threadEntry(); }
-};
-
-struct FastOSTestThreadFactory : TestThreadFactory {
- FastOS_ThreadPool threadPool;
- FastOSTestThreadFactory() : threadPool() {}
- void createThread(TestThreadEntry &entry) override {
- threadPool.NewThread(new FastOSTestThreadRunner(entry), 0);
- }
-};
-
-} // namespace vespalib::<unnamed>
-
-__thread TestThreadFactory *TestThreadFactory::factory = 0;
-
void
TestThreadWrapper::threadEntry()
{
@@ -96,8 +74,6 @@ const char *lookup_subset_pattern(const std::string &name) {
void
TestHook::runAll()
{
- FastOSTestThreadFactory threadFactory;
- TestThreadFactory::factory = &threadFactory;
std::string name = TestMaster::master.getName();
std::regex pattern(lookup_subset_pattern(name));
size_t testsPassed = 0;
@@ -134,7 +110,6 @@ TestHook::runAll()
fprintf(stderr, "%s: Warn: test summary --- %zu test(s) ignored\n",
name.c_str(), testsIgnored);
}
- TestThreadFactory::factory = 0;
}
} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/testkit/test_hook.h b/vespalib/src/vespa/vespalib/testkit/test_hook.h
index d0e04ede2e9..8a5c6c1e684 100644
--- a/vespalib/src/vespa/vespalib/testkit/test_hook.h
+++ b/vespalib/src/vespa/vespalib/testkit/test_hook.h
@@ -4,6 +4,8 @@
#include <vespa/vespalib/util/count_down_latch.h>
#include <vespa/vespalib/util/barrier.h>
+#include <vespa/vespalib/util/thread.h>
+#include <thread>
#include <string>
#include <vector>
#include <cassert>
@@ -16,12 +18,6 @@ struct TestThreadEntry {
virtual ~TestThreadEntry() {}
};
-struct TestThreadFactory {
- static __thread TestThreadFactory *factory;
- virtual void createThread(TestThreadEntry &entry) = 0;
- virtual ~TestThreadFactory() {}
-};
-
struct TestFixtureWrapper {
size_t thread_id;
size_t num_threads;
@@ -82,8 +78,10 @@ protected:
Barrier barrier(num_threads);
std::vector<FixtureUP> fixtures;
std::vector<ThreadUP> threads;
+ std::vector<Thread> thread_handles;
threads.reserve(num_threads);
fixtures.reserve(num_threads);
+ thread_handles.reserve(num_threads - 1);
for (size_t i = 0; i < num_threads; ++i) {
FixtureUP fixture_up(new T(fixture));
fixture_up->thread_id = i;
@@ -92,8 +90,7 @@ protected:
fixtures.push_back(std::move(fixture_up));
}
for (size_t i = 1; i < num_threads; ++i) {
- assert(TestThreadFactory::factory != 0);
- TestThreadFactory::factory->createThread(*threads[i]);
+ thread_handles.push_back(Thread::start([&target = *threads[i]](){ target.threadEntry(); }));
}
threads[0]->threadEntry();
latch.await();
diff --git a/vespalib/src/vespa/vespalib/util/CMakeLists.txt b/vespalib/src/vespa/vespalib/util/CMakeLists.txt
index ad2db89288c..73e8b93a2ff 100644
--- a/vespalib/src/vespa/vespalib/util/CMakeLists.txt
+++ b/vespalib/src/vespa/vespalib/util/CMakeLists.txt
@@ -1,7 +1,6 @@
# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
vespa_add_library(vespalib_vespalib_util OBJECT
SOURCES
- active.cpp
adaptive_sequenced_executor.cpp
address_space.cpp
alloc.cpp
@@ -45,7 +44,6 @@ vespa_add_library(vespalib_vespalib_util OBJECT
invokeserviceimpl.cpp
isequencedtaskexecutor.cpp
issue.cpp
- joinable.cpp
jsonexception.cpp
jsonstream.cpp
jsonwriter.cpp
diff --git a/vespalib/src/vespa/vespalib/util/active.cpp b/vespalib/src/vespa/vespalib/util/active.cpp
deleted file mode 100644
index 48785c74b79..00000000000
--- a/vespalib/src/vespa/vespalib/util/active.cpp
+++ /dev/null
@@ -1,7 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "active.h"
-
-namespace vespalib {
-
-} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/util/active.h b/vespalib/src/vespa/vespalib/util/active.h
deleted file mode 100644
index 1fbff9514d7..00000000000
--- a/vespalib/src/vespa/vespalib/util/active.h
+++ /dev/null
@@ -1,34 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include "joinable.h"
-
-namespace vespalib {
-
-/**
- * Interface used to abstract entities that are the source of
- * activity.
- **/
-struct Active : Joinable {
- /**
- * Start activity.
- **/
- virtual void start() = 0;
-
- /**
- * Request that activity stops. The returned object can be used to
- * wait for the actual conclusion of the activity.
- *
- * @return object that can be used to wait for activity completion
- **/
- virtual Joinable &stop() = 0;
-
- /**
- * Empty virtual destructor to enable subclassing.
- **/
- virtual ~Active() {}
-};
-
-} // namespace vespalib
-
diff --git a/vespalib/src/vespa/vespalib/util/joinable.cpp b/vespalib/src/vespa/vespalib/util/joinable.cpp
deleted file mode 100644
index 58112660389..00000000000
--- a/vespalib/src/vespa/vespalib/util/joinable.cpp
+++ /dev/null
@@ -1,7 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "joinable.h"
-
-namespace vespalib {
-
-} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/util/joinable.h b/vespalib/src/vespa/vespalib/util/joinable.h
deleted file mode 100644
index 275ae740d26..00000000000
--- a/vespalib/src/vespa/vespalib/util/joinable.h
+++ /dev/null
@@ -1,23 +0,0 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-namespace vespalib {
-
-/**
- * Concurrent activity that we can wait for a conclusion of.
- **/
-struct Joinable {
- /**
- * Wait for the conclusion of this concurrent activity
- **/
- virtual void join() = 0;
-
- /**
- * Empty virtual destructor to enable subclassing.
- **/
- virtual ~Joinable() {}
-};
-
-} // namespace vespalib
-
diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp
index 8a66a4f6898..becb5d2ab74 100644
--- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp
+++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp
@@ -178,11 +178,11 @@ SimpleThreadBundle::run(Runnable* const* targets, size_t cnt)
}
SimpleThreadBundle::Worker::Worker(Signal &s, Runnable::init_fun_t init_fun, Runnable::UP h)
- : thread(*this, std::move(init_fun)),
+ : thread(),
signal(s),
hook(std::move(h))
{
- thread.start();
+ thread = Thread::start(*this, std::move(init_fun));
}
void
diff --git a/vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index c2f83bbcf09..298226d8805 100644
--- a/vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -19,7 +19,8 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t reservedQueueSize, bool
_mutex(),
_consumerCondition(),
_producerCondition(),
- _thread(*this, func),
+ _thread(),
+ _stopped(false),
_idleTracker(steady_clock::now()),
_threadIdleTracker(),
_wakeupCount(0),
@@ -37,13 +38,13 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t reservedQueueSize, bool
if ( ! isQueueSizeHard) {
_overflow = std::make_unique<ArrayQueue<Task::UP>>();
}
- _thread.start();
+ _thread = Thread::start(*this, func);
}
SingleExecutor::~SingleExecutor() {
shutdown();
sync();
- _thread.stop();
+ stop();
_consumerCondition.notify_one();
_thread.join();
}
@@ -140,7 +141,7 @@ SingleExecutor::shutdown() {
void
SingleExecutor::run() {
- while (!_thread.stopped()) {
+ while (!stopped()) {
drain_tasks();
_producerCondition.notify_all();
_wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + get_watermark(), std::memory_order_relaxed);
diff --git a/vespalib/src/vespa/vespalib/util/singleexecutor.h b/vespalib/src/vespa/vespalib/util/singleexecutor.h
index dd755a76302..051f506e90a 100644
--- a/vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -9,6 +9,8 @@
#include <vespa/vespalib/util/executor_idle_tracking.h>
#include <thread>
#include <atomic>
+#include <mutex>
+#include <condition_variable>
namespace vespalib {
@@ -38,6 +40,8 @@ private:
using Lock = std::unique_lock<std::mutex>;
void drain(Lock & lock);
void run() override;
+ void stop() { _stopped = true; }
+ bool stopped() const { return _stopped.load(std::memory_order_relaxed); }
void drain_tasks();
void sleepProducer(Lock & guard, duration maxWaitTime, uint64_t wakeupAt);
void run_tasks_till(uint64_t available);
@@ -48,7 +52,6 @@ private:
uint64_t index(uint64_t counter) const {
return counter & (_taskLimit.load(std::memory_order_relaxed) - 1);
}
-
uint64_t numTasks();
uint64_t numTasks(Lock & guard) const {
return num_tasks_in_main_q() + num_tasks_in_overflow_q(guard);
@@ -68,6 +71,7 @@ private:
std::condition_variable _consumerCondition;
std::condition_variable _producerCondition;
vespalib::Thread _thread;
+ std::atomic<bool> _stopped;
ExecutorIdleTracker _idleTracker;
ThreadIdleTracker _threadIdleTracker;
uint64_t _wakeupCount;
diff --git a/vespalib/src/vespa/vespalib/util/thread.cpp b/vespalib/src/vespa/vespalib/util/thread.cpp
index b6a491ee83d..cad24c5bcda 100644
--- a/vespalib/src/vespa/vespalib/util/thread.cpp
+++ b/vespalib/src/vespa/vespalib/util/thread.cpp
@@ -1,56 +1,14 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "thread.h"
-#include "time.h"
-#include <thread>
-#include <cassert>
namespace vespalib {
-__thread Thread *Thread::_currentThread = nullptr;
-
-void
-Thread::run()
-{
- assert(_currentThread == nullptr);
- _currentThread = this;
- _start.await();
- if (!stopped()) {
- _init_fun(_runnable);
- }
- assert(_currentThread == this);
- _currentThread = nullptr;
-}
-
-Thread::Thread(Runnable &runnable, init_fun_t init_fun_in)
- : _runnable(runnable),
- _init_fun(std::move(init_fun_in)),
- _start(),
- _lock(),
- _cond(),
- _stopped(false),
- _woken(false),
- _thread(&Thread::run, this)
-{
-}
-
-Thread::~Thread()
-{
- stop().start();
-}
-
-void
-Thread::start()
-{
- _start.countDown();
-}
-
Thread &
-Thread::stop()
+Thread::operator=(Thread &&rhs) noexcept
{
- std::unique_lock guard(_lock);
- _stopped.store(true, std::memory_order_relaxed);
- _cond.notify_all();
+ // may call std::terminate
+ _thread = std::move(rhs._thread);
return *this;
}
@@ -62,32 +20,15 @@ Thread::join()
}
}
-bool
-Thread::slumber(double s)
-{
- std::unique_lock guard(_lock);
- if (!stopped() || _woken) {
- if (_cond.wait_for(guard, from_s(s)) == std::cv_status::no_timeout) {
- _woken = stopped();
- }
- } else {
- _woken = true;
- }
- return !stopped();
-}
-
-Thread &
-Thread::currentThread()
+Thread::~Thread()
{
- Thread *thread = _currentThread;
- assert(thread != nullptr);
- return *thread;
+ join();
}
-void
-Thread::sleep(size_t ms)
+Thread
+Thread::start(Runnable &runnable, Runnable::init_fun_t init_fun)
{
- std::this_thread::sleep_for(std::chrono::milliseconds(ms));
+ return start([&runnable, init_fun](){ init_fun(runnable); });
}
} // namespace vespalib
diff --git a/vespalib/src/vespa/vespalib/util/thread.h b/vespalib/src/vespa/vespalib/util/thread.h
index c03f1c6e65c..6db1c1d54b4 100644
--- a/vespalib/src/vespa/vespalib/util/thread.h
+++ b/vespalib/src/vespa/vespalib/util/thread.h
@@ -2,46 +2,42 @@
#pragma once
-#include "gate.h"
#include "runnable.h"
-#include "active.h"
-#include <atomic>
#include <thread>
+#include <concepts>
namespace vespalib {
/**
- * Abstraction of the concept of running a single thread.
+ * Thin thread abstraction that takes some things from std::thread
+ * (not allowed to assign to a running thread), some things from
+ * std::jthread (destructor does automatic join) and some things from
+ * now deprecated thread pools (the join function can be called
+ * multiple times and will only join the underlying thread if it is
+ * joinable). Enables starting a thread either by using a runnable and
+ * an init function or by forwarding directly to the std::thread
+ * constructor. Note that this class does not handle cancellation.
**/
-class Thread : public Active
+class Thread
{
private:
- using init_fun_t = Runnable::init_fun_t;
- static __thread Thread *_currentThread;
-
- Runnable &_runnable;
- init_fun_t _init_fun;
- vespalib::Gate _start;
- std::mutex _lock;
- std::condition_variable _cond;
- std::atomic<bool> _stopped;
- bool _woken;
- std::jthread _thread;
-
- void run();
-
+ std::thread _thread;
+ Thread(std::thread &&thread) noexcept : _thread(std::move(thread)) {}
public:
- Thread(Runnable &runnable, init_fun_t init_fun_in);
- ~Thread() override;
- void start() override;
- Thread &stop() override;
- void join() override;
- [[nodiscard]] bool stopped() const noexcept {
- return _stopped.load(std::memory_order_relaxed);
- }
- bool slumber(double s);
- static Thread &currentThread();
- static void sleep(size_t ms);
+ Thread() noexcept : _thread() {}
+ Thread(const Thread &rhs) = delete;
+ Thread(Thread &&rhs) noexcept : Thread(std::move(rhs._thread)) {}
+ std::thread::id get_id() const noexcept { return _thread.get_id(); }
+ Thread &operator=(const Thread &rhs) = delete;
+ Thread &operator=(Thread &&rhs) noexcept;
+ void join();
+ ~Thread();
+ [[nodiscard]] static Thread start(Runnable &runnable, Runnable::init_fun_t init_fun_t);
+ template<typename F, typename... Args>
+ requires std::invocable<F,Args...>
+ [[nodiscard]] static Thread start(F &&f, Args && ... args) {
+ return Thread(std::thread(std::forward<F>(f), std::forward<Args>(args)...));
+ };
};
} // namespace vespalib