diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-08-18 14:00:30 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-08-18 14:00:30 +0200 |
commit | 28c129658fa7084c8fcf4981fc57f71a41dc5362 (patch) | |
tree | f4a6d1bca3c5f7d885400adf0ba538abfe664873 /vespalib | |
parent | 64809ac8367bb4c059f55a8595f47a323237b238 (diff) |
Hide the thread thread implementation a bit more.
Diffstat (limited to 'vespalib')
-rw-r--r-- | vespalib/src/tests/executor/stress_test.cpp | 5 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp | 28 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h | 36 |
3 files changed, 39 insertions, 30 deletions
diff --git a/vespalib/src/tests/executor/stress_test.cpp b/vespalib/src/tests/executor/stress_test.cpp index aa5d9d53955..5c7620afb14 100644 --- a/vespalib/src/tests/executor/stress_test.cpp +++ b/vespalib/src/tests/executor/stress_test.cpp @@ -2,12 +2,11 @@ #include <vespa/vespalib/testkit/testapp.h> #include <vespa/vespalib/util/executor.h> -#include <vespa/vespalib/util/sync.h> #include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/vespalib/locale/c.h> -#include <cmath> using namespace vespalib; +using namespace std::literals; uint32_t doStuff(uint32_t input) { char buf[128]; @@ -131,7 +130,7 @@ Test::Main() Executor::Task::UP t(new CPUTask(taskSize, result)); t = executor.execute(std::move(t)); while (t.get() != 0) { - FastOS_Thread::Sleep(10); + std::this_thread::sleep_for(10ms); t = executor.execute(std::move(t)); } } diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp index e9f101f242e..76557762479 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp @@ -1,15 +1,29 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "threadstackexecutorbase.h" +#include <vespa/fastos/thread.h> namespace vespalib { +namespace thread { + +struct ThreadInit : public FastOS_Runnable { + Runnable &worker; + ThreadStackExecutorBase::init_fun_t init_fun; + + explicit ThreadInit(Runnable &worker_in, ThreadStackExecutorBase::init_fun_t init_fun_in) + : worker(worker_in), init_fun(std::move(init_fun_in)) {} + + void Run(FastOS_ThreadInterface *, void *) override; +}; + void -ThreadStackExecutorBase::ThreadInit::Run(FastOS_ThreadInterface *, void *) -{ +ThreadInit::Run(FastOS_ThreadInterface *, void *) { init_fun(worker); } +} + void ThreadStackExecutorBase::BlockedThread::wait() const { @@ -115,7 +129,7 @@ ThreadStackExecutorBase::run() ThreadStackExecutorBase::ThreadStackExecutorBase(uint32_t stackSize, uint32_t taskLimit, init_fun_t init_fun) - : _pool(stackSize), + : _pool(std::make_unique<FastOS_ThreadPool>(stackSize)), _monitor(), _stats(), _executorCompletion(), @@ -125,7 +139,7 @@ ThreadStackExecutorBase::ThreadStackExecutorBase(uint32_t stackSize, _taskCount(0), _taskLimit(taskLimit), _closed(false), - _thread_init(std::make_unique<ThreadInit>(*this, std::move(init_fun))) + _thread_init(std::make_unique<thread::ThreadInit>(*this, std::move(init_fun))) { assert(taskLimit > 0); } @@ -135,7 +149,7 @@ ThreadStackExecutorBase::start(uint32_t threads) { assert(threads > 0); for (uint32_t i = 0; i < threads; ++i) { - FastOS_ThreadInterface *thread = _pool.NewThread(_thread_init.get()); + FastOS_ThreadInterface *thread = _pool->NewThread(_thread_init.get()); assert(thread != 0); (void)thread; } @@ -236,12 +250,12 @@ ThreadStackExecutorBase::cleanup() { shutdown().sync(); _executorCompletion.countDown(); - _pool.Close(); + _pool->Close(); } ThreadStackExecutorBase::~ThreadStackExecutorBase() { - assert(_pool.isClosed()); + assert(_pool->isClosed()); assert(_taskCount == 0); assert(_blocked.empty()); } diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h index a91114b055e..4ea27a2bcde 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h @@ -11,10 +11,13 @@ #include <memory> #include <vector> #include <functional> -#include <vespa/fastos/thread.h> + +class FastOS_ThreadPool; namespace vespalib { +namespace thread { class ThreadInit; } + // Convenience macro used to create a function that can be used as an // init function when creating an executor to inject a frame with the // given name into the stack of all worker threads. @@ -46,13 +49,6 @@ public: using init_fun_t = std::function<int(Runnable&)>; private: - struct ThreadInit : public FastOS_Runnable { - Runnable &worker; - init_fun_t init_fun; - explicit ThreadInit(Runnable &worker_in, init_fun_t init_fun_in) - : worker(worker_in), init_fun(std::move(init_fun_in)) {} - void Run(FastOS_ThreadInterface *, void *) override; - }; struct TaggedTask { Task::UP task; @@ -102,18 +98,18 @@ private: void unblock(); }; - FastOS_ThreadPool _pool; - Monitor _monitor; - Stats _stats; - Gate _executorCompletion; - ArrayQueue<TaggedTask> _tasks; - ArrayQueue<Worker*> _workers; - std::vector<BlockedThread*> _blocked; - EventBarrier<BarrierCompletion> _barrier; - uint32_t _taskCount; - uint32_t _taskLimit; - bool _closed; - std::unique_ptr<ThreadInit> _thread_init; + std::unique_ptr<FastOS_ThreadPool> _pool; + Monitor _monitor; + Stats _stats; + Gate _executorCompletion; + ArrayQueue<TaggedTask> _tasks; + ArrayQueue<Worker*> _workers; + std::vector<BlockedThread*> _blocked; + EventBarrier<BarrierCompletion> _barrier; + uint32_t _taskCount; + uint32_t _taskLimit; + bool _closed; + std::unique_ptr<thread::ThreadInit> _thread_init; void block_thread(const LockGuard &, BlockedThread &blocked_thread); void unblock_threads(const MonitorGuard &); |