From 63702f1cfcee53e2433c10216912b4ef739a6086 Mon Sep 17 00:00:00 2001 From: Håvard Pettersen Date: Tue, 21 Feb 2023 13:06:58 +0000 Subject: stop using FastOS_Thread in vespalib --- vespalib/src/tests/clock/clock_benchmark.cpp | 104 ++++++++++----------- vespalib/src/vespa/vespalib/util/thread.h | 1 + .../vespalib/util/threadstackexecutorbase.cpp | 33 +------ .../vespa/vespalib/util/threadstackexecutorbase.h | 11 +-- 4 files changed, 57 insertions(+), 92 deletions(-) diff --git a/vespalib/src/tests/clock/clock_benchmark.cpp b/vespalib/src/tests/clock/clock_benchmark.cpp index a21ad2b05ef..620b4e5c83c 100644 --- a/vespalib/src/tests/clock/clock_benchmark.cpp +++ b/vespalib/src/tests/clock/clock_benchmark.cpp @@ -2,7 +2,6 @@ #include #include -#include #include #include #include @@ -10,6 +9,7 @@ #include #include #include +#include using vespalib::Clock; using vespalib::steady_time; @@ -36,7 +36,7 @@ struct NSAtomic : public UpdateClock { std::atomic _value; }; -class TestClock : public FastOS_Runnable +class TestClock { private: int _timePeriodMS; @@ -44,8 +44,9 @@ private: std::condition_variable _cond; UpdateClock &_clock; bool _stop; + std::thread _thread; - void Run(FastOS_ThreadInterface *thisThread, void *arguments) override; + void run(); public: TestClock(UpdateClock & clock, double timePeriod) @@ -53,74 +54,68 @@ public: _lock(), _cond(), _clock(clock), - _stop(false) - { } + _stop(false), + _thread() + { + _thread = std::thread([this](){run();}); + } ~TestClock() { - std::lock_guard guard(_lock); - _stop = true; - _cond.notify_all(); + { + std::lock_guard guard(_lock); + _stop = true; + _cond.notify_all(); + } + _thread.join(); } }; -void TestClock::Run(FastOS_ThreadInterface *thread, void *) +void TestClock::run() { std::unique_lock guard(_lock); - while ( ! 
thread->GetBreakFlag() && !_stop) { + while (!_stop) { _clock.update(); _cond.wait_for(guard, std::chrono::milliseconds(_timePeriodMS)); } } -struct SamplerBase : public FastOS_Runnable { - SamplerBase(uint32_t threadId) - : _thread(nullptr), - _threadId(threadId), - _samples(0), - _count() +template +struct Sampler { + Sampler(Func func, uint64_t samples) + : _samples(samples), + _count(), + _func(func), + _thread() { memset(_count, 0, sizeof(_count)); + _thread = std::thread([this](){run();}); } - FastOS_ThreadInterface * _thread; - uint32_t _threadId; - uint64_t _samples; - uint64_t _count[3]; -}; - -template -struct Sampler : public SamplerBase { - Sampler(Func func, uint32_t threadId) - : SamplerBase(threadId), - _func(func) - { } - void Run(FastOS_ThreadInterface *, void *) override { - uint64_t samples; + void run() { steady_time prev = _func(); - for (samples = 0; (samples < _samples); samples++) { + for (uint64_t samples = 0; samples < _samples; ++samples) { steady_time now = _func(); duration diff = now - prev; if (diff > duration::zero()) prev = now; _count[1 + ((diff == duration::zero()) ? 0 : (diff > duration::zero()) ? 
1 : -1)]++; } - } - Func _func; + uint64_t _samples; + uint64_t _count[3]; + Func _func; + std::thread _thread; }; template -void benchmark(const char * desc, FastOS_ThreadPool & pool, uint64_t samples, uint32_t numThreads, Func func) { - std::vector> threads; +void benchmark(const char * desc, uint64_t samples, uint32_t numThreads, Func func) { + std::vector>> threads; threads.reserve(numThreads); steady_time start = steady_clock::now(); for (uint32_t i(0); i < numThreads; i++) { - SamplerBase * sampler = new Sampler(func, i); - sampler->_samples = samples; - sampler->_thread = pool.NewThread(sampler, nullptr); - threads.emplace_back(sampler); + threads.push_back(std::make_unique>(func, samples)); } uint64_t count[3]; memset(count, 0, sizeof(count)); for (const auto & sampler : threads) { - sampler->_thread->Join(); + sampler->_thread.join(); for (uint32_t i(0); i < 3; i++) { count[i] += sampler->_count[i]; } @@ -129,12 +124,15 @@ void benchmark(const char * desc, FastOS_ThreadPool & pool, uint64_t samples, ui } int -main(int , char *argv[]) +main(int argc, char *argv[]) { + if (argc != 4) { + fprintf(stderr, "usage: %s \n", argv[0]); + return 1; + } uint64_t frequency = atoll(argv[1]); uint32_t numThreads = atoi(argv[2]); uint64_t samples = atoll(argv[3]); - FastOS_ThreadPool pool; NSValue nsValue; NSVolatile nsVolatile; NSAtomic nsAtomic; @@ -143,36 +141,30 @@ main(int , char *argv[]) TestClock nsClock(nsValue, 1.0/frequency); TestClock nsVolatileClock(nsVolatile, 1.0/frequency); TestClock nsAtomicClock(nsAtomic, 1.0/frequency); - assert(pool.NewThread(&nsClock, nullptr) != nullptr); - assert(pool.NewThread(&nsVolatileClock, nullptr) != nullptr); - assert(pool.NewThread(&nsAtomicClock, nullptr) != nullptr); - benchmark("vespalib::Clock", pool, samples, numThreads, [&clock]() { + benchmark("vespalib::Clock", samples, numThreads, [&clock]() { return clock.getTimeNS(); }); - benchmark("uint64_t", pool, samples, numThreads, [&nsValue]() { + benchmark("uint64_t", 
samples, numThreads, [&nsValue]() { return steady_time (duration(nsValue._value)); }); - benchmark("volatile uint64_t", pool, samples, numThreads, [&nsVolatile]() { + benchmark("volatile uint64_t", samples, numThreads, [&nsVolatile]() { return steady_time(duration(nsVolatile._value)); }); - benchmark("memory_order_relaxed", pool, samples, numThreads, [&nsAtomic]() { + benchmark("memory_order_relaxed", samples, numThreads, [&nsAtomic]() { return steady_time(duration(nsAtomic._value.load(std::memory_order_relaxed))); }); - benchmark("memory_order_consume", pool, samples, numThreads, [&nsAtomic]() { + benchmark("memory_order_consume", samples, numThreads, [&nsAtomic]() { return steady_time(duration(nsAtomic._value.load(std::memory_order_consume))); }); - benchmark("memory_order_acquire", pool, samples, numThreads, [&nsAtomic]() { + benchmark("memory_order_acquire", samples, numThreads, [&nsAtomic]() { return steady_time(duration(nsAtomic._value.load(std::memory_order_acquire))); }); - benchmark("memory_order_seq_cst", pool, samples, numThreads, [&nsAtomic]() { + benchmark("memory_order_seq_cst", samples, numThreads, [&nsAtomic]() { return steady_time(duration(nsAtomic._value.load(std::memory_order_seq_cst))); }); - - benchmark("vespalib::steady_time::now()", pool, samples, numThreads, []() { + benchmark("vespalib::steady_time::now()", samples, numThreads, []() { return steady_clock::now(); }); - - pool.Close(); return 0; } diff --git a/vespalib/src/vespa/vespalib/util/thread.h b/vespalib/src/vespa/vespalib/util/thread.h index d919b8f2ab7..f331f1870c7 100644 --- a/vespalib/src/vespa/vespalib/util/thread.h +++ b/vespalib/src/vespa/vespalib/util/thread.h @@ -32,6 +32,7 @@ public: _threads.reserve(_threads.size() + 1); _threads.emplace_back(std::forward(f), std::forward(args)...); }; + size_t size() const { return _threads.size(); } void join() { for (auto &thread: _threads) { thread.join(); diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp 
b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp index 8b6427d9391..8af14366293 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp @@ -1,29 +1,9 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "threadstackexecutorbase.h" -#include namespace vespalib { -namespace thread { - -struct ThreadInit : public FastOS_Runnable { - Runnable &worker; - ThreadStackExecutorBase::init_fun_t init_fun; - - explicit ThreadInit(Runnable &worker_in, ThreadStackExecutorBase::init_fun_t init_fun_in) - : worker(worker_in), init_fun(std::move(init_fun_in)) {} - - void Run(FastOS_ThreadInterface *, void *) override; -}; - -void -ThreadInit::Run(FastOS_ThreadInterface *, void *) { - init_fun(worker); -} - -} - ThreadStackExecutorBase::Worker::Worker() : lock(), cond(), @@ -155,7 +135,7 @@ ThreadStackExecutorBase::run() ThreadStackExecutorBase::ThreadStackExecutorBase(uint32_t taskLimit, init_fun_t init_fun) : SyncableThreadExecutor(), Runnable(), - _pool(std::make_unique()), + _pool(), _lock(), _cond(), _stats(), @@ -167,7 +147,7 @@ ThreadStackExecutorBase::ThreadStackExecutorBase(uint32_t taskLimit, init_fun_t _taskCount(0), _taskLimit(taskLimit), _closed(false), - _thread_init(std::make_unique(*this, std::move(init_fun))) + _init_fun(init_fun) { assert(taskLimit > 0); } @@ -177,15 +157,13 @@ ThreadStackExecutorBase::start(uint32_t threads) { assert(threads > 0); for (uint32_t i = 0; i < threads; ++i) { - FastOS_ThreadInterface *thread = _pool->NewThread(_thread_init.get()); - assert(thread != nullptr); - (void)thread; + _pool.start(*this, _init_fun); } } size_t ThreadStackExecutorBase::getNumThreads() const { - return _pool->GetNumStartedThreads(); + return _pool.size(); } void @@ -315,12 +293,11 @@ ThreadStackExecutorBase::cleanup() { shutdown().sync(); _executorCompletion.countDown(); - _pool->Close(); + _pool.join(); 
} ThreadStackExecutorBase::~ThreadStackExecutorBase() { - assert(_pool->isClosed()); assert(_taskCount == 0); assert(_blocked.empty()); } diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h index 501fde92f4c..765499b73bc 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h @@ -2,21 +2,17 @@ #pragma once +#include "thread.h" #include "threadexecutor.h" #include "eventbarrier.hpp" #include "arrayqueue.hpp" #include "gate.h" -#include "runnable.h" #include "executor_idle_tracking.h" #include #include -class FastOS_ThreadPool; - namespace vespalib { -namespace thread { struct ThreadInit; } - /** * An executor service that executes tasks in multiple threads. **/ @@ -73,7 +69,7 @@ private: void unblock(); }; - std::unique_ptr _pool; + ThreadPool _pool; mutable std::mutex _lock; std::condition_variable _cond; ExecutorStats _stats; @@ -86,7 +82,7 @@ private: uint32_t _taskCount; uint32_t _taskLimit; bool _closed; - std::unique_ptr _thread_init; + init_fun_t _init_fun; static thread_local ThreadStackExecutorBase *_master; void block_thread(const unique_lock &, BlockedThread &blocked_thread); @@ -225,4 +221,3 @@ public: }; } // namespace vespalib - -- cgit v1.2.3